[MERGE] Updated sftp_readv

Vincent Ladeuil v.ladeuil+lp at free.fr
Thu Dec 20 15:21:49 GMT 2007


>>>>> "john" == John Arbash Meinel <john at arbash-meinel.com> writes:

    john> This changes the sftp readv code a little more. It
    john> reduces the amount of buffering it needs to do. So it
    john> no longer needs the complete collapsed section before
    john> it can start returning data.

Ok.

<snip/>

    john> Interestingly enough, there seems to be a small bug in
    john> our sftp handling. When using "sftp://localhost" I
    john> occasionally get: File
    john> "/var/lib/python-support/python2.5/paramiko/sftp_file.py",
    john> line 131, in _read_prefetch self.sftp._read_response()
    john> File
    john> "/var/lib/python-support/python2.5/paramiko/sftp_client.py",
    john> line 604, in _read_response raise SSHException('Server
    john> connection dropped: %s' % (str(e),))

    john> It is happening with both the new code and with
    john> bzr.dev. My best guess is just that we have actually
    john> streamed all the data already, and the ssh subsystem
    john> has quit on us because it isn't needed anymore, but I'm
    john> not really sure (this is with paramiko 1.6.4).

Could it be some data left unread in the buffers (wild guess)?
Since you're coalescing, you may end up requesting more data than
the caller will consume.
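
Roughly what I have in mind (toy numbers, nothing taken from the
patch): with a non-zero fudge_factor the coalescing happily covers
the gap between two requested ranges, and those extra bytes are
never handed back to the caller.

    # Two requested ranges with a 40-byte gap between them.
    offsets = [(0, 10), (50, 10)]      # the caller asks for 20 bytes
    fudge_factor = 64                  # assumed 'close enough' threshold
    gap = offsets[1][0] - (offsets[0][0] + offsets[0][1])
    if gap <= fudge_factor:
        # One coalesced (start, length) request: (0, 60).  60 bytes get
        # read, but only 20 are ever handed back to the caller.
        coalesced = (offsets[0][0],
                     offsets[1][0] + offsets[1][1] - offsets[0][0])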

    john> Anyway, I would really like to get back to the 10 other
    john> patches I need to clean up and get submitted. I didn't
    john> write any new tests, and the code could use a bit more
    john> testing since it passed the test suite before my last
    john> round of corrections. As it stands it is a lot better,
    john> but if someone wants to take over and write some unit
    john> tests, I'm happy to assist.

My review queue is stretched beyond reason already, so I'll review
that patch but will not take it further.

    john> John
    john> =:->
    john> # Bazaar merge directive format 2 (Bazaar 0.90)
    john> # revision_id: john at arbash-meinel.com-20071217233848-pq8zo3fyr9yt1rc1
    john> # target_branch: http://bazaar-vcs.org/bzr/bzr.dev
    john> # testament_sha1: 97db87f47286081baf87146365e0b3cb12538140
    john> # timestamp: 2007-12-18 08:40:42 -0600
    john> # source_branch: http://bzr.arbash-meinel.com/branches/bzr/1.1-\
    john> #   dev/sftp_chunked
    john> # base_revision_id: pqm at pqm.ubuntu.com-20071217060447-sictlq5nibqhpuec
    john> # 
    john> # Begin patch
    john> === modified file 'bzrlib/errors.py'
    john> --- bzrlib/errors.py	2007-12-15 16:04:18 +0000
    john> +++ bzrlib/errors.py	2007-12-17 16:56:33 +0000
    john> @@ -570,6 +570,16 @@
    john>          self.actual = actual
 
 
    john> +class OverlappingReadv(BzrError):
    john> +    """Raised when a readv() requests overlapping chunks of data.
    john> +
    john> +    Not all transports supports this, so the api should generally forbid it.
    john> +    (It isn't a feature we need anyway.

Missing closing ')'.

I think we should take a decision on that point, and also include
or exclude the ability to specify duplicate offsets to readv.

I can't imagine use cases for either functionality, so I'd vote to
get rid of both. If needed, the caller can handle such cases, but
at the transport level, given the complexity of the actual
implementations, I'd prefer that we simply exclude them.
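
To make the distinction concrete (toy values, nothing from the
patch):

    # Overlapping: the second range starts inside the first one.
    overlapping = [(0, 10), (5, 10)]
    # Duplicates: the exact same range is requested twice.
    duplicates = [(0, 10), (0, 10)]

The patch adds errors.OverlappingReadv for the first case; the
second is the one I'd like us to explicitly rule in or out of the
readv contract.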

    john> +    """
    john> +
    john> +    _fmt = 'Requested readv ranges overlap'
    john> +
    john> +
    john>  class PathNotChild(PathError):
 
    john>      _fmt = 'Path "%(path)s" is not a child of path "%(base)s"%(extra)s'

    john> === modified file 'bzrlib/transport/__init__.py'
    john> --- bzrlib/transport/__init__.py	2007-12-08 23:15:18 +0000
    john> +++ bzrlib/transport/__init__.py	2007-12-17 16:56:33 +0000
    john> @@ -792,7 +792,8 @@
    john>          return offsets
 
    john>      @staticmethod
    john> -    def _coalesce_offsets(offsets, limit=0, fudge_factor=0, max_size=0):
    john> +    def _coalesce_offsets(offsets, limit=0, fudge_factor=0, max_size=0,
    john> +                          allow_overlap=False):
    john>          """Yield coalesced offsets.
 
    john>          With a long list of neighboring requests, combine them
    john> @@ -801,27 +802,26 @@
    john>          Turns  [(15, 10), (25, 10)] => [(15, 20, [(0, 10), (10, 10)])]
 
    john>          :param offsets: A list of (start, length) pairs
    john> -
    john>          :param limit: Only combine a maximum of this many pairs Some transports
    john>                  penalize multiple reads more than others, and sometimes it is
    john>                  better to return early.
    john>                  0 means no limit
    john> -
    john>          :param fudge_factor: All transports have some level of 'it is
    john> -                better to read some more data and throw it away rather 
    john> +                better to read some more data and throw it away rather
    john>                  than seek', so collapse if we are 'close enough'
    john> -
    john>          :param max_size: Create coalesced offsets no bigger than this size.
    john>                  When a single offset is bigger than 'max_size', it will keep
    john>                  its size and be alone in the coalesced offset.
    john>                  0 means no maximum size.
    john> -
    john> -        :return: yield _CoalescedOffset objects, which have members for where
    john> -                to start, how much to read, and how to split those 
    john> -                chunks back up
    john> +        :param allow_overlap: If False, raise an error if requested ranges
    john> +            overlap.
    john> +        :return: return a list of _CoalescedOffset objects, which have members
    john> +            for where to start, how much to read, and how to split those chunks
    john> +            back up
    john>          """
    john>          last_end = None
    john>          cur = _CoalescedOffset(None, None, [])
    john> +        coalesced_offsets = []
 
Why did you delete the iterator behavior? You still iterate the
result in the caller.
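
If the only reason is the len(coalesced) in the mutter() call, the
generator could be kept and materialized by that single caller; a
toy illustration (nothing below is bzrlib code):

    def coalesce(offsets):
        # Stand-in for the real _coalesce_offsets generator.
        for start, size in offsets:
            yield (start, size)

    coalesced = list(coalesce([(0, 10), (20, 5)]))  # len() now works
    assert len(coalesced) == 2
    for c in coalesced:                             # ...and so does iteration
        pass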

    john>          for start, size in offsets:
    john>              end = start + size
    john> @@ -830,18 +830,19 @@
    john>                  and start >= cur.start
    john>                  and (limit <= 0 or len(cur.ranges) < limit)
    john>                  and (max_size <= 0 or end - cur.start <= max_size)):
    john> +                if not allow_overlap and start < last_end:
    john> +                    raise errors.OverlappingReadv()
    john>                  cur.length = end - cur.start
    john>                  cur.ranges.append((start-cur.start, size))
    john>              else:
    john>                  if cur.start is not None:
    john> -                    yield cur
    john> +                    coalesced_offsets.append(cur)
    john>                  cur = _CoalescedOffset(start, size, [(0, size)])
    john>              last_end = end
 
    john>          if cur.start is not None:
    john> -            yield cur
    john> -
    john> -        return
    john> +            coalesced_offsets.append(cur)
    john> +        return coalesced_offsets
 
    john>      def get_multi(self, relpaths, pb=None):
    john>          """Get a list of file-like objects, one for each entry in relpaths.

    john> === modified file 'bzrlib/transport/sftp.py'
    john> --- bzrlib/transport/sftp.py	2007-10-24 18:19:51 +0000
    john> +++ bzrlib/transport/sftp.py	2007-12-17 23:38:48 +0000

I think this method was already too big and too complex, and you
made things worse ;-)


    john> @@ -328,34 +328,74 @@
    john>          mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
    john>                  len(offsets), len(coalesced), len(requests))
 
    john> -        # Queue the current read until we have read the full coalesced section
    john> -        cur_data = []
    john> -        cur_data_len = 0
    john> +        # We have several layers of indirection, so lets try to list them all.
    john> +        # 1) The requested offsets from the user, which are in (start, length)
    john> +        #    pairs starting at the beginning of the file.
    john> +        # 2) CoalescedOffsets which combine those absolute locations, and give
    john> +        #    an overall start and length in absolute terms, and then a list of
    john> +        #    ranges offset by CoalescedOffsets.start.
    john> +        # 3) For large Coalesced ranges, we further split them up into 32KB
    john> +        #    chunks (sftp protocol minimum supported size).
    john> +        # 4) As we read data back from the requests keep track of where we are
    john> +        #    and return any data that we can.
    john> +        # Keep track of the currently coalesced data
    john> +        buffered_data = ''
    john> +        buffered_start = 0
    john>          cur_coalesced_stack = iter(coalesced)
    john>          cur_coalesced = cur_coalesced_stack.next()
    john> +        buffered_start = cur_coalesced.start
 
    john> -        # Cache the results, but only until they have been fulfilled
    john> +        # Cache the results, but only until they have been fulfilled, this
    john> +        # mostly just stores the out-of-order data

That comment *really* helps (it would have helped me a lot when I
first read the code).

    john>          data_map = {}
    john>          # turn the list of offsets into a stack
    john>          offset_stack = iter(offsets)
    john>          cur_offset_and_size = offset_stack.next()
 
    john>          for data in fp.readv(requests):
    john> -            cur_data += data
    john> -            cur_data_len += len(data)
    john> -
    john> -            if cur_data_len < cur_coalesced.length:
    john> -                continue
    john> -            assert cur_data_len == cur_coalesced.length, \
    john> -                "Somehow we read too much: %s != %s" % (cur_data_len,
    john> -                                                        cur_coalesced.length)
    john> -            all_data = ''.join(cur_data)
    john> -            cur_data = []
    john> -            cur_data_len = 0
    john> -
    john> +            # TODO: jam 20071217 We could buffer into a list, and then handle
    john> +            #       the sub-chunks, rather than buffering into a string which
    john> +            #       needs to be repeatedly memory allocated. However, it makes
    john> +            #       the processing code a lot more complex (you have to build
    john> +            #       another list with possibly parts of these buffered
    john> +            #       sections, etc)

Your complexity daemon warns you too, I see :)

    john> +            buffered_data += data
    john> +            buffered_end = buffered_start + len(buffered_data)
    john> +
    john> +            # Check to see if this is enough data to satisfy some of the
    john> +            # coalesced subranges
    john> +
    john> +            new_ranges = []
    john>              for suboffset, subsize in cur_coalesced.ranges:
    john> -                key = (cur_coalesced.start+suboffset, subsize)
    john> -                data_map[key] = all_data[suboffset:suboffset+subsize]
    john> +                start = cur_coalesced.start + suboffset
    john> +                end = start + subsize
    john> +                if buffered_start > start:
    john> +                    raise AssertionError('We messed up. The data we are'
    john> +                                         ' reading is starting after the'
    john> +                                         ' point of the sub offset')


I would rather see that exercised in a test. As it is, it makes
the code flow harder to follow.
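
For the overlap refusal at least, such a test is cheap to write; a
rough sketch (class and test names are mine, to be adapted to the
existing transport tests, and it assumes the list-returning
_coalesce_offsets from this patch, the generator version would
need to be consumed first):

    from bzrlib import errors
    from bzrlib.tests import TestCase
    from bzrlib.transport import Transport

    class TestCoalesceOverlap(TestCase):

        def test_overlapping_offsets_rejected(self):
            # (0, 10) and (5, 10) overlap, so the default
            # allow_overlap=False should refuse to coalesce them.
            self.assertRaises(errors.OverlappingReadv,
                              Transport._coalesce_offsets,
                              [(0, 10), (5, 10)])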

    john> +                if (buffered_start < start
    john> +                    or buffered_end < end):
    john> +                    # Either we haven't started reading, or we haven't finished
    john> +                    new_ranges.append((suboffset, subsize))
    john> +                    continue
    john> +

I think *this* part is the hardest to understand, see below for
an alternative design.

    john> +                # We have read enough data, collect it into a single string
    john> +                buf_start = start - buffered_start
    john> +                buf_end = buf_start + subsize
    john> +                data = buffered_data[buf_start:buf_end]
    john> +
    john> +                # Because this data won't be repeated, shrink the buffer
    john> +                buffered_data = buffered_data[buf_end:]
    john> +                buffered_start += buf_end
    john> +

You silently lose the ability to handle duplicate offsets.

    john> +                # Is this exactly the next requested data chunk?
    john> +                key = (start, subsize)
    john> +                if key == cur_offset_and_size:
    john> +                    yield start, data
    john> +                    cur_offset_and_size = offset_stack.next()
    john> +                else:
    john> +                    data_map[key] = data
    john> +            cur_coalesced.ranges = new_ranges
 
Again, walking several iterators in parallel makes the code hard
to read.

    john>              # Now that we've read some data, see if we can yield anything back
    john>              while cur_offset_and_size in data_map:
    john> @@ -363,15 +403,24 @@
    john>                  yield cur_offset_and_size[0], this_data
    john>                  cur_offset_and_size = offset_stack.next()
 
    john> -            # We read a coalesced entry, so mark it as done
    john> -            cur_coalesced = None
    john> -            # Now that we've read all of the data for this coalesced section
    john> -            # on to the next
    john> -            cur_coalesced = cur_coalesced_stack.next()
    john> +            if buffered_end == cur_coalesced.start + cur_coalesced.length:
    john> +                # We reached the end of this section, go on to the next
    john> +                try:
    john> +                    cur_coalesced = cur_coalesced_stack.next()
    john> +                except StopIteration:
    john> +                    cur_coalesced = None
    john> +                if buffered_data:
    john> +                    # Why do we have buffered data left
    john> +                    import pdb; pdb.set_trace()

<cough>

    john> +                buffered_data = ''
    john> +                buffered_start = cur_coalesced.start
    john> +            elif buffered_end >= cur_coalesced.start + cur_coalesced.length:
    john> +                raise AssertionError('Somehow we read too much data: %s > %s'
    john> +                    % (buffered_end, cur_coalesced.start + cur_coalesced.length))
 
Same here, if that could be documented and exercised in a test,
the resulting code would be clearer.

    john>          if cur_coalesced is not None:
    john>              raise errors.ShortReadvError(relpath, cur_coalesced.start,
    john> -                cur_coalesced.length, len(data))
    john> +                cur_coalesced.length, len(buffered_data))
 
    john>      def put_file(self, relpath, f, mode=None):
    john>          """

Overall, you address the problem (but Vlad said it crashes later
anyway) but increase the complexity of an already complex method.

Having addressed the problem in a different way for http, I may
not be impartial, but I think this could be redesigned differently.

Basically, the problem we want to address is:

- read parts of a file,
- by issuing several requests,
- each request can cover more or less than a requested file part,
- concatenate request results when needed,
- yield the requested offsets,

while:
- caching out-of-order parts,
- minimizing latency without overflowing the lower-level buffers.

I think we can do that by:
- coalesce_offsets (as done now, with rightly tuned parameters),
- prepare the sftp requests,
- issue a reasonable amount of requests,
- start processing received responses,
- below a threshold, issue more requests.

By introducing an intermediate iterator, the _sftp_readv method
may be simplified, and the intermediate iterator can then find
some balance between received responses, which consume the
lower-level buffers, and pending requests that still need to be
sent.

In another mail you said:

    john> for start in xrange(0, len(ranges), _max_ranges):
    john>   for data in fp.readv(ranges[start:start+_max_ranges]):
    john>     ...

    john> As you noticed, we were sending 15k async requests,
    john> which seems a bit abusive.  This would limit it to
    john> 1000, at a time, which gives us 32KB*1000 = 32MB in
    john> flight. Which sounds like enough to fill buffers. There
    john> will still be 1/1000*num_ranges more round trips than
    john> before, but it seems like a reasonable way to cap the
    john> total number of requests on the wire.

The design above is about doing what you propose, except:

   for data in self._buffered_requests(ranges, self._max_ranges)

where 'ranges' is really 'requests' and we may want to replace
'_max_ranges' by '_max_size', since, after all, that's what we
care about.
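
To give a flavour of it, such an intermediate iterator could look
roughly like the sketch below (all names are made up, and a real
version should keep refilling the pipeline while earlier replies
are being consumed, instead of the batch-and-drain simplification
used here):

    def _buffered_requests(fp, requests, max_size):
        """Issue the (offset, length) requests in bounded batches.

        At most roughly max_size bytes are requested at a time, so
        we never push 15k async requests down to paramiko in one go.
        """
        batch = []
        batch_bytes = 0
        for offset, length in requests:
            batch.append((offset, length))
            batch_bytes += length
            if batch_bytes >= max_size:
                # Drain this batch before issuing more requests.
                for data in fp.readv(batch):
                    yield data
                batch = []
                batch_bytes = 0
        if batch:
            for data in fp.readv(batch):
                yield data

_sftp_readv then only has to walk the yielded data and map it back
to the requested offsets, which is where I hope the simplification
comes from.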

In summary, on three main criteria:
- Doesn't reduce test coverage
- Doesn't reduce design clarity
- Improves bugs, features, speed, or code simplicity.

Err, I think you succeed only on part of the third: improve bugs
;-)

Hope that helps,

     Vincent

P.S.: Since this is my first real review, don't hesitate to
comment on the review itself.


