Rev 3650: Change _StatefulDecoder._in_bytes into a _in_bytes_list in http://bzr.arbash-meinel.com/branches/bzr/1.7-dev/hpss_readv

John Arbash Meinel john at arbash-meinel.com
Thu Aug 28 21:47:57 BST 2008


At http://bzr.arbash-meinel.com/branches/bzr/1.7-dev/hpss_readv

------------------------------------------------------------
revno: 3650
revision-id: john at arbash-meinel.com-20080828204756-iii2npp6ys48xzbo
parent: pqm at pqm.ubuntu.com-20080825182248-dydy5bn4n5akbfjq
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: hpss_readv
timestamp: Thu 2008-08-28 15:47:56 -0500
message:
  Change _StatefulDecoder._in_bytes into a _in_bytes_list
-------------- next part --------------
=== modified file 'bzrlib/smart/protocol.py'
--- a/bzrlib/smart/protocol.py	2008-07-21 04:24:21 +0000
+++ b/bzrlib/smart/protocol.py	2008-08-28 20:47:56 +0000
@@ -323,11 +323,29 @@
 
     def __init__(self):
         self.finished_reading = False
-        self._in_buffer = ''
+        # self._in_buffer = None
+        self._in_buffer_list = []
+        self._in_buffer_len = 0
         self.unused_data = ''
         self.bytes_left = None
         self._number_needed_bytes = None
 
+    def _get_in_buffer(self):
+        if len(self._in_buffer_list) == 1:
+            return self._in_buffer_list[0]
+        in_buffer = ''.join(self._in_buffer_list)
+        assert len(in_buffer) == self._in_buffer_len
+        self._in_buffer_list = [in_buffer]
+        return in_buffer
+
+    def _set_in_buffer(self, new_buf):
+        if new_buf is not None:
+            self._in_buffer_list = [new_buf]
+            self._in_buffer_len = len(new_buf)
+        else:
+            self._in_buffer_list = []
+            self._in_buffer_len = 0
+
     def accept_bytes(self, bytes):
         """Decode as much of bytes as possible.
 
@@ -338,11 +356,14 @@
         data will be appended to self.unused_data.
         """
         # accept_bytes is allowed to change the state
-        current_state = self.state_accept
         self._number_needed_bytes = None
-        self._in_buffer += bytes
+        # lsprof puts a very large amount of time on this specific call for
+        # large readv arrays
+        self._in_buffer_list.append(bytes)
+        self._in_buffer_len += len(bytes)
         try:
             # Run the function for the current state.
+            current_state = self.state_accept
             self.state_accept()
             while current_state != self.state_accept:
                 # The current state has changed.  Run the function for the new
@@ -379,7 +400,7 @@
             # the rest of this chunk plus an END chunk.
             return self.bytes_left + 4
         elif self.state_accept == self._state_accept_expecting_length:
-            if self._in_buffer == '':
+            if self._in_buffer_len == 0:
                 # We're expecting a chunk length.  There's at least two bytes
                 # left: a digit plus '\n'.
                 return 2
@@ -390,7 +411,7 @@
         elif self.state_accept == self._state_accept_reading_unused:
             return 1
         elif self.state_accept == self._state_accept_expecting_header:
-            return max(0, len('chunked\n') - len(self._in_buffer))
+            return max(0, len('chunked\n') - self._in_buffer_len)
         else:
             raise AssertionError("Impossible state: %r" % (self.state_accept,))
 
@@ -401,19 +422,22 @@
             return None
 
     def _extract_line(self):
-        pos = self._in_buffer.find('\n')
+        in_buf = self._get_in_buffer()
+        pos = in_buf.find('\n')
         if pos == -1:
             # We haven't read a complete line yet, so request more bytes before
             # we continue.
             raise _NeedMoreBytes(1)
-        line = self._in_buffer[:pos]
+        line = in_buf[:pos]
         # Trim the prefix (including '\n' delimiter) from the _in_buffer.
-        self._in_buffer = self._in_buffer[pos+1:]
+        self._set_in_buffer(in_buf[pos+1:])
         return line
 
     def _finished(self):
-        self.unused_data = self._in_buffer
-        self._in_buffer = ''
+        self.unused_data = self._get_in_buffer()
+        # self._in_buffer = None
+        self._in_buffer_list = []
+        self._in_buffer_len = 0
         self.state_accept = self._state_accept_reading_unused
         if self.error:
             error_args = tuple(self.error_in_progress)
