Rev 4671: (robertc) Merge 2.0, includes EINTR support for bzr://, in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Fri Sep 4 01:22:42 BST 2009


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 4671 [merge]
revision-id: pqm at pqm.ubuntu.com-20090904002239-lnn3vwtab1tcbqfg
parent: pqm at pqm.ubuntu.com-20090903175453-3l05mwwkt0dwgees
parent: robertc at robertcollins.net-20090903234316-2wfb1eefkcx4qgl4
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Fri 2009-09-04 01:22:39 +0100
message:
  (robertc) Merge 2.0, includes EINTR support for bzr://,
  	and better stream representation with bzr:// streams.
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/smart/medium.py         medium.py-20061103051856-rgu2huy59fkz902q-1
  bzrlib/smart/repository.py     repository.py-20061128022038-vr5wy5bubyb8xttk-1
  bzrlib/tests/test_smart.py     test_smart.py-20061122024551-ol0l0o0oofsu9b3t-2
=== modified file 'NEWS'
--- a/NEWS	2009-09-02 18:07:58 +0000
+++ b/NEWS	2009-09-03 23:43:16 +0000
@@ -18,6 +18,14 @@
 Bug Fixes
 *********
 
+* Bazaar's native protocol code now correctly handles EINTR, which most
+  noticeably occurs if you break into the debugger while connected to a
+  bzr+ssh server.  You can now continue from the debugger (by typing
+  'c') and the process continues.  However, note that pressing C-\ in the
+  shell may still kill the SSH process, which is bug 162509, so you must
+  send a signal to the bzr process specifically, for example by typing
+  ``kill -QUIT PID`` in another shell.  (Martin Pool, #341535)
+
 * ``bzr check`` in pack-0.92, 1.6 and 1.9 format repositories will no
   longer report incorrect errors about ``Missing inventory ('TREE_ROOT', ...)``
   (Robert Collins, #416732)
@@ -25,6 +33,9 @@
 * Don't restrict the command name used to run the test suite.
   (Vincent Ladeuil, #419950)
 
+* Network streams now decode adjacent records of the same type into a
+  single stream, reducing layering churn. (Robert Collins)
+
 Improvements
 ************
 
@@ -55,12 +66,20 @@
   parameterisation, and is substantially faster. (Robert Collins)
 
 
-bzr 2.0rc2
-##########
+bzr 2.0rc2 (not released yet)
+#############################
 
 Bug Fixes
 *********
 
+* Bazaar's native protocol code now correctly handles EINTR, which most
+  noticeably occurs if you break into the debugger while connected to a
+  bzr+ssh server.  You can now continue from the debugger (by typing
+  'c') and the process continues.  However, note that pressing C-\ in the
+  shell may still kill the SSH process, which is bug 162509, so you must
+  send a signal to the bzr process specifically, for example by typing
+  ``kill -QUIT PID`` in another shell.  (Martin Pool, #341535)
+
 * ``bzr check`` in pack-0.92, 1.6 and 1.9 format repositories will no
   longer report incorrect errors about ``Missing inventory ('TREE_ROOT', ...)``
   (Robert Collins, #416732)
@@ -69,8 +88,8 @@
   revisions that are in the fallback repository. (Regressed in 2.0rc1).
   (John Arbash Meinel, #419241)
 
-* Fix a segmentation fault when computing the ``merge_sort`` of a graph
-  that has a ghost in the mainline ancestry.
+* Fix a potential segmentation fault when doing 'log' of a branch that had
+  ghosts in its mainline.  (Evaluating None as a tuple is bad.)
   (John Arbash Meinel, #419241)
 
 * ``groupcompress`` sort order is now more stable, rather than relying on
@@ -81,13 +100,15 @@
   bugfix vs 2.0rc1, and all 2.0rc1 users should upgrade to 2.0rc2 before
   converting repositories. (Robert Collins, #422849)
 
+* Network streams now decode adjacent records of the same type into a
+  single stream, reducing layering churn. (Robert Collins)
+
 Documentation
 *************
 
 * The main table of contents now provides links to the new Migration Docs
   and Plugins Guide. (Ian Clatworthy)
 
-
 bzr 2.0rc1
 ##########
 

=== modified file 'bzrlib/smart/medium.py'
--- a/bzrlib/smart/medium.py	2009-08-07 05:56:29 +0000
+++ b/bzrlib/smart/medium.py	2009-09-01 07:47:36 +0000
@@ -291,7 +291,7 @@
     def terminate_due_to_error(self):
         # TODO: This should log to a server log file, but no such thing
         # exists yet.  Andrew Bennetts 2006-09-29.
-        self.socket.close()
+        osutils.until_no_eintr(self.socket.close)
         self.finished = True
 
     def _write_out(self, bytes):
@@ -326,27 +326,27 @@
             bytes_to_read = protocol.next_read_size()
             if bytes_to_read == 0:
                 # Finished serving this request.
-                self._out.flush()
+                osutils.until_no_eintr(self._out.flush)
                 return
             bytes = self.read_bytes(bytes_to_read)
             if bytes == '':
                 # Connection has been closed.
                 self.finished = True
-                self._out.flush()
+                osutils.until_no_eintr(self._out.flush)
                 return
             protocol.accept_bytes(bytes)
 
     def _read_bytes(self, desired_count):
-        return self._in.read(desired_count)
+        return osutils.until_no_eintr(self._in.read, desired_count)
 
     def terminate_due_to_error(self):
         # TODO: This should log to a server log file, but no such thing
         # exists yet.  Andrew Bennetts 2006-09-29.
-        self._out.close()
+        osutils.until_no_eintr(self._out.close)
         self.finished = True
 
     def _write_out(self, bytes):
-        self._out.write(bytes)
+        osutils.until_no_eintr(self._out.write, bytes)
 
 
 class SmartClientMediumRequest(object):
@@ -712,16 +712,16 @@
 
     def _accept_bytes(self, bytes):
         """See SmartClientStreamMedium.accept_bytes."""
-        self._writeable_pipe.write(bytes)
+        osutils.until_no_eintr(self._writeable_pipe.write, bytes)
         self._report_activity(len(bytes), 'write')
 
     def _flush(self):
         """See SmartClientStreamMedium._flush()."""
-        self._writeable_pipe.flush()
+        osutils.until_no_eintr(self._writeable_pipe.flush)
 
     def _read_bytes(self, count):
         """See SmartClientStreamMedium._read_bytes."""
-        bytes = self._readable_pipe.read(count)
+        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
         self._report_activity(len(bytes), 'read')
         return bytes
 
@@ -765,15 +765,15 @@
     def _accept_bytes(self, bytes):
         """See SmartClientStreamMedium.accept_bytes."""
         self._ensure_connection()
-        self._write_to.write(bytes)
+        osutils.until_no_eintr(self._write_to.write, bytes)
         self._report_activity(len(bytes), 'write')
 
     def disconnect(self):
         """See SmartClientMedium.disconnect()."""
         if not self._connected:
             return
-        self._read_from.close()
-        self._write_to.close()
+        osutils.until_no_eintr(self._read_from.close)
+        osutils.until_no_eintr(self._write_to.close)
         self._ssh_connection.close()
         self._connected = False
 
@@ -802,7 +802,7 @@
         if not self._connected:
             raise errors.MediumNotConnected(self)
         bytes_to_read = min(count, _MAX_READ_SIZE)
-        bytes = self._read_from.read(bytes_to_read)
+        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
         self._report_activity(len(bytes), 'read')
         return bytes
 
@@ -832,7 +832,7 @@
         """See SmartClientMedium.disconnect()."""
         if not self._connected:
             return
-        self._socket.close()
+        osutils.until_no_eintr(self._socket.close)
         self._socket = None
         self._connected = False
 

=== modified file 'bzrlib/smart/repository.py'
--- a/bzrlib/smart/repository.py	2009-08-14 00:55:42 +0000
+++ b/bzrlib/smart/repository.py	2009-09-03 00:33:35 +0000
@@ -519,36 +519,97 @@
     yield pack_writer.end()
 
 
+class _ByteStreamDecoder(object):
+    """Helper for _byte_stream_to_stream.
+
+    The expected usage of this class is via the function _byte_stream_to_stream
+    which creates a _ByteStreamDecoder, pops off the stream format and then
+    yields the output of record_stream(), the main entry point to
+    _ByteStreamDecoder.
+
+    Broadly this class has to unwrap two layers of iterators:
+    (type, substream)
+    (substream details)
+
+    This is complicated by wishing to return type, iterator_for_type, but
+    getting the data for iterator_for_type when we find out type: we can't
+    simply pass a generator down to the NetworkRecordStream parser, instead
+    we have a little local state to seed each NetworkRecordStream instance,
+    and gather the type that we'll be yielding.
+
+    :ivar byte_stream: The byte stream being decoded.
+    :ivar stream_decoder: A pack parser used to decode the bytestream
+    :ivar current_type: The current type, used to join adjacent records of the
+        same type into a single stream.
+    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
+    """
+
+    def __init__(self, byte_stream):
+        """Create a _ByteStreamDecoder."""
+        self.stream_decoder = pack.ContainerPushParser()
+        self.current_type = None
+        self.first_bytes = None
+        self.byte_stream = byte_stream
+
+    def iter_stream_decoder(self):
+        """Iterate the contents of the pack from stream_decoder."""
+        # dequeue pending items
+        for record in self.stream_decoder.read_pending_records():
+            yield record
+        # Pull bytes off the wire, decode them to records, yield those records.
+        for bytes in self.byte_stream:
+            self.stream_decoder.accept_bytes(bytes)
+            for record in self.stream_decoder.read_pending_records():
+                yield record
+
+    def iter_substream_bytes(self):
+        if self.first_bytes is not None:
+            yield self.first_bytes
+            # If we run out of pack records, signal the outer layer to stop.
+            self.first_bytes = None
+        for record in self.iter_pack_records:
+            record_names, record_bytes = record
+            record_name, = record_names
+            substream_type = record_name[0]
+            if substream_type != self.current_type:
+                # end of a substream, seed the next substream.
+                self.current_type = substream_type
+                self.first_bytes = record_bytes
+                return
+            yield record_bytes
+
+    def record_stream(self):
+        """Yield substream_type, substream from the byte stream."""
+        self.seed_state()
+        # Make and consume sub generators, one per substream type:
+        while self.first_bytes is not None:
+            substream = NetworkRecordStream(self.iter_substream_bytes())
+            # after substream is fully consumed, self.current_type is set to
+            # the next type, and self.first_bytes is set to the matching bytes.
+            yield self.current_type, substream.read()
+
+    def seed_state(self):
+        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
+        # Set a single generator we can use to get data from the pack stream.
+        self.iter_pack_records = self.iter_stream_decoder()
+        # Seed the very first subiterator with content; after this each one
+        # seeds the next.
+        list(self.iter_substream_bytes())
+
+
 def _byte_stream_to_stream(byte_stream):
     """Convert a byte stream into a format and a stream.
 
     :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
     :return: (RepositoryFormat, stream_generator)
     """
-    stream_decoder = pack.ContainerPushParser()
-    def record_stream():
-        """Closure to return the substreams."""
-        # May have fully parsed records already.
-        for record in stream_decoder.read_pending_records():
-            record_names, record_bytes = record
-            record_name, = record_names
-            substream_type = record_name[0]
-            substream = NetworkRecordStream([record_bytes])
-            yield substream_type, substream.read()
-        for bytes in byte_stream:
-            stream_decoder.accept_bytes(bytes)
-            for record in stream_decoder.read_pending_records():
-                record_names, record_bytes = record
-                record_name, = record_names
-                substream_type = record_name[0]
-                substream = NetworkRecordStream([record_bytes])
-                yield substream_type, substream.read()
+    decoder = _ByteStreamDecoder(byte_stream)
     for bytes in byte_stream:
-        stream_decoder.accept_bytes(bytes)
-        for record in stream_decoder.read_pending_records(max=1):
+        decoder.stream_decoder.accept_bytes(bytes)
+        for record in decoder.stream_decoder.read_pending_records(max=1):
             record_names, src_format_name = record
             src_format = network_format_registry.get(src_format_name)
-            return src_format, record_stream()
+            return src_format, decoder.record_stream()
 
 
 class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):

=== modified file 'bzrlib/tests/test_smart.py'
--- a/bzrlib/tests/test_smart.py	2009-08-27 22:17:35 +0000
+++ b/bzrlib/tests/test_smart.py	2009-09-03 23:43:16 +0000
@@ -36,6 +36,7 @@
     smart,
     tests,
     urlutils,
+    versionedfile,
     )
 from bzrlib.branch import Branch, BranchReferenceFormat
 import bzrlib.smart.branch
@@ -112,6 +113,25 @@
         return self.get_transport().get_smart_medium()
 
 
+class TestByteStreamToStream(tests.TestCase):
+
+    def test_repeated_substreams_same_kind_are_one_stream(self):
+        # Make a stream - an iterable of bytestrings.
+        stream = [('text', [versionedfile.FulltextContentFactory(('k1',), None,
+            None, 'foo')]),('text', [
+            versionedfile.FulltextContentFactory(('k2',), None, None, 'bar')])]
+        fmt = bzrdir.format_registry.get('pack-0.92')().repository_format
+        bytes = smart.repository._stream_to_byte_stream(stream, fmt)
+        streams = []
+        # Iterate the resulting iterable; checking that we get only one stream
+        # out.
+        fmt, stream = smart.repository._byte_stream_to_stream(bytes)
+        for kind, substream in stream:
+            streams.append((kind, list(substream)))
+        self.assertLength(1, streams)
+        self.assertLength(2, streams[0][1])
+
+
 class TestSmartServerResponse(tests.TestCase):
 
     def test__eq__(self):




More information about the bazaar-commits mailing list