[MERGE] Updated sftp_readv
Vincent Ladeuil
v.ladeuil+lp at free.fr
Thu Dec 20 15:21:49 GMT 2007
>>>>> "john" == John Arbash Meinel <john at arbash-meinel.com> writes:
john> This changes the sftp readv code a little more. It
john> reduces the amount of buffering it needs to do. So it
john> no longer needs the complete collapsed section before
john> it can start returning data.
Ok.
<snip/>
john> Interestingly enough, there seems to be a small bug in
john> our sftp handling. When using "sftp://localhost" I
john> occasionally get: File
john> "/var/lib/python-support/python2.5/paramiko/sftp_file.py",
john> line 131, in _read_prefetch self.sftp._read_response()
john> File
john> "/var/lib/python-support/python2.5/paramiko/sftp_client.py",
john> line 604, in _read_response raise SSHException('Server
john> connection dropped: %s' % (str(e),))
john> It is happening with both the new code and with
john> bzr.dev. My best guess is just that we have actually
john> streamed all the data already, and the ssh subsystem
john> has quit on us because it isn't needed anymore, but I'm
john> not really sure (this is with paramiko 1.6.4).
Could it be some data left unread in the buffers (wild guess)? Since
you're coalescing, you may end up requesting more data than the
caller will consume.
john> Anyway, I would really like to get back to the 10 other
john> patches I need to clean up and get submitted. I didn't
john> write any new tests, and the code could use a bit more
john> testing since it passed the test suite before my last
john> round of corrections. As it stands it is a lot better,
john> but if someone wants to take over and write some unit
john> tests, I'm happy to assist.
Honestly, my review queue is stretched beyond reason already, so I'll
review that patch but will not take it further.
john> John
john> =:->
john> # Bazaar merge directive format 2 (Bazaar 0.90)
john> # revision_id: john at arbash-meinel.com-20071217233848-pq8zo3fyr9yt1rc1
john> # target_branch: http://bazaar-vcs.org/bzr/bzr.dev
john> # testament_sha1: 97db87f47286081baf87146365e0b3cb12538140
john> # timestamp: 2007-12-18 08:40:42 -0600
john> # source_branch: http://bzr.arbash-meinel.com/branches/bzr/1.1-\
john> # dev/sftp_chunked
john> # base_revision_id: pqm at pqm.ubuntu.com-20071217060447-sictlq5nibqhpuec
john> #
john> # Begin patch
john> === modified file 'bzrlib/errors.py'
john> --- bzrlib/errors.py 2007-12-15 16:04:18 +0000
john> +++ bzrlib/errors.py 2007-12-17 16:56:33 +0000
john> @@ -570,6 +570,16 @@
john> self.actual = actual
john> +class OverlappingReadv(BzrError):
john> + """Raised when a readv() requests overlapping chunks of data.
john> +
john> + Not all transports supports this, so the api should generally forbid it.
john> + (It isn't a feature we need anyway.
Missing closing ')' here.
I think we should take a decision on that point, and also decide
whether to include or exclude the ability to pass duplicate offsets
to readv.
I can't imagine use cases for either feature, so I'd vote to get rid
of both. If needed, the caller can handle such cases itself; at the
transport level, given the complexity of the actual implementations,
I'd prefer that we simply exclude them.
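If we do decide to exclude them, a quick up-front check would be
cheap. A minimal sketch (`check_offsets` is a hypothetical helper,
not part of bzrlib):

```python
# Hypothetical helper (not bzrlib API): reject overlapping or
# duplicate (start, length) ranges before coalescing even starts.
def check_offsets(offsets):
    """Raise ValueError on overlapping or duplicate readv ranges."""
    last_end = None
    for start, length in sorted(offsets):
        # A duplicate range sorts next to its twin, so it also
        # trips the overlap test below.
        if last_end is not None and start < last_end:
            raise ValueError(
                'overlapping or duplicate readv range: (%d, %d)'
                % (start, length))
        last_end = start + length
    return offsets
```

The transport then only ever sees disjoint ranges, and the coalescing
code no longer needs an allow_overlap flag at all.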
john> + """
john> +
john> + _fmt = 'Requested readv ranges overlap'
john> +
john> +
john> class PathNotChild(PathError):
john> _fmt = 'Path "%(path)s" is not a child of path "%(base)s"%(extra)s'
john> === modified file 'bzrlib/transport/__init__.py'
john> --- bzrlib/transport/__init__.py 2007-12-08 23:15:18 +0000
john> +++ bzrlib/transport/__init__.py 2007-12-17 16:56:33 +0000
john> @@ -792,7 +792,8 @@
john> return offsets
john> @staticmethod
john> - def _coalesce_offsets(offsets, limit=0, fudge_factor=0, max_size=0):
john> + def _coalesce_offsets(offsets, limit=0, fudge_factor=0, max_size=0,
john> + allow_overlap=False):
john> """Yield coalesced offsets.
john> With a long list of neighboring requests, combine them
john> @@ -801,27 +802,26 @@
john> Turns [(15, 10), (25, 10)] => [(15, 20, [(0, 10), (10, 10)])]
john> :param offsets: A list of (start, length) pairs
john> -
john> :param limit: Only combine a maximum of this many pairs Some transports
john> penalize multiple reads more than others, and sometimes it is
john> better to return early.
john> 0 means no limit
john> -
john> :param fudge_factor: All transports have some level of 'it is
john> - better to read some more data and throw it away rather
john> + better to read some more data and throw it away rather
john> than seek', so collapse if we are 'close enough'
john> -
john> :param max_size: Create coalesced offsets no bigger than this size.
john> When a single offset is bigger than 'max_size', it will keep
john> its size and be alone in the coalesced offset.
john> 0 means no maximum size.
john> -
john> - :return: yield _CoalescedOffset objects, which have members for where
john> - to start, how much to read, and how to split those
john> - chunks back up
john> + :param allow_overlap: If False, raise an error if requested ranges
john> + overlap.
john> + :return: return a list of _CoalescedOffset objects, which have members
john> + for where to start, how much to read, and how to split those chunks
john> + back up
john> """
john> last_end = None
john> cur = _CoalescedOffset(None, None, [])
john> + coalesced_offsets = []
Why did you delete the iterator behavior? You still iterate over the
result in the caller.
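For reference, the lazy behavior costs nothing to keep; a stripped-down
generator sketch (deliberately ignoring limit/fudge_factor/max_size, so
this is not the full `_coalesce_offsets`):

```python
def coalesce(offsets, fudge=0):
    """Yield (start, length, ranges) tuples, coalescing adjacent
    (start, size) offsets, as soon as each coalesced chunk is known
    to be complete -- no intermediate list needed."""
    cur_start = cur_end = None
    ranges = []
    for start, size in offsets:
        if cur_start is not None and start <= cur_end + fudge:
            # Close enough to the previous range: extend it.
            ranges.append((start - cur_start, size))
            cur_end = max(cur_end, start + size)
        else:
            if cur_start is not None:
                yield cur_start, cur_end - cur_start, ranges
            cur_start, cur_end = start, start + size
            ranges = [(0, size)]
    if cur_start is not None:
        yield cur_start, cur_end - cur_start, ranges
```

A caller that really needs a list can always write
`list(coalesce(offsets))`, so returning a list only removes options.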
john> for start, size in offsets:
john> end = start + size
john> @@ -830,18 +830,19 @@
john> and start >= cur.start
john> and (limit <= 0 or len(cur.ranges) < limit)
john> and (max_size <= 0 or end - cur.start <= max_size)):
john> + if not allow_overlap and start < last_end:
john> + raise errors.OverlappingReadv()
john> cur.length = end - cur.start
john> cur.ranges.append((start-cur.start, size))
john> else:
john> if cur.start is not None:
john> - yield cur
john> + coalesced_offsets.append(cur)
john> cur = _CoalescedOffset(start, size, [(0, size)])
john> last_end = end
john> if cur.start is not None:
john> - yield cur
john> -
john> - return
john> + coalesced_offsets.append(cur)
john> + return coalesced_offsets
john> def get_multi(self, relpaths, pb=None):
john> """Get a list of file-like objects, one for each entry in relpaths.
john> === modified file 'bzrlib/transport/sftp.py'
john> --- bzrlib/transport/sftp.py 2007-10-24 18:19:51 +0000
john> +++ bzrlib/transport/sftp.py 2007-12-17 23:38:48 +0000
I think this method was already too big and too complex; you've made
things worse ;-)
john> @@ -328,34 +328,74 @@
john> mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
john> len(offsets), len(coalesced), len(requests))
john> - # Queue the current read until we have read the full coalesced section
john> - cur_data = []
john> - cur_data_len = 0
john> + # We have several layers of indirection, so lets try to list them all.
john> + # 1) The requested offsets from the user, which are in (start, length)
john> + # pairs starting at the beginning of the file.
john> + # 2) CoalescedOffsets which combine those absolute locations, and give
john> + # an overall start and length in absolute terms, and then a list of
john> + # ranges offset by CoalescedOffsets.start.
john> + # 3) For large Coalesced ranges, we further split them up into 32KB
john> + # chunks (sftp protocol minimum supported size).
john> + # 4) As we read data back from the requests keep track of where we are
john> + # and return any data that we can.
john> + # Keep track of the currently coalesced data
john> + buffered_data = ''
john> + buffered_start = 0
john> cur_coalesced_stack = iter(coalesced)
john> cur_coalesced = cur_coalesced_stack.next()
john> + buffered_start = cur_coalesced.start
john> - # Cache the results, but only until they have been fulfilled
john> + # Cache the results, but only until they have been fulfilled, this
john> + # mostly just stores the out-of-order data
That comment *really* helps (it would have helped me a lot when I
first read the code).
john> data_map = {}
john> # turn the list of offsets into a stack
john> offset_stack = iter(offsets)
john> cur_offset_and_size = offset_stack.next()
john> for data in fp.readv(requests):
john> - cur_data += data
john> - cur_data_len += len(data)
john> -
john> - if cur_data_len < cur_coalesced.length:
john> - continue
john> - assert cur_data_len == cur_coalesced.length, \
john> - "Somehow we read too much: %s != %s" % (cur_data_len,
john> - cur_coalesced.length)
john> - all_data = ''.join(cur_data)
john> - cur_data = []
john> - cur_data_len = 0
john> -
john> + # TODO: jam 20071217 We could buffer into a list, and then handle
john> + # the sub-chunks, rather than buffering into a string which
john> + # needs to be repeatedly memory allocated. However, it makes
john> + # the processing code a lot more complex (you have to build
john> + # another list with possibly parts of these buffered
john> + # sections, etc)
Your complexity daemon warns you too, I see :)
john> + buffered_data += data
john> + buffered_end = buffered_start + len(buffered_data)
john> +
john> + # Check to see if this is enough data to satisfy some of the
john> + # coalesced subranges
john> +
john> + new_ranges = []
john> for suboffset, subsize in cur_coalesced.ranges:
john> - key = (cur_coalesced.start+suboffset, subsize)
john> - data_map[key] = all_data[suboffset:suboffset+subsize]
john> + start = cur_coalesced.start + suboffset
john> + end = start + subsize
john> + if buffered_start > start:
john> + raise AssertionError('We messed up. The data we are'
john> + ' reading is starting after the'
john> + ' point of the sub offset')
I would rather see that exercised in a test. As it is, it makes
the code flow harder to follow.
john> + if (buffered_start < start
john> + or buffered_end < end):
john> + # Either we haven't started reading, or we haven't finished
john> + new_ranges.append((suboffset, subsize))
john> + continue
john> +
I think *this* part is the hardest to understand, see below for
an alternative design.
john> + # We have read enough data, collect it into a single string
john> + buf_start = start - buffered_start
john> + buf_end = buf_start + subsize
john> + data = buffered_data[buf_start:buf_end]
john> +
john> + # Because this data won't be repeated, shrink the buffer
john> + buffered_data = buffered_data[buf_end:]
john> + buffered_start += buf_end
john> +
You silently lose the ability to handle duplicate offsets.
john> + # Is this exactly the next requested data chunk?
john> + key = (start, subsize)
john> + if key == cur_offset_and_size:
john> + yield start, data
john> + cur_offset_and_size = offset_stack.next()
john> + else:
john> + data_map[key] = data
john> + cur_coalesced.ranges = new_ranges
Again, walking several iterators in parallel makes the code hard
to read.
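The in-order delivery logic could at least be isolated from the
buffering. A minimal sketch of the data_map pattern on its own (the
`arriving` iterable stands in for the sftp responses and is an
assumption):

```python
def yield_in_order(offsets, arriving):
    """Data arrives keyed by (start, size) in arbitrary order; yield
    it back in the exact order the caller requested, caching only the
    out-of-order pieces in data_map."""
    offset_stack = iter(offsets)
    cur = next(offset_stack)
    data_map = {}
    for key, data in arriving:
        data_map[key] = data
        # Drain everything that is now ready, in request order.
        while cur in data_map:
            yield cur[0], data_map.pop(cur)
            try:
                cur = next(offset_stack)
            except StopIteration:
                return
```

With something like this factored out, the sftp-specific code would
only have to produce `(key, data)` pairs.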
john> # Now that we've read some data, see if we can yield anything back
john> while cur_offset_and_size in data_map:
john> @@ -363,15 +403,24 @@
john> yield cur_offset_and_size[0], this_data
john> cur_offset_and_size = offset_stack.next()
john> - # We read a coalesced entry, so mark it as done
john> - cur_coalesced = None
john> - # Now that we've read all of the data for this coalesced section
john> - # on to the next
john> - cur_coalesced = cur_coalesced_stack.next()
john> + if buffered_end == cur_coalesced.start + cur_coalesced.length:
john> + # We reached the end of this section, go on to the next
john> + try:
john> + cur_coalesced = cur_coalesced_stack.next()
john> + except StopIteration:
john> + cur_coalesced = None
john> + if buffered_data:
john> + # Why do we have buffered data left
john> + import pdb; pdb.set_trace()
<cough>
john> + buffered_data = ''
john> + buffered_start = cur_coalesced.start
john> + elif buffered_end >= cur_coalesced.start + cur_coalesced.length:
john> + raise AssertionError('Somehow we read too much data: %s > %s'
john> + % (buffered_end, cur_coalesced.start + cur_coalesced.length))
Same here, if that could be documented and exercised in a test,
the resulting code would be clearer.
john> if cur_coalesced is not None:
john> raise errors.ShortReadvError(relpath, cur_coalesced.start,
john> - cur_coalesced.length, len(data))
john> + cur_coalesced.length, len(buffered_data))
john> def put_file(self, relpath, f, mode=None):
john> """
Overall, you address the problem (but Vlad said it crashes later
anyway), but you increase the complexity of an already complex method.
Having addressed the problem in a different way for http, I may not be
impartial, but I think this could be redesigned differently.
Basically, the problem we want to address is:
- read parts of a file,
- by issuing several requests,
- where each request can cover more or less data than a requested
  file part,
- concatenating requests when needed,
- yielding the requested offsets in order.
While:
- caching out-of-order parts,
- minimizing latency without overflowing the lower-level buffers.
I think we can do that by:
- coalescing offsets (as done now, with rightly tuned parameters),
- preparing the sftp requests,
- issuing a reasonable number of requests,
- starting to process the received requests,
- issuing more requests when we fall below a threshold.
By introducing an intermediate iterator, the _sftp_readv method may
be simplified, and that iterator can then find some balance between
received requests, which drain the lower-level buffers, and pending
requests that still need to be sent.
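A rough sketch of such an intermediate iterator (the name, the
`read_fn` callable, and the request format are assumptions, not
paramiko's actual API):

```python
def buffered_requests(read_fn, requests, max_in_flight=1000):
    """Issue at most max_in_flight requests per batch, yielding the
    resulting data as it is consumed, so the lower-level buffers
    drain before the next batch of requests goes out."""
    for i in range(0, len(requests), max_in_flight):
        for data in read_fn(requests[i:i + max_in_flight]):
            yield data
```

The readv method would then just consume this iterator, and all
flow-control tuning would live in one place.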
In another mail you said:
john> for start in xrange(0, len(ranges), _max_ranges):
john> for data in fp.readv(ranges[start:start+_max_ranges]):
john> ...
john> As you noticed, we were sending 15k async requests,
john> which seems a bit abusive. This would limit it to
john> 1000, at a time, which gives us 32KB*1000 = 32MB in
john> flight. Which sounds like enough to fill buffers. There
john> will still be 1/1000*num_ranges more round trips than
john> before, but it seems like a reasonable way to cap the
john> total number of requests on the wire.
The design above amounts to doing what you propose, except written as:
for data in self._buffered_requests(ranges, self._max_ranges)
where 'ranges' is really 'requests', and we may want to replace
'_max_ranges' with '_max_size', since, after all, that's what we
care about.
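Batching by bytes in flight rather than by request count could look
like this (a sketch; the request format and helper name are
assumptions, and the 32KB-per-request figure comes from the
discussion above):

```python
def batch_by_size(requests, max_size=32 * 1024 * 1000):
    """Group (offset, length) requests into batches whose total byte
    count stays at or below max_size -- capping the data in flight
    instead of the number of requests."""
    batch, batch_bytes = [], 0
    for offset, length in requests:
        if batch and batch_bytes + length > max_size:
            yield batch
            batch, batch_bytes = [], 0
        batch.append((offset, length))
        batch_bytes += length
    if batch:
        yield batch
```

With 32KB sub-chunks this defaults to the same ~32MB in flight as
1000 requests, but it stays correct if the chunk size ever changes.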
In summary, on three main criteria:
- Doesn't reduce test coverage
- Doesn't reduce design clarity
- Improves bugs, features, speed, or code simplicity.
Err, I think you succeed only on part of the third: improve bugs
;-)
Hope that helps,
Vincent
P.S.: Since this is my first real review, don't hesitate to comment
on the review itself.