[MERGE] Updated sftp_readv

John Arbash Meinel john at arbash-meinel.com
Thu Dec 20 15:42:02 GMT 2007


-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Vincent Ladeuil wrote:

...

> I think we should take a decision on that point and also
> include/exclude the ability to specify duplicate offsets to
> readv.
> 
> I can't imagine use cases for either functionality, so I'd vote
> to get rid of them. If needed, the caller can handle such cases,
> but at the transport level, given the complexity of the actual
> implementations, I'd prefer that we simply exclude them.

Well, I did set the default to allow_overlap=False. And it turns out that the
overlap check will also disallow duplicate ranges.
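To make that concrete, here is a stripped-down illustration of the idea (not
the actual _coalesce_offsets code): because we track the end of the previous
range, a duplicate range trips the same "start < last_end" condition that a
genuine overlap does.

    class OverlappingReadv(Exception):
        pass

    def check_no_overlap(offsets):
        # offsets are (start, size) pairs, assumed already sorted by start
        last_end = None
        for start, size in offsets:
            if last_end is not None and start < last_end:
                raise OverlappingReadv('%r overlaps the previous range'
                                       % ((start, size),))
            last_end = start + size

    check_no_overlap([(0, 10), (10, 5), (20, 5)])      # fine: ranges only touch
    try:
        check_no_overlap([(0, 10), (10, 5), (10, 5)])  # duplicate (10, 5)
    except OverlappingReadv:
        pass                                           # rejected like an overlap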


...

>     john>          last_end = None
>     john>          cur = _CoalescedOffset(None, None, [])
>     john> +        coalesced_offsets = []
>  
> Why did you delete the iterator behavior? You still iterate the
> result in the caller.

Because all callers actually use a list. If you look at the sftp code, it
actually casts the result into a list and wraps a different iterator around
it. We need the data twice, so we have to use a list. The second iterator is
just a convenience, rather than keeping an integer index of which offset we
are on.
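Roughly what the two passes look like (a loose sketch with made-up stand-in
data, not the real sftp code):

    from collections import namedtuple

    Coalesced = namedtuple('Coalesced', 'start length ranges')
    # pretend this came back from _coalesce_offsets()
    coalesced = [Coalesced(0, 70000, [(0, 1000), (40000, 30000)])]

    requests = []
    for c in coalesced:                 # pass 1: split into <=32KB wire requests
        for off in range(0, c.length, 32768):
            requests.append((c.start + off, min(32768, c.length - off)))

    for c in coalesced:                 # pass 2: walk the very same objects,
        pass                            # matching returned data to c.ranges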

> 
>     john>          for start, size in offsets:
>     john>              end = start + size
>     john> @@ -830,18 +830,19 @@
>     john>                  and start >= cur.start
>     john>                  and (limit <= 0 or len(cur.ranges) < limit)
>     john>                  and (max_size <= 0 or end - cur.start <= max_size)):
>     john> +                if not allow_overlap and start < last_end:
>     john> +                    raise errors.OverlappingReadv()
>     john>                  cur.length = end - cur.start
>     john>                  cur.ranges.append((start-cur.start, size))
>     john>              else:
>     john>                  if cur.start is not None:
>     john> -                    yield cur
>     john> +                    coalesced_offsets.append(cur)
>     john>                  cur = _CoalescedOffset(start, size, [(0, size)])
>     john>              last_end = end
>  
>     john>          if cur.start is not None:
>     john> -            yield cur
>     john> -
>     john> -        return
>     john> +            coalesced_offsets.append(cur)
>     john> +        return coalesced_offsets
>  
>     john>      def get_multi(self, relpaths, pb=None):
>     john>          """Get a list of file-like objects, one for each entry in relpaths.
> 
>     john> === modified file 'bzrlib/transport/sftp.py'
>     john> --- bzrlib/transport/sftp.py	2007-10-24 18:19:51 +0000
>     john> +++ bzrlib/transport/sftp.py	2007-12-17 23:38:48 +0000
> 
> I think this method was already too big and too complex; you made
> things worse ;-)

Are you talking about _coalesce_offsets or _sftp_readv?
> 
> 
>     john> @@ -328,34 +328,74 @@
>     john>          mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
>     john>                  len(offsets), len(coalesced), len(requests))
>  
>     john> -        # Queue the current read until we have read the full coalesced section
>     john> -        cur_data = []
>     john> -        cur_data_len = 0
>     john> +        # We have several layers of indirection, so lets try to list them all.
>     john> +        # 1) The requested offsets from the user, which are in (start, length)
>     john> +        #    pairs starting at the beginning of the file.
>     john> +        # 2) CoalescedOffsets which combine those absolute locations, and give
>     john> +        #    an overall start and length in absolute terms, and then a list of
>     john> +        #    ranges offset by CoalescedOffsets.start.
>     john> +        # 3) For large Coalesced ranges, we further split them up into 32KB
>     john> +        #    chunks (sftp protocol minimum supported size).
>     john> +        # 4) As we read data back from the requests keep track of where we are
>     john> +        #    and return any data that we can.
>     john> +        # Keep track of the currently coalesced data
>     john> +        buffered_data = ''
>     john> +        buffered_start = 0
>     john>          cur_coalesced_stack = iter(coalesced)
>     john>          cur_coalesced = cur_coalesced_stack.next()
>     john> +        buffered_start = cur_coalesced.start
>  
>     john> -        # Cache the results, but only until they have been fulfilled
>     john> +        # Cache the results, but only until they have been fulfilled, this
>     john> +        # mostly just stores the out-of-order data
> 
> That comment *really* helps (it would have helped me a lot when I
> first read the code).
> 
>     john>          data_map = {}
>     john>          # turn the list of offsets into a stack
>     john>          offset_stack = iter(offsets)
>     john>          cur_offset_and_size = offset_stack.next()
>  
>     john>          for data in fp.readv(requests):
>     john> -            cur_data += data
>     john> -            cur_data_len += len(data)
>     john> -
>     john> -            if cur_data_len < cur_coalesced.length:
>     john> -                continue
>     john> -            assert cur_data_len == cur_coalesced.length, \
>     john> -                "Somehow we read too much: %s != %s" % (cur_data_len,
>     john> -                                                        cur_coalesced.length)
>     john> -            all_data = ''.join(cur_data)
>     john> -            cur_data = []
>     john> -            cur_data_len = 0
>     john> -
>     john> +            # TODO: jam 20071217 We could buffer into a list, and then handle
>     john> +            #       the sub-chunks, rather than buffering into a string which
>     john> +            #       needs to be repeatedly memory allocated. However, it makes
>     john> +            #       the processing code a lot more complex (you have to build
>     john> +            #       another list with possibly parts of these buffered
>     john> +            #       sections, etc)
> 
> Your complexity daemon warns you too I see :)

Actually, I had started to try and implement it that way. It was going to take
me too long to get all the bits right, and doing "data = buffer[start:end]" is
a whole lot simpler than trying to do the same thing from a list.
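Something like this toy comparison (not the patch itself) shows the
difference:

    buffered_data = 'abcdefghij' + 'klmnopqrst'   # two reads concatenated
    buf_start, buf_end = 7, 13
    data = buffered_data[buf_start:buf_end]       # 'hijklm', a single slice

    # With a list of chunks you have to find which chunk the range starts in,
    # how far into that chunk, and whether it spills into the next one -- the
    # extra bookkeeping that wasn't worth the trouble.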

> 
>     john> +            buffered_data += data
>     john> +            buffered_end = buffered_start + len(buffered_data)
>     john> +
>     john> +            # Check to see if this is enough data to satisfy some of the
>     john> +            # coalesced subranges
>     john> +
>     john> +            new_ranges = []
>     john>              for suboffset, subsize in cur_coalesced.ranges:
>     john> -                key = (cur_coalesced.start+suboffset, subsize)
>     john> -                data_map[key] = all_data[suboffset:suboffset+subsize]
>     john> +                start = cur_coalesced.start + suboffset
>     john> +                end = start + subsize
>     john> +                if buffered_start > start:
>     john> +                    raise AssertionError('We messed up. The data we are'
>     john> +                                         ' reading is starting after the'
>     john> +                                         ' point of the sub offset')
> 
> 
> I would rather see that exercised in a test. As it is, it makes
> the code flow harder to follow.

It shouldn't be possible from the outside. So it isn't something we can
actually test.

I could take the if statement out completely, though it did help before I got
the algorithm correct.
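For what it's worth, the invariant it guards is only this (toy numbers, not a
real test): the buffer only ever gets trimmed up to data that has already been
handed out, so buffered_start should never pass the start of a still-pending
sub-range.

    buffered_start = 100
    pending_ranges = [(0, 10), (40, 20)]     # suboffsets relative to start 100
    for suboffset, subsize in pending_ranges:
        start = 100 + suboffset              # absolute start of this sub-range
        assert buffered_start <= start       # the condition the check protects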

> 
>     john> +                if (buffered_start < start
>     john> +                    or buffered_end < end):
>     john> +                    # Either we haven't started reading, or we haven't finished
>     john> +                    new_ranges.append((suboffset, subsize))
>     john> +                    continue
>     john> +
> 
> I think *this* part is the hardest to understand, see below for
> an alternative design.
> 
>     john> +                # We have read enough data, collect it into a single string
>     john> +                buf_start = start - buffered_start
>     john> +                buf_end = buf_start + subsize
>     john> +                data = buffered_data[buf_start:buf_end]
>     john> +
>     john> +                # Because this data won't be repeated, shrink the buffer
>     john> +                buffered_data = buffered_data[buf_end:]
>     john> +                buffered_start += buf_end
>     john> +
> 
> You silently lose the ability to handle duplicate offsets.
> 
>     john> +                # Is this exactly the next requested data chunk?
>     john> +                key = (start, subsize)
>     john> +                if key == cur_offset_and_size:
>     john> +                    yield start, data
>     john> +                    cur_offset_and_size = offset_stack.next()
>     john> +                else:
>     john> +                    data_map[key] = data
>     john> +            cur_coalesced.ranges = new_ranges
>  
> Again, walking several iterators in parallel makes the code hard
> to read.

For HTTP you pushed the complexity down into a helper (the response file),
which might be a reasonable way to handle it here.
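Something along these lines, purely as a sketch (a hypothetical class, not
anything that exists in bzrlib): a small helper that owns the buffering, so
_sftp_readv would only walk the requested offsets and ask the helper for each
absolute range.

    class _RangeBuffer(object):
        """Accumulate chunks from fp.readv() and serve absolute byte ranges."""

        def __init__(self, start):
            self._start = start     # absolute file position of the buffer start
            self._data = ''

        def add(self, chunk):
            self._data += chunk

        def read_range(self, start, size):
            """Return bytes [start, start+size), or None if not buffered yet."""
            rel = start - self._start
            if rel < 0 or rel + size > len(self._data):
                return None         # not available yet (or already discarded)
            return self._data[rel:rel + size]

        def discard_before(self, start):
            """Drop data before 'start' so the buffer does not grow unbounded."""
            rel = start - self._start
            if rel > 0:
                self._data = self._data[rel:]
                self._start = start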

> 
>     john>              # Now that we've read some data, see if we can yield anything back
>     john>              while cur_offset_and_size in data_map:
>     john> @@ -363,15 +403,24 @@
>     john>                  yield cur_offset_and_size[0], this_data
>     john>                  cur_offset_and_size = offset_stack.next()
>  
>     john> -            # We read a coalesced entry, so mark it as done
>     john> -            cur_coalesced = None
>     john> -            # Now that we've read all of the data for this coalesced section
>     john> -            # on to the next
>     john> -            cur_coalesced = cur_coalesced_stack.next()
>     john> +            if buffered_end == cur_coalesced.start + cur_coalesced.length:
>     john> +                # We reached the end of this section, go on to the next
>     john> +                try:
>     john> +                    cur_coalesced = cur_coalesced_stack.next()
>     john> +                except StopIteration:
>     john> +                    cur_coalesced = None
>     john> +                if buffered_data:
>     john> +                    # Why do we have buffered data left
>     john> +                    import pdb; pdb.set_trace()
> 
> <cough>
> 
>     john> +                buffered_data = ''
>     john> +                buffered_start = cur_coalesced.start
>     john> +            elif buffered_end >= cur_coalesced.start + cur_coalesced.length:
>     john> +                raise AssertionError('Somehow we read too much data: %s > %s'
>     john> +                    % (buffered_end, cur_coalesced.start + cur_coalesced.length))
>  
> Same here, if that could be documented and exercised in a test,
> the resulting code would be clearer.

Well, the test was already present, and it is something that depends on
"fp.readv()" returning more data than we requested, which would be the fault
of Paramiko.

I'm not really sure how you would document and exercise it in a test. But if
you really did want to test it, then you need code to handle it, which means
you still have this elif and raise an exception here.

So I'm not sure how it would simplify the code.
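That said, if someone really wanted to exercise it, something along these
lines might do it (a purely hypothetical fake standing in for a misbehaving
Paramiko file object):

    class _OverEagerFile(object):
        """Fake sftp file whose readv() returns more bytes than asked for."""

        def readv(self, requests):
            for start, length in requests:
                yield 'x' * (length + 1)    # one byte too many per request

    # A test would hand this to the readv code in place of the real sftp file
    # and assert that the "read too much" error path fires.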

> 
>     john>          if cur_coalesced is not None:
>     john>              raise errors.ShortReadvError(relpath, cur_coalesced.start,
>     john> -                cur_coalesced.length, len(data))
>     john> +                cur_coalesced.length, len(buffered_data))
>  
>     john>      def put_file(self, relpath, f, mode=None):
>     john>          """
> 
> Overall, you address the problem (but Vlad said it crashes later
> anyway) but increase the complexity of an already complex method.
> 
> Having addressed the problem in a different way for http, I may
> not be impartial, but I think this could be redesigned differently.
> 
> Basically the problem we want to address is:
> 
> - read parts of a file,
> - by issuing several requests,
> - each request can be more or less than a requested file part,
> - concatenate requests when needed,
> - yield requests offsets
> 
> While:
> - caching out-of-order parts,
> - minimizing latency without overflowing lower level buffers.
> 
> I think we can do that by:
> - coalesce_offsets (as done now, with parameters Rightly tuned)
> - prepare the sftp requests,
> - issue a reasonable amount of requests,
> - start processing received requests
> - below a threshold, issue more requests
> 
> By introducing an intermediate iterator, the _sftp_readv method
> may be simplified, and the intermediate iterator can then find
> some balance between received requests, which consume the lower
> level buffers, and pending requests that still need to be sent.
> 
> In another mail you said:
> 
>     john> for start in xrange(0, len(ranges), _max_ranges):
>     john>   for data in fp.readv(ranges[start:start+_max_ranges]):
>     john>     ...
> 
>     john> As you noticed, we were sending 15k async requests,
>     john> which seems a bit abusive.  This would limit it to
>     john> 1000, at a time, which gives us 32KB*1000 = 32MB in
>     john> flight. Which sounds like enough to fill buffers. There
>     john> will still be 1/1000*num_ranges more round trips than
>     john> before, but it seems like a reasonable way to cap the
>     john> total number of requests on the wire.
> 
> The design above is about doing what you propose, except:
> 
>    for data in self._buffered_requests(ranges, self._max_ranges)
> 
> where 'ranges' is really 'requests' and we may want to replace
> '_max_ranges' by '_max_size', since, after all, that's what we
> care about.
> 
> In summary, on three main criteria:
> - Doesn't reduce test coverage
> - Doesn't reduce design clarity
> - Improves bugs, features, speed, or code simplicity.
> 
> Err, I think you succeed only on part of the third: improve bugs
> ;-)

Well, I would argue it doesn't improve a bug, but features/speed. :)

> 
> Hope that helps,
> 
>      Vincent
> 
> P.S.: Since this is my first real review, don't hesitate to
> comment on the review itself.
> 

I think you did very well. You also remembered to think outside of just what
was written, which is what I find the hardest to do when reviewing.

John
=:->
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.7 (Darwin)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org

iD8DBQFHao1JJdeBCYSNAAMRAiHOAJ9LfUL7uaCTb0vm8TNCDIUesqhs6QCg2PKy
h60b1qNtVrLWZf6pEdBChYU=
=9FPu
-----END PGP SIGNATURE-----


