Rev 4888: Move read_bytes_from_socket into osutils like it is in bzr-2.5. in http://bazaar.launchpad.net/~jameinel/bzr/2.1-all-reconnect-819604

John Arbash Meinel john at arbash-meinel.com
Wed Sep 12 07:39:49 UTC 2012


At http://bazaar.launchpad.net/~jameinel/bzr/2.1-all-reconnect-819604

------------------------------------------------------------
revno: 4888
revision-id: john at arbash-meinel.com-20120912073929-0vavweteyac1j3bx
parent: john at arbash-meinel.com-20120912070905-agpbab3b4oez5kwq
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.1-all-reconnect-819604
timestamp: Wed 2012-09-12 11:39:29 +0400
message:
  Move read_bytes_from_socket into osutils like it is in bzr-2.5.
  
  This makes sure we are catching the extra socket errors that osutils treats
  as end-of-stream, retrying on EINTR, etc.
  
  The code is now essentially the same as 2.5 except for:
  a) No class SmartClientAlreadyConnectedSocketMedium, since I don't want to
  bring in the code that uses sockets to communicate with the subprocess.
  b) No code for auto-disconnecting clients. That is more of a server-side change
  and I only care to change the client side.
  c) A fair number of other differences that are unrelated to this change.
-------------- next part --------------
=== modified file 'bzrlib/osutils.py'
--- a/bzrlib/osutils.py	2012-09-11 12:26:46 +0000
+++ b/bzrlib/osutils.py	2012-09-12 07:39:29 +0000
@@ -1944,19 +1944,46 @@
 del _eno
 
 
-def recv_all(socket, bytes):
+def read_bytes_from_socket(sock, report_activity=None,
+        max_read_size=MAX_SOCKET_CHUNK):
+    """Read up to max_read_size of bytes from sock and notify of progress.
+
+    Translates "Connection reset by peer" into file-like EOF (return an
+    empty string rather than raise an error), and repeats the recv if
+    interrupted by a signal.
+    """
+    while 1:
+        try:
+            bytes = sock.recv(max_read_size)
+        except socket.error, e:
+            eno = e.args[0]
+            if eno in _end_of_stream_errors:
+                # The connection was closed by the other side.  Callers expect
+                # an empty string to signal end-of-stream.
+                return ""
+            elif eno == errno.EINTR:
+                # Retry the interrupted recv.
+                continue
+            raise
+        else:
+            if report_activity is not None:
+                report_activity(len(bytes), 'read')
+            return bytes
+
+
+def recv_all(socket, count):
     """Receive an exact number of bytes.
 
     Regular Socket.recv() may return less than the requested number of bytes,
-    dependning on what's in the OS buffer.  MSG_WAITALL is not available
+    depending on what's in the OS buffer.  MSG_WAITALL is not available
     on all platforms, but this should work everywhere.  This will return
     less than the requested amount if the remote end closes.
 
     This isn't optimized and is intended mostly for use in testing.
     """
     b = ''
-    while len(b) < bytes:
-        new = until_no_eintr(socket.recv, bytes - len(b))
+    while len(b) < count:
+        new = read_bytes_from_socket(socket, None, count - len(b))
         if new == '':
             break # eof
         b += new

=== modified file 'bzrlib/smart/medium.py'
--- a/bzrlib/smart/medium.py	2011-10-07 16:11:40 +0000
+++ b/bzrlib/smart/medium.py	2012-09-12 07:39:29 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2006-2010 Canonical Ltd
+# Copyright (C) 2006-2012 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -50,11 +50,11 @@
 #usually already imported, and getting IllegalScoperReplacer on it here.
 from bzrlib import osutils
 
-# We must not read any more than 64k at a time so we don't risk "no buffer
-# space available" errors on some platforms.  Windows in particular is likely
-# to give error 10053 or 10055 if we read more than 64k from a socket.
-_MAX_READ_SIZE = 64 * 1024
-
+# Throughout this module buffer size parameters are either limited to be at
+# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
+# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
+# from non-sockets as well.
+_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
 
 def _get_protocol_factory_for_bytes(bytes):
     """Determine the right protocol factory for 'bytes'.
@@ -178,6 +178,14 @@
         ui.ui_factory.report_transport_activity(self, bytes, direction)
 
 
+_bad_file_descriptor = (errno.EBADF,)
+if sys.platform == 'win32':
+    # Given on Windows if you pass a closed socket to select.select. Probably
+    # also given if you pass a file handle to select.
+    WSAENOTSOCK = 10038
+    _bad_file_descriptor += (WSAENOTSOCK,)
+
+
 class SmartServerStreamMedium(SmartMedium):
     """Handles smart commands coming over a stream.
 
