Rev 3620: Cherrypick the hpss_readv fix back onto bzr.1.6 in http://bzr.arbash-meinel.com/branches/bzr/1.6-dev/hpss_readv
John Arbash Meinel
john at arbash-meinel.com
Fri Aug 29 06:25:58 BST 2008
At http://bzr.arbash-meinel.com/branches/bzr/1.6-dev/hpss_readv
------------------------------------------------------------
revno: 3620
revision-id: john at arbash-meinel.com-20080829052558-2j75lgu4g571z13f
parent: pqm at pqm.ubuntu.com-20080825143827-fl7cocq59pqdig2p
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: hpss_readv
timestamp: Fri 2008-08-29 00:25:58 -0500
message:
Cherrypick the hpss_readv fix back onto bzr.1.6
=== modified file 'NEWS'
--- a/NEWS 2008-08-25 14:06:14 +0000
+++ b/NEWS 2008-08-29 05:25:58 +0000
@@ -4,6 +4,19 @@
.. contents::
+bzr 1.6rc1 IN DEVELOPMENT
+-------------------------
+
+ BUG FIXES:
+
+ * ``RemoteTransport.readv()`` buffered and processed the readv data
+ inefficiently: it kept appending to a single string (causing many
+ copies) and then popped bytes off the front of that string (causing
+ more copies). With this patch, "bzr+ssh://local" operations are up
+ to 3x faster.
+ (John Arbash Meinel)
+
+
bzr 1.6 2008-08-25
------------------
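
For illustration of the claim above (the helpers here are hypothetical, not bzrlib code): repeatedly appending to one string and then slicing bytes off its front copies the whole buffer on every call, which is roughly quadratic in the total number of bytes received, while keeping a list of chunks and joining only when needed is not. A minimal sketch of the two strategies:

    # Old style: one growing string, trimmed from the front.
    def consume_with_string(chunks, n):
        buf = ''
        blocks = []
        for chunk in chunks:
            buf += chunk               # copies the whole buffer every time
            while len(buf) >= n:
                blocks.append(buf[:n])
                buf = buf[n:]          # copies the remainder every time
        return blocks

    # New style: a list of chunks plus a running length; join only when a
    # full block is actually available.
    def consume_with_list(chunks, n):
        parts, length, blocks = [], 0, []
        for chunk in chunks:
            parts.append(chunk)        # O(1), no copying yet
            length += len(chunk)
            if length >= n:
                buf = ''.join(parts)   # one copy, only when needed
                while len(buf) >= n:
                    blocks.append(buf[:n])
                    buf = buf[n:]
                parts, length = [buf], len(buf)
        return blocks

Both functions return the same blocks; the difference is only how much copying happens along the way, which is what the decoder changes below address.
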
=== modified file 'bzrlib/smart/protocol.py'
--- a/bzrlib/smart/protocol.py 2008-07-21 04:24:21 +0000
+++ b/bzrlib/smart/protocol.py 2008-08-29 05:25:58 +0000
@@ -323,11 +323,50 @@
def __init__(self):
self.finished_reading = False
- self._in_buffer = ''
+ self._in_buffer_list = []
+ self._in_buffer_len = 0
self.unused_data = ''
self.bytes_left = None
self._number_needed_bytes = None
+ def _get_in_buffer(self):
+ if len(self._in_buffer_list) == 1:
+ return self._in_buffer_list[0]
+ in_buffer = ''.join(self._in_buffer_list)
+ if len(in_buffer) != self._in_buffer_len:
+ raise AssertionError(
+ "Length of buffer did not match expected value: %s != %s"
+ % (self._in_buffer_len, len(in_buffer)))
+ self._in_buffer_list = [in_buffer]
+ return in_buffer
+
+ def _get_in_bytes(self, count):
+ """Grab X bytes from the input_buffer.
+
+ Callers should have already checked that self._in_buffer_len is >=
+ count. Note, this does not consume the bytes from the buffer. The
+ caller will still need to call _get_in_buffer() and then
+ _set_in_buffer() if they actually need to consume the bytes.
+ """
+ # check if we can yield the bytes from just the first entry in our list
+ if len(self._in_buffer_list) == 0:
+ raise AssertionError('Callers must be sure we have buffered bytes'
+ ' before calling _get_in_bytes')
+ if len(self._in_buffer_list[0]) > count:
+ return self._in_buffer_list[0][:count]
+ # We can't yield it from the first buffer, so collapse all buffers, and
+ # yield it from that
+ in_buf = self._get_in_buffer()
+ return in_buf[:count]
+
+ def _set_in_buffer(self, new_buf):
+ if new_buf is not None:
+ self._in_buffer_list = [new_buf]
+ self._in_buffer_len = len(new_buf)
+ else:
+ self._in_buffer_list = []
+ self._in_buffer_len = 0
+
def accept_bytes(self, bytes):
"""Decode as much of bytes as possible.
@@ -338,11 +377,14 @@
data will be appended to self.unused_data.
"""
# accept_bytes is allowed to change the state
- current_state = self.state_accept
self._number_needed_bytes = None
- self._in_buffer += bytes
+ # lsprof showed a very large amount of time spent appending to the
+ # old string buffer for large readv arrays; collect chunks in a list.
+ self._in_buffer_list.append(bytes)
+ self._in_buffer_len += len(bytes)
try:
# Run the function for the current state.
+ current_state = self.state_accept
self.state_accept()
while current_state != self.state_accept:
# The current state has changed. Run the function for the new
@@ -379,7 +421,7 @@
# the rest of this chunk plus an END chunk.
return self.bytes_left + 4
elif self.state_accept == self._state_accept_expecting_length:
- if self._in_buffer == '':
+ if self._in_buffer_len == 0:
# We're expecting a chunk length. There's at least two bytes
# left: a digit plus '\n'.
return 2
@@ -390,7 +432,7 @@
elif self.state_accept == self._state_accept_reading_unused:
return 1
elif self.state_accept == self._state_accept_expecting_header:
- return max(0, len('chunked\n') - len(self._in_buffer))
+ return max(0, len('chunked\n') - self._in_buffer_len)
else:
raise AssertionError("Impossible state: %r" % (self.state_accept,))
@@ -401,19 +443,22 @@
return None
def _extract_line(self):
- pos = self._in_buffer.find('\n')
+ in_buf = self._get_in_buffer()
+ pos = in_buf.find('\n')
if pos == -1:
# We haven't read a complete line yet, so request more bytes before
# we continue.
raise _NeedMoreBytes(1)
- line = self._in_buffer[:pos]
+ line = in_buf[:pos]
# Trim the prefix (including '\n' delimiter) from the _in_buffer.
- self._in_buffer = self._in_buffer[pos+1:]
+ self._set_in_buffer(in_buf[pos+1:])
return line
def _finished(self):
- self.unused_data = self._in_buffer
- self._in_buffer = ''
+ self.unused_data = self._get_in_buffer()
+ self._in_buffer_list = []
+ self._in_buffer_len = 0
self.state_accept = self._state_accept_reading_unused
if self.error:
error_args = tuple(self.error_in_progress)
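
_extract_line() also shows the decoder's "ask for more input via an exception" idiom: when a complete line is not buffered yet it raises _NeedMoreBytes, and the caller retries once more bytes have arrived. A standalone sketch of the same idea (hypothetical names, not the bzrlib classes):

    class NeedMoreBytes(Exception):
        # Stand-in for bzrlib's _NeedMoreBytes, for illustration.
        def __init__(self, count=None):
            Exception.__init__(self)
            self.count = count

    def extract_line(buf):
        # Return (line, remainder); complain if no complete line is buffered.
        pos = buf.find('\n')
        if pos == -1:
            raise NeedMoreBytes(1)
        return buf[:pos], buf[pos + 1:]
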
@@ -448,9 +493,10 @@
self.state_accept = self._state_accept_reading_chunk
def _state_accept_reading_chunk(self):
- in_buffer_len = len(self._in_buffer)
- self.chunk_in_progress += self._in_buffer[:self.bytes_left]
- self._in_buffer = self._in_buffer[self.bytes_left:]
+ in_buf = self._get_in_buffer()
+ in_buffer_len = len(in_buf)
+ self.chunk_in_progress += in_buf[:self.bytes_left]
+ self._set_in_buffer(in_buf[self.bytes_left:])
self.bytes_left -= in_buffer_len
if self.bytes_left <= 0:
# Finished with chunk
@@ -463,8 +509,8 @@
self.state_accept = self._state_accept_expecting_length
def _state_accept_reading_unused(self):
- self.unused_data += self._in_buffer
- self._in_buffer = ''
+ self.unused_data += self._get_in_buffer()
+ self._in_buffer_list = []
class LengthPrefixedBodyDecoder(_StatefulDecoder):
@@ -498,18 +544,20 @@
return self.state_read()
def _state_accept_expecting_length(self):
- pos = self._in_buffer.find('\n')
+ in_buf = self._get_in_buffer()
+ pos = in_buf.find('\n')
if pos == -1:
return
- self.bytes_left = int(self._in_buffer[:pos])
- self._in_buffer = self._in_buffer[pos+1:]
+ self.bytes_left = int(in_buf[:pos])
+ self._set_in_buffer(in_buf[pos+1:])
self.state_accept = self._state_accept_reading_body
self.state_read = self._state_read_body_buffer
def _state_accept_reading_body(self):
- self._body += self._in_buffer
- self.bytes_left -= len(self._in_buffer)
- self._in_buffer = ''
+ in_buf = self._get_in_buffer()
+ self._body += in_buf
+ self.bytes_left -= len(in_buf)
+ self._set_in_buffer(None)
if self.bytes_left <= 0:
# Finished with body
if self.bytes_left != 0:
@@ -519,8 +567,8 @@
self.state_accept = self._state_accept_reading_trailer
def _state_accept_reading_trailer(self):
- self._trailer_buffer += self._in_buffer
- self._in_buffer = ''
+ self._trailer_buffer += self._get_in_buffer()
+ self._set_in_buffer(None)
# TODO: what if the trailer does not match "done\n"? Should this raise
# a ProtocolViolation exception?
if self._trailer_buffer.startswith('done\n'):
@@ -529,8 +577,8 @@
self.finished_reading = True
def _state_accept_reading_unused(self):
- self.unused_data += self._in_buffer
- self._in_buffer = ''
+ self.unused_data += self._get_in_buffer()
+ self._set_in_buffer(None)
def _state_read_no_data(self):
return ''
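
As the states above show, the length-prefixed body format handled by LengthPrefixedBodyDecoder is a decimal byte count, a newline, that many body bytes, and then a 'done' trailer followed by a newline. A one-shot, non-incremental parse of the same format, for reference only (not a bzrlib API):

    def parse_length_prefixed_body(data):
        # Returns (body, unused) for: <decimal length> '\n' <body> 'done\n'
        pos = data.index('\n')
        length = int(data[:pos])
        body_start = pos + 1
        body = data[body_start:body_start + length]
        trailer_end = body_start + length + len('done\n')
        if data[body_start + length:trailer_end] != 'done\n':
            raise ValueError('expected done trailer')
        return body, data[trailer_end:]
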
@@ -865,19 +913,20 @@
self.message_handler.protocol_error(exception)
def _extract_length_prefixed_bytes(self):
- if len(self._in_buffer) < 4:
+ if self._in_buffer_len < 4:
# A length prefix by itself is 4 bytes, and we don't even have that
# many yet.
raise _NeedMoreBytes(4)
- (length,) = struct.unpack('!L', self._in_buffer[:4])
+ (length,) = struct.unpack('!L', self._get_in_bytes(4))
end_of_bytes = 4 + length
- if len(self._in_buffer) < end_of_bytes:
+ if self._in_buffer_len < end_of_bytes:
# We haven't yet read as many bytes as the length-prefix says there
# are.
raise _NeedMoreBytes(end_of_bytes)
# Extract the bytes from the buffer.
- bytes = self._in_buffer[4:end_of_bytes]
- self._in_buffer = self._in_buffer[end_of_bytes:]
+ in_buf = self._get_in_buffer()
+ bytes = in_buf[4:end_of_bytes]
+ self._set_in_buffer(in_buf[end_of_bytes:])
return bytes
def _extract_prefixed_bencoded_data(self):
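
In the version-three protocol the prefix is a 4-byte big-endian unsigned length ('!L' in struct notation) followed by that many bytes, which is what _extract_length_prefixed_bytes peels off above. A standalone equivalent, for illustration only:

    import struct

    def extract_length_prefixed(buf):
        # Return (payload, remainder), or None if buf is still too short.
        if len(buf) < 4:
            return None                        # need the 4-byte prefix first
        (length,) = struct.unpack('!L', buf[:4])
        end = 4 + length
        if len(buf) < end:
            return None                        # need the rest of the payload
        return buf[4:end], buf[end:]
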
@@ -890,15 +939,17 @@
return decoded
def _extract_single_byte(self):
- if self._in_buffer == '':
+ if self._in_buffer_len == 0:
# The buffer is empty
raise _NeedMoreBytes(1)
- one_byte = self._in_buffer[0]
- self._in_buffer = self._in_buffer[1:]
+ in_buf = self._get_in_buffer()
+ one_byte = in_buf[0]
+ self._set_in_buffer(in_buf[1:])
return one_byte
def _state_accept_expecting_protocol_version(self):
- needed_bytes = len(MESSAGE_VERSION_THREE) - len(self._in_buffer)
+ needed_bytes = len(MESSAGE_VERSION_THREE) - self._in_buffer_len
+ in_buf = self._get_in_buffer()
if needed_bytes > 0:
# We don't have enough bytes to check if the protocol version
# marker is right. But we can check if it is already wrong by
@@ -908,13 +959,13 @@
# len(MESSAGE_VERSION_THREE) bytes. So if the bytes we have so far
# are wrong then we should just raise immediately rather than
# stall.]
- if not MESSAGE_VERSION_THREE.startswith(self._in_buffer):
+ if not MESSAGE_VERSION_THREE.startswith(in_buf):
# We have enough bytes to know the protocol version is wrong
- raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
+ raise errors.UnexpectedProtocolVersionMarker(in_buf)
raise _NeedMoreBytes(len(MESSAGE_VERSION_THREE))
- if not self._in_buffer.startswith(MESSAGE_VERSION_THREE):
- raise errors.UnexpectedProtocolVersionMarker(self._in_buffer)
- self._in_buffer = self._in_buffer[len(MESSAGE_VERSION_THREE):]
+ if not in_buf.startswith(MESSAGE_VERSION_THREE):
+ raise errors.UnexpectedProtocolVersionMarker(in_buf)
+ self._set_in_buffer(in_buf[len(MESSAGE_VERSION_THREE):])
self.state_accept = self._state_accept_expecting_headers
def _state_accept_expecting_headers(self):
@@ -969,8 +1020,8 @@
raise errors.SmartMessageHandlerError(sys.exc_info())
def done(self):
- self.unused_data = self._in_buffer
- self._in_buffer = ''
+ self.unused_data = self._get_in_buffer()
+ self._set_in_buffer(None)
self.state_accept = self._state_accept_reading_unused
try:
self.message_handler.end_received()
@@ -978,8 +1029,8 @@
raise errors.SmartMessageHandlerError(sys.exc_info())
def _state_accept_reading_unused(self):
- self.unused_data += self._in_buffer
- self._in_buffer = ''
+ self.unused_data += self._get_in_buffer()
+ self._set_in_buffer(None)
def next_read_size(self):
if self.state_accept == self._state_accept_reading_unused:
@@ -992,7 +1043,7 @@
return 0
else:
if self._number_needed_bytes is not None:
- return self._number_needed_bytes - len(self._in_buffer)
+ return self._number_needed_bytes - self._in_buffer_len
else:
raise AssertionError("don't know how many bytes are expected!")
=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py 2008-06-17 04:32:18 +0000
+++ b/bzrlib/tests/test_smart_transport.py 2008-08-29 05:25:58 +0000
@@ -2267,7 +2267,8 @@
def test_construct_version_three_server_protocol(self):
smart_protocol = protocol.ProtocolThreeDecoder(None)
self.assertEqual('', smart_protocol.unused_data)
- self.assertEqual('', smart_protocol._in_buffer)
+ self.assertEqual([], smart_protocol._in_buffer_list)
+ self.assertEqual(0, smart_protocol._in_buffer_len)
self.assertFalse(smart_protocol._has_dispatched)
# The protocol starts by expecting four bytes, a length prefix for the
# headers.
=== modified file 'bzrlib/transport/remote.py'
--- a/bzrlib/transport/remote.py 2008-07-25 03:12:11 +0000
+++ b/bzrlib/transport/remote.py 2008-08-29 05:25:58 +0000
@@ -43,7 +43,7 @@
self.st_mode = mode
-class RemoteTransport(transport.ConnectedTransport):
+class RemoteTransport(transport.ConnectedTransport, medium.SmartClientMedium):
"""Connection to a smart server.
The connection holds references to the medium that can be used to send
@@ -312,14 +312,13 @@
offsets = list(offsets)
sorted_offsets = sorted(offsets)
- # turn the list of offsets into a stack
- offset_stack = iter(offsets)
- cur_offset_and_size = offset_stack.next()
coalesced = list(self._coalesce_offsets(sorted_offsets,
limit=self._max_readv_combine,
fudge_factor=self._bytes_to_read_before_seek))
try:
result = self._client.call_with_body_readv_array(
('readv', self._remote_path(relpath),),
[(c.start, c.length) for c in coalesced])
@@ -332,18 +331,38 @@
response_handler.cancel_read_body()
raise errors.UnexpectedSmartServerResponse(resp)
+ return self._handle_response(relpath, offsets, coalesced, response_handler)
+
+ def _handle_response(self, relpath, offsets, coalesced, response_handler):
+ # turn the list of offsets into a stack
+ offset_stack = iter(offsets)
+ cur_offset_and_size = offset_stack.next()
# FIXME: this should know how many bytes are needed, for clarity.
data = response_handler.read_body_bytes()
# Cache the results, but only until they have been fulfilled
data_map = {}
+ data_offset = 0
for c_offset in coalesced:
if len(data) < c_offset.length:
raise errors.ShortReadvError(relpath, c_offset.start,
c_offset.length, actual=len(data))
for suboffset, subsize in c_offset.ranges:
key = (c_offset.start+suboffset, subsize)
- data_map[key] = data[suboffset:suboffset+subsize]
- data = data[c_offset.length:]
+ this_data = data[data_offset+suboffset:
+ data_offset+suboffset+subsize]
+ # Special case when the data is in-order, rather than packing
+ # into a map and then back out again. Benchmarking shows that
+ # this has 100% hit rate, but leave in the data_map work just
+ # in case.
+ # TODO: Could we get away with using buffer() to avoid the
+ # memory copy? Callers would need to realize they may
+ # not have a real string.
+ if key == cur_offset_and_size:
+ yield cur_offset_and_size[0], this_data
+ cur_offset_and_size = offset_stack.next()
+ else:
+ data_map[key] = this_data
+ data_offset += c_offset.length
# Now that we've read some data, see if we can yield anything back
while cur_offset_and_size in data_map:
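
The archive truncates the hunk here; the loop that begins on the last visible line drains data_map for any offsets that arrived out of order. Simplified into a standalone generator (hypothetical names, no coalescing and no error handling), the pattern the new _handle_response follows looks roughly like this:

    def yield_in_request_order(offsets, arrived):
        # offsets: (start, size) pairs in the order the caller asked for them.
        # arrived: ((start, size), data) pairs in the order they were read.
        done = object()
        offset_stack = iter(offsets)
        cur = next(offset_stack, done)
        data_map = {}
        for key, this_data in arrived:
            if key == cur:
                # Common case: data arrives in the order it was requested.
                yield cur[0], this_data
                cur = next(offset_stack, done)
            else:
                data_map[key] = this_data
            # Drain anything that became yieldable out of order.
            while cur in data_map:
                yield cur[0], data_map.pop(cur)
                cur = next(offset_stack, done)

The other change worth noting is that the patch slices each sub-range out of the coalesced data with a running data_offset instead of trimming the front of 'data' after every coalesced range, avoiding another round of large string copies.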