Robert Collins robertc at
Sun Feb 15 21:47:54 GMT 2009


revno: 4007
revision-id: robertc at
parent: robertc at
committer: Robert Collins <robertc at>
branch nick: VersionedFiles.NetworkRecordStream
timestamp: Mon 2009-02-16 08:47:42 +1100
  First passing NetworkRecordStream test - a fulltext from any record type which isn't a chunked or fulltext can be serialised and deserialised successfully.
=== modified file 'bzrlib/'
--- a/bzrlib/	2009-02-15 21:24:20 +0000
+++ b/bzrlib/	2009-02-15 21:47:42 +0000
@@ -152,7 +152,7 @@
     """An adapter from FT annotated knits to unannotated ones."""
     def get_bytes(self, factory):
-        annotated_compressed_bytes = factory.get_bytes_as(factory.storage_kind)
+        annotated_compressed_bytes = factory._raw_record
         rec, contents = \
         content = self._annotate_factory.parse_fulltext(contents, rec[1])
@@ -164,7 +164,7 @@
     """An adapter for deltas from annotated to unannotated."""
     def get_bytes(self, factory):
-        annotated_compressed_bytes = factory.get_bytes_as(factory.storage_kind)
+        annotated_compressed_bytes = factory._raw_record
         rec, contents = \
         delta = self._annotate_factory.parse_line_delta(contents, rec[1],
@@ -178,7 +178,7 @@
     """An adapter from FT annotated knits to unannotated ones."""
     def get_bytes(self, factory):
-        annotated_compressed_bytes = factory.get_bytes_as(factory.storage_kind)
+        annotated_compressed_bytes = factory._raw_record
         rec, contents = \
         content, delta = self._annotate_factory.parse_record(factory.key[-1],
@@ -190,7 +190,7 @@
     """An adapter for deltas from annotated to unannotated."""
     def get_bytes(self, factory):
-        annotated_compressed_bytes = factory.get_bytes_as(factory.storage_kind)
+        annotated_compressed_bytes = factory._raw_record
         rec, contents = \
         delta = self._annotate_factory.parse_line_delta(contents, rec[1],
@@ -214,7 +214,7 @@
     """An adapter from FT plain knits to unannotated ones."""
     def get_bytes(self, factory):
-        compressed_bytes = factory.get_bytes_as(factory.storage_kind)
+        compressed_bytes = factory._raw_record
         rec, contents = \
         content, delta = self._plain_factory.parse_record(factory.key[-1],
@@ -226,7 +226,7 @@
     """An adapter for deltas from annotated to unannotated."""
     def get_bytes(self, factory):
-        compressed_bytes = factory.get_bytes_as(factory.storage_kind)
+        compressed_bytes = factory._raw_record
         rec, contents = \
         delta = self._plain_factory.parse_line_delta(contents, rec[1])
@@ -253,7 +253,7 @@
     def __init__(self, key, parents, build_details, sha1, raw_record,
-        annotated, knit=None):
+        annotated, knit=None, network_bytes=None):
         """Create a KnitContentFactory for key.
         :param key: The key.
@@ -263,6 +263,8 @@
         :param sha1: The sha1 expected from the full text of this object.
         :param raw_record: The bytes of the knit data from disk.
         :param annotated: True if the raw data is annotated.
+        :param network_bytes: None to calculate the network bytes on demand,
+            not-none if they are already known.
         self.sha1 = sha1
@@ -278,12 +280,31 @@
             annotated_kind = ''
         self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
         self._raw_record = raw_record
+        self._network_bytes = network_bytes
         self._build_details = build_details
         self._knit = knit
+    def _create_network_bytes(self):
+        """Create a fully serialised network version for transmission."""
+        # storage_kind, key, parents, Noeol, raw_record
+        key_bytes = '\x00'.join(self.key)
+        if self.parents is None:
+            parent_bytes = 'None:'
+        else:
+            parent_bytes = '\t'.join('\x00'.join(key) for key in self.parents)
+        if self._build_details[1]:
+            noeol = 'N'
+        else:
+            noeol = ' '
+        network_bytes = "%s\n%s\n%s\n%s%s" % (self.storage_kind, key_bytes,
+            parent_bytes, noeol, self._raw_record)
+        self._network_bytes = network_bytes
     def get_bytes_as(self, storage_kind):
         if storage_kind == self.storage_kind:
-            return self._raw_record
+            if self._network_bytes is None:
+                self._create_network_bytes()
+            return self._network_bytes
         if self._knit is not None:
             if storage_kind == 'chunked':
                 return self._knit.get_lines(self.key[0])
@@ -293,6 +314,38 @@
+def knit_network_to_record(storage_kind, bytes, line_end):
+    """Convert a network record to a record object.
+    :param storage_kind: The storage kind of the record.
+    :param bytes: The bytes of the record on the network.
+    """
+    start = line_end
+    line_end = bytes.find('\n', start)
+    key = bytes[:line_end].split('\x00')
+    start = line_end + 1
+    line_end = bytes.find('\n', start)
+    parent_line = bytes[start:line_end]
+    if parent_line == 'None:':
+        parents = None
+    else:
+        parents = tuple(
+            [segment.split('\x00') for segment in parent_line.split('\t')
+             if segment])
+    start = line_end + 1
+    noeol = bytes[start] != 'N'
+    if 'ft' in storage_kind:
+        method = 'fulltext'
+    else:
+        method = 'line-delta'
+    build_details = (method, noeol)
+    start = start + 1
+    raw_record = bytes[start:]
+    annotated = 'annotated' in storage_kind
+    return KnitContentFactory(key, parents, build_details, None, raw_record,
+        annotated, network_bytes=bytes)
 class KnitContent(object):
     """Content of a knit version to which deltas can be applied.
@@ -1416,7 +1469,9 @@
                         adapter = get_adapter(adapter_key)
                     bytes = adapter.get_bytes(record)
-                    bytes = record.get_bytes_as(record.storage_kind)
+                    # It's a knit record, it has a _raw_record field (even if
+                    # it was reconstituted from a network stream).
+                    bytes = record._raw_record
                 options = [record._build_details[0]]
                 if record._build_details[1]:

=== modified file 'bzrlib/tests/'
--- a/bzrlib/tests/	2009-02-15 21:24:20 +0000
+++ b/bzrlib/tests/	2009-02-15 21:47:42 +0000
@@ -1708,6 +1708,34 @@
         entries = files.get_record_stream(keys, 'topological', False)
         self.assertAbsentRecord(files, keys, parent_map, entries)
+    def test_get_record_stream_native_formats_are_wire_ready_one_ft(self):
+        files = self.get_versionedfiles()
+        key = self.get_simple_key('foo')
+        files.add_lines(key, (), ['my text\n', 'content'])
+        stream = files.get_record_stream([key], 'unordered', False)
+        record =
+        if record.storage_kind in ('chunked', 'fulltext'):
+            # chunked and fulltext representations are for direct use not wire
+            # serialisation: check they are able to be used that way.
+            self.assertEqual('my text\ncontent',
+                record.get_bytes_as('fulltext'))
+            self.assertEqual('my text\ncontent',
+                ''.join(record.get_bytes_as('fulltext')))
+        else:
+            bytes = [record.get_bytes_as(record.storage_kind)]
+            network_stream = versionedfile.NetworkRecordStream(bytes).read()
+            source_record = record
+            records = []
+            for record in network_stream:
+                records.append(record)
+                self.assertEqual(source_record.storage_kind,
+                    record.storage_kind)
+                self.assertEqual(source_record.parents, record.parents)
+                self.assertEqual(
+                    source_record.get_bytes_as(source_record.storage_kind),
+                    record.get_bytes_as(record.storage_kind))
+            self.assertEqual(1, len(records))
     def assertAbsentRecord(self, files, keys, parents, entries):
         """Helper for test_get_record_stream_missing_records_are_absent."""
         seen = set()

=== modified file 'bzrlib/'
--- a/bzrlib/	2009-02-03 01:38:45 +0000
+++ b/bzrlib/	2009-02-15 21:47:42 +0000
@@ -31,6 +31,7 @@
 from bzrlib import (
+    knit,
@@ -39,6 +40,7 @@
 from bzrlib.graph import DictParentsProvider, Graph, _StackedParentsProvider
 from bzrlib.transport.memory import MemoryTransport
+from bzrlib.util import bencode
 from bzrlib.inter import InterObject
 from bzrlib.registry import Registry
@@ -1472,3 +1474,40 @@
                 pb.update("iterating texts", i, len(keys))
             for l in self._get_lines(key):
                 yield (l, key)
+def network_bytes_to_kind_and_offset(network_bytes):
+    """Strip off a record kind from the front of network_bytes.
+    :param network_bytes: The bytes of a record.
+    :return: A tuple (storage_kind, offset_of_remaining_bytes)
+    """
+    line_end = network_bytes.find('\n')
+    storage_kind = network_bytes[:line_end]
+    return storage_kind, line_end + 1
+class NetworkRecordStream(object):
+    """A record_stream which reconstitutes a serialised stream."""
+    def __init__(self, bytes_iterator):
+        """Create a NetworkRecordStream.
+        :param bytes_iterator: An iterator of bytes. Each item in this
+            iterator should have been obtained from a record stream's
+            record.get_bytes_as(record.storage_kind) call.
+        """
+        self._bytes_iterator = bytes_iterator
+        self._kind_factory = {'knit-ft-gz':knit.knit_network_to_record,
+            'knit-annotated-ft-gz':knit.knit_network_to_record,
+            }
+    def read(self):
+        """Read the stream.
+        :return: An iterator as per VersionedFiles.get_record_stream().
+        """
+        for bytes in self._bytes_iterator:
+            storage_kind, line_end = network_bytes_to_kind_and_offset(bytes)
+            yield self._kind_factory[storage_kind](
+                storage_kind, bytes, line_end)