@@ -448,9 +472,10 @@
             self.state_accept = self._state_accept_reading_chunk
 
     def _state_accept_reading_chunk(self):
-        in_buffer_len = len(self._in_buffer)
-        self.chunk_in_progress += self._in_buffer[:self.bytes_left]
-        self._in_buffer = self._in_buffer[self.bytes_left:]
+        in_buf = self._get_in_buffer()
+        in_buffer_len = len(in_buf)
+        self.chunk_in_progress += in_buf[:self.bytes_left]
+        self._set_in_buffer(in_buf[self.bytes_left:])
         self.bytes_left -= in_buffer_len
         if self.bytes_left <= 0:
             # Finished with chunk
@@ -463,8 +488,8 @@
             self.state_accept = self._state_accept_expecting_length
         
     def _state_accept_reading_unused(self):
-        self.unused_data += self._in_buffer
-        self._in_buffer = ''
+        self.unused_data += self._get_in_buffer()
+        self._set_in_buffer(None)
 
 
 class LengthPrefixedBodyDecoder(_StatefulDecoder):
@@ -498,18 +523,20 @@
         return self.state_read()
 
     def _state_accept_expecting_length(self):
-        pos = self._in_buffer.find('\n')
+        in_buf = self._get_in_buffer()
+        pos = in_buf.find('\n')
         if pos == -1:
             return
-        self.bytes_left = int(self._in_buffer[:pos])
-        self._in_buffer = self._in_buffer[pos+1:]
+        self.bytes_left = int(in_buf[:pos])
+        self._set_in_buffer(in_buf[pos+1:])
         self.state_accept = self._state_accept_reading_body
         self.state_read = self._state_read_body_buffer
 
     def _state_accept_reading_body(self):
-        self._body += self._in_buffer
-        self.bytes_left -= len(self._in_buffer)
-        self._in_buffer = ''
+        in_buf = self._get_in_buffer()
+        self._body += in_buf
+        self.bytes_left -= len(in_buf)
+        self._set_in_buffer(None)
         if self.bytes_left <= 0:
             # Finished with body
             if self.bytes_left != 0:
@@ -519,8 +546,8 @@
             self.state_accept = self._state_accept_reading_trailer
         
     def _state_accept_reading_trailer(self):
-        self._trailer_buffer += self._in_buffer
-        self._in_buffer = ''
+        self._trailer_buffer += self._get_in_buffer()
+        self._set_in_buffer(None)
         # TODO: what if the trailer does not match "done\n"?  Should this raise
         # a ProtocolViolation exception?
         if self._trailer_buffer.startswith('done\n'):
@@ -529,8 +556,8 @@
             self.finished_reading = True
     
     def _state_accept_reading_unused(self):
-        self.unused_data += self._in_buffer
-        self._in_buffer = ''
+        self.unused_data += self._get_in_buffer()
+        self._set_in_buffer(None)
 
     def _state_read_no_data(self):
         return ''
@@ -865,19 +892,20 @@
             self.message_handler.protocol_error(exception)
 
     def _extract_length_prefixed_bytes(self):
-        if len(self._in_buffer) < 4:
+        if self._in_buffer_len < 4:
             # A length prefix by itself is 4 bytes, and we don't even have that
             # many yet.
             raise _NeedMoreBytes(4)
-        (length,) = struct.unpack('!L', self._in_buffer[:4])
+        in_buf = self._get_in_buffer()
+        (length,) = struct.unpack('!L', in_buf[:4])
         end_of_bytes = 4 + length
-        if len(self._in_buffer) < end_of_bytes:
+        if self._in_buffer_len < end_of_bytes:
             # We haven't yet read as many bytes as the length-prefix says there
             # are.
             raise _NeedMoreBytes(end_of_bytes)
         # Extract the bytes from the buffer.
-        bytes = self._in_buffer[4:end_of_bytes]
-        self._in_buffer = self._in_buffer[end_of_bytes:]
+        bytes = in_buf[4:end_of_bytes]
+        self._set_in_buffer(in_buf[end_of_bytes:])
         return bytes
 
     def _extract_prefixed_bencoded_data(self):
@@ -890,15 +918,17 @@
         return decoded
 
     def _extract_single_byte(self):
-        if self._in_buffer == '':
+        if self._in_buffer_len == 0:
             # The buffer is empty
             raise _NeedMoreBytes(1)
