Rev 3452: Speed up HPSS v3 by buffering writes to the medium. (Andrew Bennetts) in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Sat May 24 13:02:44 BST 2008


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 3452
revision-id:pqm at pqm.ubuntu.com-20080524120232-22xdromy706t0x16
parent: pqm at pqm.ubuntu.com-20080524114124-ubdyd5iqf7zxl2pn
parent: andrew.bennetts at canonical.com-20080523070237-2i3vqbsb7xs2jes1
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Sat 2008-05-24 13:02:32 +0100
message:
  Speed up HPSS v3 by buffering writes to the medium. (Andrew Bennetts)
modified:
  bzrlib/smart/protocol.py       protocol.py-20061108035435-ot0lstk2590yqhzr-1
  bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
    ------------------------------------------------------------
    revno: 3441.3.3
    revision-id:andrew.bennetts at canonical.com-20080523070237-2i3vqbsb7xs2jes1
    parent: andrew.bennetts at canonical.com-20080521024958-1b79jyuv3v9qhuq3
    committer: Andrew Bennetts <andrew.bennetts at canonical.com>
    branch nick: hpss-buffering
    timestamp: Fri 2008-05-23 17:02:37 +1000
    message:
      Fix PEP 8 nit.
    modified:
      bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
    ------------------------------------------------------------
    revno: 3441.3.2
    revision-id:andrew.bennetts at canonical.com-20080521024958-1b79jyuv3v9qhuq3
    parent: andrew.bennetts at canonical.com-20080521023432-rj4qn81st9sou7np
    committer: Andrew Bennetts <andrew.bennetts at canonical.com>
    branch nick: hpss-buffering
    timestamp: Wed 2008-05-21 12:49:58 +1000
    message:
      Simplify buffering logic in _ProtocolThreeEncoder.
    modified:
      bzrlib/smart/protocol.py       protocol.py-20061108035435-ot0lstk2590yqhzr-1
    ------------------------------------------------------------
    revno: 3441.3.1
    revision-id:andrew.bennetts at canonical.com-20080521023432-rj4qn81st9sou7np
    parent: pqm at pqm.ubuntu.com-20080520210027-wetfxldz1ggc5u2a
    committer: Andrew Bennetts <andrew.bennetts at canonical.com>
    branch nick: hpss-buffering
    timestamp: Wed 2008-05-21 12:34:32 +1000
    message:
      Buffer encoding of v3 messages to minimise write/send calls.  Doubles the speed of pushing over TCP with 500ms latency loopback.
    modified:
      bzrlib/smart/protocol.py       protocol.py-20061108035435-ot0lstk2590yqhzr-1
      bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
=== modified file 'bzrlib/smart/protocol.py'
--- a/bzrlib/smart/protocol.py	2008-05-16 07:15:57 +0000
+++ b/bzrlib/smart/protocol.py	2008-05-21 02:49:58 +0000
@@ -1014,7 +1014,16 @@
     response_marker = request_marker = MESSAGE_VERSION_THREE
 
     def __init__(self, write_func):
-        self._write_func = write_func
+        self._buf = ''
+        self._real_write_func = write_func
+
+    def _write_func(self, bytes):
+        self._buf += bytes
+
+    def flush(self):
+        if self._buf:
+            self._real_write_func(self._buf)
+            self._buf = ''
 
     def _serialise_offsets(self, offsets):
         """Serialise a readv offset list."""
@@ -1046,6 +1055,7 @@
 
     def _write_end(self):
         self._write_func('e')
+        self.flush()
 
     def _write_prefixed_body(self, bytes):
         self._write_func('b')
@@ -1101,6 +1111,7 @@
         elif response.body_stream is not None:
             for chunk in response.body_stream:
                 self._write_prefixed_body(chunk)
+                self.flush()
         self._write_end()
         
 

=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py	2008-05-19 13:55:43 +0000
+++ b/bzrlib/tests/test_smart_transport.py	2008-05-23 07:02:37 +0000
@@ -2566,6 +2566,37 @@
             'e', # end
             output.getvalue())
 
+    def test_call_writes_just_once(self):
+        """A bodyless request is written to the medium all at once."""
+        medium_request = StubMediumRequest()
+        encoder = protocol.ProtocolThreeRequester(medium_request)
+        encoder.call('arg1', 'arg2', 'arg3')
+        self.assertEqual(
+            ['accept_bytes', 'finished_writing'], medium_request.calls)
+
+    def test_call_with_body_bytes_writes_just_once(self):
+        """A request with body bytes is written to the medium all at once."""
+        medium_request = StubMediumRequest()
+        encoder = protocol.ProtocolThreeRequester(medium_request)
+        encoder.call_with_body_bytes(('arg', 'arg'), 'body bytes')
+        self.assertEqual(
+            ['accept_bytes', 'finished_writing'], medium_request.calls)
+
+
+class StubMediumRequest(object):
+    """A stub medium request that tracks the number of times accept_bytes is
+    called.
+    """
+
+    def __init__(self):
+        self.calls = []
+
+    def accept_bytes(self, bytes):
+        self.calls.append('accept_bytes')
+
+    def finished_writing(self):
+        self.calls.append('finished_writing')
+
 
 class TestResponseEncodingProtocolThree(tests.TestCase):
 
@@ -2589,6 +2620,56 @@
             'e')
 
 
+class TestResponseEncoderBufferingProtocolThree(tests.TestCase):
+    """Tests for buffering of responses.
+
+    We want to avoid doing many small writes when one would do, to avoid
+    unnecessary network overhead.
+    """
+
+    def setUp(self):
+        self.writes = []
+        self.responder = protocol.ProtocolThreeResponder(self.writes.append)
+
+    def assertWriteCount(self, expected_count):
+        self.assertEqual(
+            expected_count, len(self.writes),
+            "Too many writes: %r" % (self.writes,))
+        
+    def test_send_error_writes_just_once(self):
+        """An error response is written to the medium all at once."""
+        self.responder.send_error(Exception('An exception string.'))
+        self.assertWriteCount(1)
+
+    def test_send_response_writes_just_once(self):
+        """A normal response with no body is written to the medium all at once.
+        """
+        response = _mod_request.SuccessfulSmartServerResponse(('arg', 'arg'))
+        self.responder.send_response(response)
+        self.assertWriteCount(1)
+
+    def test_send_response_with_body_writes_just_once(self):
+        """A normal response with a monolithic body is written to the medium
+        all at once.
+        """
+        response = _mod_request.SuccessfulSmartServerResponse(
+            ('arg', 'arg'), body='body bytes')
+        self.responder.send_response(response)
+        self.assertWriteCount(1)
+
+    def test_send_response_with_body_stream_writes_once_per_chunk(self):
+        """A normal response with a stream body is written to the medium
+        writes to the medium once per chunk.
+        """
+        # Construct a response with stream with 2 chunks in it.
+        response = _mod_request.SuccessfulSmartServerResponse(
+            ('arg', 'arg'), body_stream=['chunk1', 'chunk2'])
+        self.responder.send_response(response)
+        # We will write 3 times: exactly once for each chunk, plus a final
+        # write to end the response.
+        self.assertWriteCount(3)
+
+
 class TestSmartClientUnicode(tests.TestCase):
     """_SmartClient tests for unicode arguments.
 




More information about the bazaar-commits mailing list