Rev 5139: Merge in the stream retry code, and update for the slightly newer apis. in http://bazaar.launchpad.net/~jameinel/bzr/2.2-client-reconnect-819604

John Arbash Meinel john at arbash-meinel.com
Mon Oct 10 13:38:38 UTC 2011


At http://bazaar.launchpad.net/~jameinel/bzr/2.2-client-reconnect-819604

------------------------------------------------------------
revno: 5139 [merge]
revision-id: john at arbash-meinel.com-20111010133817-z32ejjkw7u1lbfes
parent: john at arbash-meinel.com-20111010133630-uoso9nus93so7smd
parent: john at arbash-meinel.com-20111010125235-zbyfyk8gfhkybjbb
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.2-client-reconnect-819604
timestamp: Mon 2011-10-10 15:38:17 +0200
message:
  Merge in the stream retry code, and update for the slightly newer apis.
modified:
  bzrlib/smart/client.py         client.py-20061116014825-2k6ada6xgulslami-1
  bzrlib/smart/medium.py         medium.py-20061103051856-rgu2huy59fkz902q-1
  bzrlib/smart/protocol.py       protocol.py-20061108035435-ot0lstk2590yqhzr-1
  bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
-------------- next part --------------
=== modified file 'bzrlib/smart/client.py'
--- a/bzrlib/smart/client.py	2011-10-10 13:04:31 +0000
+++ b/bzrlib/smart/client.py	2011-10-10 13:38:17 +0000
@@ -273,9 +273,13 @@
             # Connection is dead, so close our end of it.
             self.client._medium.reset()
             if (('noretry' in debug.debug_flags)
-                or self.body_stream is not None):
+                or (self.body_stream is not None
+                    and encoder.body_stream_started)):
                 # We can't restart a body_stream that has been partially
                 # consumed, so we don't retry.
+                # Note: We don't have to worry about
+                #   SmartClientRequestProtocolOne or Two, because they don't
+                #   support client-side body streams.
                 raise
             trace.warning('ConnectionReset calling %r, retrying'
                           % (self.method,))

=== modified file 'bzrlib/smart/medium.py'
--- a/bzrlib/smart/medium.py	2011-10-10 13:36:30 +0000
+++ b/bzrlib/smart/medium.py	2011-10-10 13:38:17 +0000
@@ -738,8 +738,7 @@
         except IOError, e:
             if e.errno in (errno.EINVAL, errno.EPIPE):
                 raise errors.ConnectionReset(
-                    "Error trying to write to subprocess:\n%s"
-                    % (e,))
+                    "Error trying to write to subprocess:\n%s" % (e,))
             raise
         self._report_activity(len(bytes), 'write')
 

=== modified file 'bzrlib/smart/protocol.py'
--- a/bzrlib/smart/protocol.py	2011-10-10 13:36:30 +0000
+++ b/bzrlib/smart/protocol.py	2011-10-10 13:38:17 +0000
@@ -1289,6 +1289,7 @@
         _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
         self._medium_request = medium_request
         self._headers = {}
+        self.body_stream_started = None
 
     def set_headers(self, headers):
         self._headers = headers.copy()
@@ -1354,6 +1355,7 @@
             if path is not None:
                 mutter('                  (to %s)', path)
             self._request_start_time = osutils.timer_func()
+        self.body_stream_started = False
         self._write_protocol_version()
         self._write_headers(self._headers)
         self._write_structure(args)
@@ -1361,6 +1363,9 @@
         #       have finished sending the stream.  We would notice at the end
         #       anyway, but if the medium can deliver it early then it's good
         #       to short-circuit the whole request...
+        # Provoke any ConnectionReset failures before we start the body stream.
+        self.flush()
+        self.body_stream_started = True
         for exc_info, part in _iter_with_errors(stream):
             if exc_info is not None:
                 # Iterating the stream failed.  Cleanly abort the request.

=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py	2011-10-10 13:36:30 +0000
+++ b/bzrlib/tests/test_smart_transport.py	2011-10-10 13:38:17 +0000
@@ -2949,6 +2949,33 @@
             'e', # end
             output.getvalue())
 
+    def test_records_start_of_body_stream(self):
+        requester, output = self.make_client_encoder_and_output()
+        requester.set_headers({})
+        in_stream = [False]
+        def stream_checker():
+            self.assertTrue(requester.body_stream_started)
+            in_stream[0] = True
+            yield 'content'
+        flush_called = []
+        orig_flush = requester.flush
+        def tracked_flush():
+            flush_called.append(in_stream[0])
+            if in_stream[0]:
+                self.assertTrue(requester.body_stream_started)
+            else:
+                self.assertFalse(requester.body_stream_started)
+            return orig_flush()
+        requester.flush = tracked_flush
+        requester.call_with_body_stream(('one arg',), stream_checker())
+        self.assertEqual(
+            'bzr message 3 (bzr 1.6)\n' # protocol version
+            '\x00\x00\x00\x02de' # headers
+            's\x00\x00\x00\x0bl7:one arge' # args
+            'b\x00\x00\x00\x07content' # body
+            'e', output.getvalue())
+        self.assertEqual([False, True, True], flush_called)
+
 
 class StubMediumRequest(object):
     """A stub medium request that tracks the number of times accept_bytes is
@@ -3485,22 +3512,67 @@
             vendor.calls)
         self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
 
-    def test__send_doesnt_retry_body_stream(self):
-        # We don't know how much of body_stream would get iterated as part of
-        # _send before it failed to actually send the request, so we
-        # just always fail in this condition.
+    def test__send_request_retries_body_stream_if_not_started(self):
         output, vendor, smart_client = self.make_client_with_failing_medium()
         smart_request = client._SmartClientRequest(smart_client, 'hello', (),
             body_stream=['a', 'b'])
+        response_handler = smart_request._send(3)
+        # We connect, get disconnected, and notice before consuming the stream,
+        # so we try again one time and succeed.
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+             ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+            ],
+            vendor.calls)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de'   # empty headers
+                         's\x00\x00\x00\tl5:helloe'
+                         'b\x00\x00\x00\x01a'
+                         'b\x00\x00\x00\x01b'
+                         'e',
+                         output.getvalue())
+
+    def test__send_request_stops_if_body_started(self):
+        # We intentionally use the python StringIO so that we can subclass it.
+        from StringIO import StringIO
+        response = StringIO()
+
+        class FailAfterFirstWrite(StringIO):
+            """Allow one 'write' call to pass, fail the rest"""
+            def __init__(self):
+                StringIO.__init__(self)
+                self._first = True
+
+            def write(self, s):
+                if self._first:
+                    self._first = False
+                    return StringIO.write(self, s)
+                raise IOError(errno.EINVAL, 'invalid file handle')
+        output = FailAfterFirstWrite()
+
+        vendor = FirstRejectedStringIOSSHVendor(response, output,
+            fail_at_write=False)
+        ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
+        client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
+        smart_client = client._SmartClient(client_medium, headers={})
+        smart_request = client._SmartClientRequest(smart_client, 'hello', (),
+            body_stream=['a', 'b'])
         self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
-        # We got one connect, but it fails, so we disconnect, but we don't
-        # retry it
+        # We connect, and manage to get to the point that we start consuming
+        # the body stream. The next write fails, so we just stop.
         self.assertEqual(
             [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
               ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
              ('close',),
             ],
             vendor.calls)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de'   # empty headers
+                         's\x00\x00\x00\tl5:helloe',
+                         output.getvalue())
 
     def test__send_disabled_retry(self):
         debug.debug_flags.add('noretry')



More information about the bazaar-commits mailing list