-        one_byte = self._in_buffer[0]
-        self._in_buffer = self._in_buffer[1:]
+        in_buf = self._get_in_buffer()
+        one_byte = in_buf[0]
+        self._set_in_buffer(in_buf[1:])
         return one_byte
 
     def _state_accept_expecting_protocol_version(self):
-        needed_bytes = len(MESSAGE_VERSION_THREE) - len(self._in_buffer)
+        needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
+        in_buf = self._get_in_buffer()
         if needed_bytes > 0:
             # We don't have enough bytes to check if the protocol version
             # marker is right.  But we can check if it is already wrong by
@@ -908,13 +938,13 @@
             # len(MESSAGE_VERSION_THREE) bytes.  So if the bytes we have so far
             # are wrong then we should just raise immediately rather than
             # stall.]
-            if not MESSAGE_VERSION_THREE.startswith(self._in_buffer):
+            if not MESSAGE_VERSION_THREE.startswith(in_buf):
                 # We have enough bytes to know the protocol version is wrong
-                raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
+                raise errors.UnexpectedProtocolVersionMarker(in_buf)
             raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
-        if not self._in_buffer.startswith(MESSAGE_VERSION_THREE):
-            raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
-        self._in_buffer = self._in_buffer[len(MESSAGE_VERSION_THREE):]
+        if not in_buf.startswith(MESSAGE_VERSION_THREE):
+            raise errors.UnexpectedProtocolVersionMarker(in_buf)
+        self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
         self.state_accept = self._state_accept_expecting_headers
 
     def _state_accept_expecting_headers(self):
@@ -969,8 +999,8 @@
             raise errors.SmartMessageHandlerError(sys.exc_info())
 
     def done(self):
-        self.unused_data = self._in_buffer
-        self._in_buffer = ''
+        self.unused_data = self._get_in_buffer()
+        self._set_in_buffer(None)
         self.state_accept = self._state_accept_reading_unused
         try:
             self.message_handler.end_received()
@@ -978,8 +1008,8 @@
             raise errors.SmartMessageHandlerError(sys.exc_info())
 
     def _state_accept_reading_unused(self):
-        self.unused_data += self._in_buffer
-        self._in_buffer = ''
+        self.unused_data += self._get_in_buffer()
+        self._set_in_buffer(None)
 
     def next_read_size(self):
         if self.state_accept == self._state_accept_reading_unused:
@@ -992,7 +1022,7 @@
             return 0
         else:
             if self._number_needed_bytes is not None:
-                return self._number_needed_bytes - len(self._in_buffer)
+                return self._number_needed_bytes - self._in_buffer_len
             else:
                 raise AssertionError("don't know how many bytes are expected!")
 

=== modified file 'bzrlib/transport/remote.py'
--- a/bzrlib/transport/remote.py	2008-07-25 03:12:11 +0000
+++ b/bzrlib/transport/remote.py	2008-08-28 20:47:56 +0000
@@ -43,7 +43,7 @@
         self.st_mode = mode
 
 
-class RemoteTransport(transport.ConnectedTransport):
+class RemoteTransport(transport.ConnectedTransport, medium.SmartClientMedium):
     """Connection to a smart server.
 
     The connection holds references to the medium that can be used to send
@@ -312,14 +312,13 @@
         offsets = list(offsets)
 
         sorted_offsets = sorted(offsets)
-        # turn the list of offsets into a stack
-        offset_stack = iter(offsets)
-        cur_offset_and_size = offset_stack.next()
         coalesced = list(self._coalesce_offsets(sorted_offsets,
                                limit=self._max_readv_combine,
                                fudge_factor=self._bytes_to_read_before_seek))
 
         try:
+            # if relpath.endswith('.pack'):
+            #     import pdb; pdb.set_trace()
             result = self._client.call_with_body_readv_array(
                 ('readv', self._remote_path(relpath),),
                 [(c.start, c.length) for c in coalesced])
@@ -332,6 +331,12 @@
             response_handler.cancel_read_body()
             raise errors.UnexpectedSmartServerResponse(resp)
 
+        return self._handle_response(offsets, coalesced, response_handler)
+
+    def _handle_response(self, offsets, coalesced, response_handler):
+        # turn the list of offsets into a stack
+        offset_stack = iter(offsets)
+        cur_offset_and_size = offset_stack.next()
         # FIXME: this should know how many bytes are needed, for clarity.
         data = response_handler.read_body_bytes()
         # Cache the results, but only until they have been fulfilled



More information about the bazaar-commits mailing list