@@ -241,6 +249,8 @@
 
         :param protocol: a SmartServerRequestProtocol.
         """
+        if protocol is None:
+            return
         try:
             self._serve_one_request_unguarded(protocol)
         except KeyboardInterrupt:
@@ -276,9 +286,9 @@
     def _serve_one_request_unguarded(self, protocol):
         while protocol.next_read_size():
             # We can safely try to read large chunks.  If there is less data
-            # than _MAX_READ_SIZE ready, the socket wil just return a short
-            # read immediately rather than block.
-            bytes = self.read_bytes(_MAX_READ_SIZE)
+            # than MAX_SOCKET_CHUNK ready, the socket will just return a
+            # short read immediately rather than block.
+            bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
             if bytes == '':
                 self.finished = True
                 return
@@ -287,13 +297,13 @@
         self._push_back(protocol.unused_data)
 
     def _read_bytes(self, desired_count):
-        return _read_bytes_from_socket(
-            self.socket.recv, desired_count, self._report_activity)
+        return osutils.read_bytes_from_socket(
+            self.socket, self._report_activity)
 
     def terminate_due_to_error(self):
         # TODO: This should log to a server log file, but no such thing
         # exists yet.  Andrew Bennetts 2006-09-29.
-        osutils.until_no_eintr(self.socket.close)
+        self.socket.close()
         self.finished = True
 
     def _write_out(self, bytes):
@@ -345,16 +355,16 @@
             protocol.accept_bytes(bytes)
 
     def _read_bytes(self, desired_count):
-        return osutils.until_no_eintr(self._in.read, desired_count)
+        return self._in.read(desired_count)
 
     def terminate_due_to_error(self):
         # TODO: This should log to a server log file, but no such thing
         # exists yet.  Andrew Bennetts 2006-09-29.
-        osutils.until_no_eintr(self._out.close)
+        self._out.close()
         self.finished = True
 
     def _write_out(self, bytes):
-        osutils.until_no_eintr(self._out.write, bytes)
+        self._out.write(bytes)
 
 
 class SmartClientMediumRequest(object):
@@ -735,11 +745,11 @@
     def _accept_bytes(self, bytes):
         """See SmartClientStreamMedium.accept_bytes."""
         try:
-            osutils.until_no_eintr(self._writeable_pipe.write, bytes)
+            self._writeable_pipe.write(bytes)
         except IOError, e:
             if e.errno in (errno.EINVAL, errno.EPIPE):
                 raise errors.ConnectionReset(
-                    "Error trying to write to subprocess:\n%s" % (e,))
+                    "Error trying to write to subprocess", e)
             raise
         self._report_activity(len(bytes), 'write')
 
@@ -748,11 +758,12 @@
         # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
         #       However, testing shows that even when the child process is
         #       gone, this doesn't error.
-        osutils.until_no_eintr(self._writeable_pipe.flush)
+        self._writeable_pipe.flush()
 
     def _read_bytes(self, count):
         """See SmartClientStreamMedium._read_bytes."""
-        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
+        bytes_to_read = min(count, _MAX_READ_SIZE)
+        bytes = self._readable_pipe.read(bytes_to_read)
         self._report_activity(len(bytes), 'read')
         return bytes
 
@@ -782,9 +793,9 @@
         # _DebugCounter so we have to store all the values used in our repr
         # method before calling the super init.
         SmartClientStreamMedium.__init__(self, base)
-        self._ssh_connection = None
         self._vendor = vendor
         self._bzr_remote_path = bzr_remote_path
+        self._ssh_connection = None
 
     def __repr__(self):
         if self._port is None:
@@ -864,7 +875,7 @@
         """See SmartClientMedium.disconnect()."""
         if not self._connected:
             return
-        osutils.until_no_eintr(self._socket.close)
+        self._socket.close()
         self._socket = None
         self._connected = False
 
@@ -918,8 +929,8 @@
         """See SmartClientMedium.read_bytes."""
         if not self._connected:
             raise errors.MediumNotConnected(self)
-        return _read_bytes_from_socket(
-            self._socket.recv, count, self._report_activity)
+        return osutils.read_bytes_from_socket(
+            self._socket, self._report_activity)
 
 
 class SmartClientStreamMediumRequest(SmartClientMediumRequest):
@@ -960,25 +971,3 @@
         This invokes self._medium._flush to ensure all bytes are transmitted.
         """
         self._medium._flush()
-
-
-WSAECONNABORTED = 10053
-WSAECONNRESET = 10054
-
-def _read_bytes_from_socket(sock, desired_count, report_activity):
-    # We ignore the desired_count because on sockets it's more efficient to
-    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
-    try:
-        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
-    except socket.error, e:
-        if len(e.args) and e.args[0] in (errno.ECONNRESET, WSAECONNABORTED,
-                                         WSAECONNRESET):
-            # The connection was closed by the other side.  Callers expect an
-            # empty string to signal end-of-stream.
-            bytes = ''
-        else:
-            raise
-    else:
-        report_activity(len(bytes), 'read')
-    return bytes
-



More information about the bazaar-commits mailing list