Rev 3665: (jam) Change the HPSS readv() code to buffer and process without in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Fri Aug 29 07:27:55 BST 2008


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 3665
revision-id: pqm at pqm.ubuntu.com-20080829062746-ny482m2f2pukdhqt
parent: pqm at pqm.ubuntu.com-20080829021555-m0ewn41ljtx2z0pa
parent: john at arbash-meinel.com-20080829015532-d26jj843xwt7ad8u
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Fri 2008-08-29 07:27:46 +0100
message:
  (jam) Change the HPSS readv() code to buffer and process without
  reallocating.
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/smart/protocol.py       protocol.py-20061108035435-ot0lstk2590yqhzr-1
  bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
  bzrlib/transport/remote.py     ssh.py-20060608202016-c25gvf1ob7ypbus6-1
    ------------------------------------------------------------
    revno: 3649.5.5
    revision-id: john at arbash-meinel.com-20080829015532-d26jj843xwt7ad8u
    parent: john at arbash-meinel.com-20080828213619-0wq19y99l96lja2t
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: hpss_readv
    timestamp: Thu 2008-08-28 20:55:32 -0500
    message:
      Fix the test suite.
      Remove the 'assert' statements and turn them into raise AssertionError
      Fix the test that assumed ProtocolThreeDecoder had an '_in_buffer'
      member.
    modified:
      bzrlib/smart/protocol.py       protocol.py-20061108035435-ot0lstk2590yqhzr-1
      bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
    ------------------------------------------------------------
    revno: 3649.5.4
    revision-id: john at arbash-meinel.com-20080828213619-0wq19y99l96lja2t
    parent: john at arbash-meinel.com-20080828212748-fplqyspastui6wq8
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: hpss_readv
    timestamp: Thu 2008-08-28 16:36:19 -0500
    message:
      NEWS entry.
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
    ------------------------------------------------------------
    revno: 3649.5.3
    revision-id: john at arbash-meinel.com-20080828212748-fplqyspastui6wq8
    parent: john at arbash-meinel.com-20080828210435-h30020sylefc8750
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: hpss_readv
    timestamp: Thu 2008-08-28 16:27:48 -0500
    message:
      Don't downsize the data buffer while reading.
      Instead just use offsets into the data buffer.
      It turns out that the string copying was massively dominating performance.
      With this and the earlier patch, it drops the time for
      bzr branch bzr+ssh://localhost/bzr.dev
      from 1m26s down to 39.3s.
      And 32s is the local transport time.
    modified:
      bzrlib/transport/remote.py     ssh.py-20060608202016-c25gvf1ob7ypbus6-1
    ------------------------------------------------------------
    revno: 3649.5.2
    revision-id: john at arbash-meinel.com-20080828210435-h30020sylefc8750
    parent: john at arbash-meinel.com-20080828204756-iii2npp6ys48xzbo
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: hpss_readv
    timestamp: Thu 2008-08-28 16:04:35 -0500
    message:
      When we are waiting on a big stream, allow
      extract_length_prefixed_bytes to peek at the input stream,
      rather than always packing it into a slightly larger string.
    modified:
      bzrlib/smart/protocol.py       protocol.py-20061108035435-ot0lstk2590yqhzr-1
    ------------------------------------------------------------
    revno: 3649.5.1
    revision-id: john at arbash-meinel.com-20080828204756-iii2npp6ys48xzbo
    parent: pqm at pqm.ubuntu.com-20080825182248-dydy5bn4n5akbfjq
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: hpss_readv
    timestamp: Thu 2008-08-28 15:47:56 -0500
    message:
      Change _StatefulDecoder._in_buffer into an _in_buffer_list
    modified:
      bzrlib/smart/protocol.py       protocol.py-20061108035435-ot0lstk2590yqhzr-1
      bzrlib/transport/remote.py     ssh.py-20060608202016-c25gvf1ob7ypbus6-1
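
The buffering idea described in the log messages above (keep incoming chunks
in a list, track the total length separately, and peek at the front without
repacking) can be sketched roughly as follows. This is a minimal Python 3
illustration, not the bzrlib code: ChunkBuffer, peek, consume and
extract_length_prefixed are hypothetical names chosen for the example.

```python
import struct


class ChunkBuffer:
    """Hypothetical stand-in for the decoder's input buffer (not bzrlib code)."""

    def __init__(self):
        self._chunks = []  # received byte strings, in arrival order
        self._length = 0   # total buffered bytes, tracked without joining

    def append(self, data):
        # Appending to the list does not copy bytes that are already buffered.
        self._chunks.append(data)
        self._length += len(data)

    def __len__(self):
        return self._length

    def _collapse(self):
        # Join all chunks once and keep the result as the only chunk.
        if len(self._chunks) != 1:
            self._chunks = [b"".join(self._chunks)]
        return self._chunks[0]

    def peek(self, count):
        """Return the first `count` bytes without consuming them."""
        if count > self._length:
            raise ValueError("not enough buffered bytes")
        if self._chunks and len(self._chunks[0]) >= count:
            # Fast path: the first chunk alone can answer the request.
            return self._chunks[0][:count]
        return self._collapse()[:count]

    def consume(self, count):
        """Remove and return the first `count` bytes."""
        if count > self._length:
            raise ValueError("not enough buffered bytes")
        data = self._collapse()
        head, tail = data[:count], data[count:]
        self._chunks = [tail] if tail else []
        self._length = len(tail)
        return head


def extract_length_prefixed(buf):
    """Return one length-prefixed payload, or None if it is still incomplete."""
    if len(buf) < 4:
        return None
    (length,) = struct.unpack("!L", buf.peek(4))
    if len(buf) < 4 + length:
        # Still waiting: nothing was joined or copied beyond the 4-byte peek.
        return None
    return buf.consume(4 + length)[4:]
```

While a large payload is still arriving, each call only appends to the list
and peeks at four bytes, so the join happens once when the payload is
complete.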
=== modified file 'NEWS'
--- a/NEWS	2008-08-28 22:04:58 +0000
+++ b/NEWS	2008-08-29 06:27:46 +0000
@@ -54,6 +54,13 @@
     * Merging from a previously joined branch will no longer cause 
       a traceback. (Jelmer Vernooij, #203376)
     
+    * ``RemoteTransport.readv()`` was being inefficient about how it
+      buffered the readv data and processed it. It would keep appending to
+      the same string (causing many copies) and then pop bytes out of the
+      start of the string (causing more copies).
+      With this patch "bzr+ssh://local" speeds improve by up to 3x.
+      (John Arbash Meinel)
+
     * Running ``bzr st PATH_TO_TREE`` will no longer suppress merge
       status. Status is also about 7% faster on mozilla sized trees
       when the path to the root of the tree has been given. Users of

=== modified file 'bzrlib/smart/protocol.py'
--- a/bzrlib/smart/protocol.py	2008-07-21 04:24:21 +0000
+++ b/bzrlib/smart/protocol.py	2008-08-29 01:55:32 +0000
@@ -323,11 +323,50 @@
 
     def __init__(self):
         self.finished_reading = False
-        self._in_buffer = ''
+        self._in_buffer_list = []
+        self._in_buffer_len = 0
         self.unused_data = ''
         self.bytes_left = None
         self._number_needed_bytes = None
 
+    def _get_in_buffer(self):
+        if len(self._in_buffer_list) == 1:
+            return self._in_buffer_list[0]
+        in_buffer = ''.join(self._in_buffer_list)
+        if len(in_buffer) != self._in_buffer_len:
+            raise AssertionError(
+                "Length of buffer did not match expected value: %s != %s"
+                % (self._in_buffer_len, len(in_buffer)))
+        self._in_buffer_list = [in_buffer]
+        return in_buffer
+
+    def _get_in_bytes(self, count):
+        """Grab X bytes from the input_buffer.
+
+        Callers should have already checked that self._in_buffer_len is >=
+        count. Note, this does not consume the bytes from the buffer. The
+        caller will still need to call _get_in_buffer() and then
+        _set_in_buffer() if they actually need to consume the bytes.
+        """
+        # check if we can yield the bytes from just the first entry in our list
+        if len(self._in_buffer_list) == 0:
+            raise AssertionError('Callers must be sure we have buffered bytes'
+                ' before calling _get_in_bytes')
+        if len(self._in_buffer_list[0]) > count:
+            return self._in_buffer_list[0][:count]
+        # We can't yield it from the first buffer, so collapse all buffers, and
+        # yield it from that
+        in_buf = self._get_in_buffer()
+        return in_buf[:count]
+
+    def _set_in_buffer(self, new_buf):
+        if new_buf is not None:
+            self._in_buffer_list = [new_buf]
+            self._in_buffer_len = len(new_buf)
+        else:
+            self._in_buffer_list = []
+            self._in_buffer_len = 0
+
     def accept_bytes(self, bytes):
         """Decode as much of bytes as possible.
 
@@ -338,11 +377,14 @@
         data will be appended to self.unused_data.
         """
         # accept_bytes is allowed to change the state
-        current_state = self.state_accept
         self._number_needed_bytes = None
-        self._in_buffer += bytes
+        # lsprof puts a very large amount of time on this specific call for
+        # large readv arrays
+        self._in_buffer_list.append(bytes)
+        self._in_buffer_len += len(bytes)
         try:
             # Run the function for the current state.
+            current_state = self.state_accept
             self.state_accept()
             while current_state != self.state_accept:
                 # The current state has changed.  Run the function for the new
@@ -379,7 +421,7 @@
             # the rest of this chunk plus an END chunk.
             return self.bytes_left + 4
         elif self.state_accept == self._state_accept_expecting_length:
-            if self._in_buffer == '':
+            if self._in_buffer_len == 0:
                 # We're expecting a chunk length.  There's at least two bytes
                 # left: a digit plus '\n'.
                 return 2
@@ -390,7 +432,7 @@
         elif self.state_accept == self._state_accept_reading_unused:
             return 1
         elif self.state_accept == self._state_accept_expecting_header:
-            return max(0, len('chunked\n') - len(self._in_buffer))
+            return max(0, len('chunked\n') - self._in_buffer_len)
         else:
             raise AssertionError("Impossible state: %r" % (self.state_accept,))
 
@@ -401,19 +443,22 @@
             return None
 
     def _extract_line(self):
-        pos = self._in_buffer.find('\n')
+        in_buf = self._get_in_buffer()
+        pos = in_buf.find('\n')
         if pos == -1:
             # We haven't read a complete line yet, so request more bytes before
             # we continue.
             raise _NeedMoreBytes(1)
-        line = self._in_buffer[:pos]
+        line = in_buf[:pos]
         # Trim the prefix (including '\n' delimiter) from the _in_buffer.
-        self._in_buffer = self._in_buffer[pos+1:]
+        self._set_in_buffer(in_buf[pos+1:])
         return line
 
     def _finished(self):
-        self.unused_data = self._in_buffer
-        self._in_buffer = ''
+        self.unused_data = self._get_in_buffer()
+        # self._in_buffer = None
+        self._in_buffer_list = []
+        self._in_buffer_len = 0
         self.state_accept = self._state_accept_reading_unused
         if self.error:
             error_args = tuple(self.error_in_progress)
@@ -448,9 +493,10 @@
             self.state_accept = self._state_accept_reading_chunk
 
     def _state_accept_reading_chunk(self):
-        in_buffer_len = len(self._in_buffer)
-        self.chunk_in_progress += self._in_buffer[:self.bytes_left]
-        self._in_buffer = self._in_buffer[self.bytes_left:]
+        in_buf = self._get_in_buffer()
+        in_buffer_len = len(in_buf)
+        self.chunk_in_progress += in_buf[:self.bytes_left]
+        self._set_in_buffer(in_buf[self.bytes_left:])
         self.bytes_left -= in_buffer_len
         if self.bytes_left <= 0:
             # Finished with chunk
@@ -463,8 +509,8 @@
             self.state_accept = self._state_accept_expecting_length
         
     def _state_accept_reading_unused(self):
-        self.unused_data += self._in_buffer
-        self._in_buffer = ''
+        self.unused_data += self._get_in_buffer()
+        self._set_in_buffer(None)
 
 
 class LengthPrefixedBodyDecoder(_StatefulDecoder):
@@ -498,18 +544,20 @@
         return self.state_read()
 
     def _state_accept_expecting_length(self):
-        pos = self._in_buffer.find('\n')
+        in_buf = self._get_in_buffer()
+        pos = in_buf.find('\n')
         if pos == -1:
             return
-        self.bytes_left = int(self._in_buffer[:pos])
-        self._in_buffer = self._in_buffer[pos+1:]
+        self.bytes_left = int(in_buf[:pos])
+        self._set_in_buffer(in_buf[pos+1:])
         self.state_accept = self._state_accept_reading_body
         self.state_read = self._state_read_body_buffer
 
     def _state_accept_reading_body(self):
-        self._body += self._in_buffer
-        self.bytes_left -= len(self._in_buffer)
-        self._in_buffer = ''
+        in_buf = self._get_in_buffer()
+        self._body += in_buf
+        self.bytes_left -= len(in_buf)
+        self._set_in_buffer(None)
         if self.bytes_left <= 0:
             # Finished with body
             if self.bytes_left != 0:
@@ -519,8 +567,8 @@
             self.state_accept = self._state_accept_reading_trailer
         
     def _state_accept_reading_trailer(self):
-        self._trailer_buffer += self._in_buffer
-        self._in_buffer = ''
+        self._trailer_buffer += self._get_in_buffer()
+        self._set_in_buffer(None)
         # TODO: what if the trailer does not match "done\n"?  Should this raise
         # a ProtocolViolation exception?
         if self._trailer_buffer.startswith('done\n'):
@@ -529,8 +577,8 @@
             self.finished_reading = True
     
     def _state_accept_reading_unused(self):
-        self.unused_data += self._in_buffer
-        self._in_buffer = ''
+        self.unused_data += self._get_in_buffer()
+        self._set_in_buffer(None)
 
     def _state_read_no_data(self):
         return ''
@@ -865,19 +913,20 @@
             self.message_handler.protocol_error(exception)
 
     def _extract_length_prefixed_bytes(self):
-        if len(self._in_buffer) < 4:
+        if self._in_buffer_len < 4:
             # A length prefix by itself is 4 bytes, and we don't even have that
             # many yet.
             raise _NeedMoreBytes(4)
-        (length,) = struct.unpack('!L', self._in_buffer[:4])
+        (length,) = struct.unpack('!L', self._get_in_bytes(4))
         end_of_bytes = 4 + length
-        if len(self._in_buffer) < end_of_bytes:
+        if self._in_buffer_len < end_of_bytes:
             # We haven't yet read as many bytes as the length-prefix says there
             # are.
             raise _NeedMoreBytes(end_of_bytes)
         # Extract the bytes from the buffer.
-        bytes = self._in_buffer[4:end_of_bytes]
-        self._in_buffer = self._in_buffer[end_of_bytes:]
+        in_buf = self._get_in_buffer()
+        bytes = in_buf[4:end_of_bytes]
+        self._set_in_buffer(in_buf[end_of_bytes:])
         return bytes
 
     def _extract_prefixed_bencoded_data(self):
@@ -890,15 +939,17 @@
         return decoded
 
     def _extract_single_byte(self):
-        if self._in_buffer == '':
+        if self._in_buffer_len == 0:
             # The buffer is empty
             raise _NeedMoreBytes(1)
-        one_byte = self._in_buffer[0]
-        self._in_buffer = self._in_buffer[1:]
+        in_buf = self._get_in_buffer()
+        one_byte = in_buf[0]
+        self._set_in_buffer(in_buf[1:])
         return one_byte
 
     def _state_accept_expecting_protocol_version(self):
-        needed_bytes = len(MESSAGE_VERSION_THREE) - len(self._in_buffer)
+        needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
+        in_buf = self._get_in_buffer()
         if needed_bytes > 0:
             # We don't have enough bytes to check if the protocol version
             # marker is right.  But we can check if it is already wrong by
@@ -908,13 +959,13 @@
             # len(MESSAGE_VERSION_THREE) bytes.  So if the bytes we have so far
             # are wrong then we should just raise immediately rather than
             # stall.]
-            if not MESSAGE_VERSION_THREE.startswith(self._in_buffer):
+            if not MESSAGE_VERSION_THREE.startswith(in_buf):
                 # We have enough bytes to know the protocol version is wrong
-                raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
+                raise errors.UnexpectedProtocolVersionMarker(in_buf)
             raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
-        if not self._in_buffer.startswith(MESSAGE_VERSION_THREE):
-            raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
-        self._in_buffer = self._in_buffer[len(MESSAGE_VERSION_THREE):]
+        if not in_buf.startswith(MESSAGE_VERSION_THREE):
+            raise errors.UnexpectedProtocolVersionMarker(in_buf)
+        self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
         self.state_accept = self._state_accept_expecting_headers
 
     def _state_accept_expecting_headers(self):
@@ -969,8 +1020,8 @@
             raise errors.SmartMessageHandlerError(sys.exc_info())
 
     def done(self):
-        self.unused_data = self._in_buffer
-        self._in_buffer = ''
+        self.unused_data = self._get_in_buffer()
+        self._set_in_buffer(None)
         self.state_accept = self._state_accept_reading_unused
         try:
             self.message_handler.end_received()
@@ -978,8 +1029,8 @@
             raise errors.SmartMessageHandlerError(sys.exc_info())
 
     def _state_accept_reading_unused(self):
-        self.unused_data += self._in_buffer
-        self._in_buffer = ''
+        self.unused_data += self._get_in_buffer()
+        self._set_in_buffer(None)
 
     def next_read_size(self):
         if self.state_accept == self._state_accept_reading_unused:
@@ -992,7 +1043,7 @@
             return 0
         else:
             if self._number_needed_bytes is not None:
-                return self._number_needed_bytes - len(self._in_buffer)
+                return self._number_needed_bytes - self._in_buffer_len
             else:
                 raise AssertionError("don't know how many bytes are expected!")
 

=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py	2008-06-17 04:32:18 +0000
+++ b/bzrlib/tests/test_smart_transport.py	2008-08-29 01:55:32 +0000
@@ -2267,7 +2267,8 @@
     def test_construct_version_three_server_protocol(self):
         smart_protocol = protocol.ProtocolThreeDecoder(None)
         self.assertEqual('', smart_protocol.unused_data)
-        self.assertEqual('', smart_protocol._in_buffer)
+        self.assertEqual([], smart_protocol._in_buffer_list)
+        self.assertEqual(0, smart_protocol._in_buffer_len)
         self.assertFalse(smart_protocol._has_dispatched)
         # The protocol starts by expecting four bytes, a length prefix for the
         # headers.

=== modified file 'bzrlib/transport/remote.py'
--- a/bzrlib/transport/remote.py	2008-07-25 03:12:11 +0000
+++ b/bzrlib/transport/remote.py	2008-08-28 21:27:48 +0000
@@ -43,7 +43,7 @@
         self.st_mode = mode
 
 
-class RemoteTransport(transport.ConnectedTransport):
+class RemoteTransport(transport.ConnectedTransport, medium.SmartClientMedium):
     """Connection to a smart server.
 
     The connection holds references to the medium that can be used to send
@@ -312,14 +312,13 @@
         offsets = list(offsets)
 
         sorted_offsets = sorted(offsets)
-        # turn the list of offsets into a stack
-        offset_stack = iter(offsets)
-        cur_offset_and_size = offset_stack.next()
         coalesced = list(self._coalesce_offsets(sorted_offsets,
                                limit=self._max_readv_combine,
                                fudge_factor=self._bytes_to_read_before_seek))
 
         try:
+            # if relpath.endswith('.pack'):
+            #     import pdb; pdb.set_trace()
             result = self._client.call_with_body_readv_array(
                 ('readv', self._remote_path(relpath),),
                 [(c.start, c.length) for c in coalesced])
@@ -332,18 +331,38 @@
             response_handler.cancel_read_body()
             raise errors.UnexpectedSmartServerResponse(resp)
 
+        return self._handle_response(relpath, offsets, coalesced, response_handler)
+
+    def _handle_response(self, relpath, offsets, coalesced, response_handler):
+        # turn the list of offsets into a stack
+        offset_stack = iter(offsets)
+        cur_offset_and_size = offset_stack.next()
         # FIXME: this should know how many bytes are needed, for clarity.
         data = response_handler.read_body_bytes()
         # Cache the results, but only until they have been fulfilled
         data_map = {}
+        data_offset = 0
         for c_offset in coalesced:
             if len(data) < c_offset.length:
                 raise errors.ShortReadvError(relpath, c_offset.start,
                             c_offset.length, actual=len(data))
             for suboffset, subsize in c_offset.ranges:
                 key = (c_offset.start+suboffset, subsize)
-                data_map[key] = data[suboffset:suboffset+subsize]
-            data = data[c_offset.length:]
+                this_data = data[data_offset+suboffset:
+                                 data_offset+suboffset+subsize]
+                # Special case when the data is in-order, rather than packing
+                # into a map and then back out again. Benchmarking shows that
+                # this has 100% hit rate, but leave in the data_map work just
+                # in case.
+                # TODO: Could we get away with using buffer() to avoid the
+                #       memory copy?  Callers would need to realize they may
+                #       not have a real string.
+                if key == cur_offset_and_size:
+                    yield cur_offset_and_size[0], this_data
+                    cur_offset_and_size = offset_stack.next()
+                else:
+                    data_map[key] = this_data
+            data_offset += c_offset.length
 
             # Now that we've read some data, see if we can yield anything back
             while cur_offset_and_size in data_map:



