Rev 5139: Merge in the stream retry code, and update for the slightly newer apis. in http://bazaar.launchpad.net/~jameinel/bzr/2.2-client-reconnect-819604
John Arbash Meinel
john at arbash-meinel.com
Mon Oct 10 13:38:38 UTC 2011
At http://bazaar.launchpad.net/~jameinel/bzr/2.2-client-reconnect-819604
------------------------------------------------------------
revno: 5139 [merge]
revision-id: john at arbash-meinel.com-20111010133817-z32ejjkw7u1lbfes
parent: john at arbash-meinel.com-20111010133630-uoso9nus93so7smd
parent: john at arbash-meinel.com-20111010125235-zbyfyk8gfhkybjbb
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.2-client-reconnect-819604
timestamp: Mon 2011-10-10 15:38:17 +0200
message:
Merge in the stream retry code, and update for the slightly newer apis.
modified:
bzrlib/smart/client.py client.py-20061116014825-2k6ada6xgulslami-1
bzrlib/smart/medium.py medium.py-20061103051856-rgu2huy59fkz902q-1
bzrlib/smart/protocol.py protocol.py-20061108035435-ot0lstk2590yqhzr-1
bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
-------------- next part --------------
=== modified file 'bzrlib/smart/client.py'
--- a/bzrlib/smart/client.py 2011-10-10 13:04:31 +0000
+++ b/bzrlib/smart/client.py 2011-10-10 13:38:17 +0000
@@ -273,9 +273,13 @@
# Connection is dead, so close our end of it.
self.client._medium.reset()
if (('noretry' in debug.debug_flags)
- or self.body_stream is not None):
+ or (self.body_stream is not None
+ and encoder.body_stream_started)):
# We can't restart a body_stream that has been partially
# consumed, so we don't retry.
+ # Note: We don't have to worry about
+ # SmartClientRequestProtocolOne or Two, because they don't
+ # support client-side body streams.
raise
trace.warning('ConnectionReset calling %r, retrying'
% (self.method,))
=== modified file 'bzrlib/smart/medium.py'
--- a/bzrlib/smart/medium.py 2011-10-10 13:36:30 +0000
+++ b/bzrlib/smart/medium.py 2011-10-10 13:38:17 +0000
@@ -738,8 +738,7 @@
except IOError, e:
if e.errno in (errno.EINVAL, errno.EPIPE):
raise errors.ConnectionReset(
- "Error trying to write to subprocess:\n%s"
- % (e,))
+ "Error trying to write to subprocess:\n%s" % (e,))
raise
self._report_activity(len(bytes), 'write')
=== modified file 'bzrlib/smart/protocol.py'
--- a/bzrlib/smart/protocol.py 2011-10-10 13:36:30 +0000
+++ b/bzrlib/smart/protocol.py 2011-10-10 13:38:17 +0000
@@ -1289,6 +1289,7 @@
_ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
self._medium_request = medium_request
self._headers = {}
+ self.body_stream_started = None
def set_headers(self, headers):
self._headers = headers.copy()
@@ -1354,6 +1355,7 @@
if path is not None:
mutter(' (to %s)', path)
self._request_start_time = osutils.timer_func()
+ self.body_stream_started = False
self._write_protocol_version()
self._write_headers(self._headers)
self._write_structure(args)
@@ -1361,6 +1363,9 @@
# have finished sending the stream. We would notice at the end
# anyway, but if the medium can deliver it early then it's good
# to short-circuit the whole request...
+ # Provoke any ConnectionReset failures before we start the body stream.
+ self.flush()
+ self.body_stream_started = True
for exc_info, part in _iter_with_errors(stream):
if exc_info is not None:
# Iterating the stream failed. Cleanly abort the request.
=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py 2011-10-10 13:36:30 +0000
+++ b/bzrlib/tests/test_smart_transport.py 2011-10-10 13:38:17 +0000
@@ -2949,6 +2949,33 @@
'e', # end
output.getvalue())
+ def test_records_start_of_body_stream(self):
+ requester, output = self.make_client_encoder_and_output()
+ requester.set_headers({})
+ in_stream = [False]
+ def stream_checker():
+ self.assertTrue(requester.body_stream_started)
+ in_stream[0] = True
+ yield 'content'
+ flush_called = []
+ orig_flush = requester.flush
+ def tracked_flush():
+ flush_called.append(in_stream[0])
+ if in_stream[0]:
+ self.assertTrue(requester.body_stream_started)
+ else:
+ self.assertFalse(requester.body_stream_started)
+ return orig_flush()
+ requester.flush = tracked_flush
+ requester.call_with_body_stream(('one arg',), stream_checker())
+ self.assertEqual(
+ 'bzr message 3 (bzr 1.6)\n' # protocol version
+ '\x00\x00\x00\x02de' # headers
+ 's\x00\x00\x00\x0bl7:one arge' # args
+ 'b\x00\x00\x00\x07content' # body
+ 'e', output.getvalue())
+ self.assertEqual([False, True, True], flush_called)
+
class StubMediumRequest(object):
"""A stub medium request that tracks the number of times accept_bytes is
@@ -3485,22 +3512,67 @@
vendor.calls)
self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
- def test__send_doesnt_retry_body_stream(self):
- # We don't know how much of body_stream would get iterated as part of
- # _send before it failed to actually send the request, so we
- # just always fail in this condition.
+ def test__send_request_retries_body_stream_if_not_started(self):
output, vendor, smart_client = self.make_client_with_failing_medium()
smart_request = client._SmartClientRequest(smart_client, 'hello', (),
body_stream=['a', 'b'])
+ response_handler = smart_request._send(3)
+ # We connect, get disconnected, and notice before consuming the stream,
+ # so we try again one time and succeed.
+ self.assertEqual(
+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+ ('close',),
+ ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+ ],
+ vendor.calls)
+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+ '\x00\x00\x00\x02de' # empty headers
+ 's\x00\x00\x00\tl5:helloe'
+ 'b\x00\x00\x00\x01a'
+ 'b\x00\x00\x00\x01b'
+ 'e',
+ output.getvalue())
+
+ def test__send_request_stops_if_body_started(self):
+ # We intentionally use the python StringIO so that we can subclass it.
+ from StringIO import StringIO
+ response = StringIO()
+
+ class FailAfterFirstWrite(StringIO):
+ """Allow one 'write' call to pass, fail the rest"""
+ def __init__(self):
+ StringIO.__init__(self)
+ self._first = True
+
+ def write(self, s):
+ if self._first:
+ self._first = False
+ return StringIO.write(self, s)
+ raise IOError(errno.EINVAL, 'invalid file handle')
+ output = FailAfterFirstWrite()
+
+ vendor = FirstRejectedStringIOSSHVendor(response, output,
+ fail_at_write=False)
+ ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
+ client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
+ smart_client = client._SmartClient(client_medium, headers={})
+ smart_request = client._SmartClientRequest(smart_client, 'hello', (),
+ body_stream=['a', 'b'])
self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
- # We got one connect, but it fails, so we disconnect, but we don't
- # retry it
+ # We connect, and manage to get to the point that we start consuming
+ # the body stream. The next write fails, so we just stop.
self.assertEqual(
[('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
('close',),
],
vendor.calls)
+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+ '\x00\x00\x00\x02de' # empty headers
+ 's\x00\x00\x00\tl5:helloe',
+ output.getvalue())
def test__send_disabled_retry(self):
debug.debug_flags.add('noretry')
More information about the bazaar-commits
mailing list