Rev 3054: * Operations pulling data from a smart server where the underlying in http://people.ubuntu.com/~robertc/baz2.0/knit.datastreamjoin

Robert Collins robertc at robertcollins.net
Fri Nov 30 00:55:01 GMT 2007


At http://people.ubuntu.com/~robertc/baz2.0/knit.datastreamjoin

------------------------------------------------------------
revno: 3054
revision-id:robertc at robertcollins.net-20071130005436-0qmx32hyti0jz0y6
parent: robertc at robertcollins.net-20071129222449-r3r2er12d2p70wmy
committer: Robert Collins <robertc at robertcollins.net>
branch nick: knit.datastreamjoin
timestamp: Fri 2007-11-30 11:54:36 +1100
message:
  * Operations pulling data from a smart server where the underlying
    repositories are annotated and unannotated respectively will now work.
    (Robert Collins, #165106)
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
  bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
=== modified file 'NEWS'
--- a/NEWS	2007-11-29 18:41:01 +0000
+++ b/NEWS	2007-11-30 00:54:36 +0000
@@ -149,6 +149,10 @@
    * Obsolete packs are now cleaned up by pack and autopack operations.
      (Robert Collins, #153789)
 
+   * Operations pulling data from a smart server where the underlying
+     repositories are annotated and unannotated respectively will now work.
+     (Robert Collins, #165106)
+
    * Reconcile now shows progress bars. (Robert Collins, #159351)
 
    * ``RemoteBranch`` was not initializing ``self._revision_id_to_revno_map``

=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py	2007-11-29 00:22:51 +0000
+++ b/bzrlib/knit.py	2007-11-30 00:54:36 +0000
@@ -96,7 +96,6 @@
     KnitError,
     InvalidRevisionId,
     KnitCorrupt,
-    KnitDataStreamIncompatible,
     KnitHeaderError,
     RevisionNotPresent,
     RevisionAlreadyPresent,
@@ -738,8 +737,10 @@
         """
         if format != self.get_format_signature():
             trace.mutter('incompatible format signature inserting to %r', self)
-            raise KnitDataStreamIncompatible(
-                format, self.get_format_signature())
+            source = self._knit_from_datastream(
+                (format, data_list, reader_callable))
+            self.join(source)
+            return
 
         for version_id, options, length, parents in data_list:
             if self.has_version(version_id):
@@ -787,6 +788,28 @@
                     [(version_id, options, parents, length)],
                     reader_callable(length))
 
+    def _knit_from_datastream(self, (format, data_list, reader_callable)):
+        """Create a knit object from a data stream.
+
+        This method exists to allow conversion of data streams that do not
+        match the signature of this knit. Inserting data this way is
+        generally slower and uses more memory, but it does work.
+
+        :seealso: get_data_stream for details on datastreams.
+        :return: A knit versioned file which can be used to join the datastream
+            into self.
+        """
+        if format == "knit-plain":
+            factory = KnitPlainFactory()
+        elif format == "knit-annotated":
+            factory = KnitAnnotateFactory()
+        else:
+            raise errors.KnitDataStreamUnknown(format)
+        access = _StreamAccess(reader_callable)
+        index = _StreamIndex(data_list)
+        return KnitVersionedFile(self.filename, self.transport,
+            factory=factory, index=index, access_method=access)
+
     def versions(self):
         """See VersionedFile.versions."""
         if 'evil' in debug.debug_flags:
@@ -1512,9 +1535,9 @@
             return 'line-delta'
 
     def get_options(self, version_id):
-        """Return a string represention options.
+        """Return a string representing options.
 
-        e.g. foo,bar
+        e.g. ['foo', 'bar']
         """
         return self._cache[version_id][1]
 
@@ -1748,9 +1771,9 @@
             raise RevisionNotPresent(version_id, self)
 
     def get_options(self, version_id):
-        """Return a string represention options.
+        """Return a list representing options.
 
-        e.g. foo,bar
+        e.g. ['foo', 'bar']
         """
         node = self._get_node(version_id)
         if not self._deltas:
@@ -2022,6 +2045,109 @@
         self.write_index = index
 
 
+class _StreamAccess(object):
+    """A Knit Access object that provides data from a datastream."""
+
+    def __init__(self, reader_callable):
+        """Create a _StreamAccess object.
+
+        :param reader_callable: The reader_callable from the datastream.
+            This is called to buffer all the data immediately, for 
+            random access.
+        """
+        self.data = reader_callable(None)
+
+    def get_raw_records(self, memos_for_retrieval):
+        """Get the raw bytes for a records.
+
+        :param memos_for_retrieval: An iterable containing the (index, pos, 
+            length) memo for retrieving the bytes. The index component of
+            each memo is ignored, as the stream is buffered as a single block.
+        :return: An iterator over the bytes of the records.
+        """
+        # use a generator for memory friendliness
+        for _, start, end in memos_for_retrieval:
+            yield self.data[start:end]
+
+
+class _StreamIndex(object):
+    """A Knit Index object that uses the data map from a datastream."""
+
+    def __init__(self, data_list):
+        """Create a _StreamIndex object.
+
+        :param data_list: The data_list from the datastream.
+        """
+        self.data_list = data_list
+        self._by_version = {}
+        pos = 0
+        for key, options, length, parents in data_list:
+            self._by_version[key] = options, (pos, pos + length), parents
+            pos += length
+
+    def get_ancestry(self, versions, topo_sorted):
+        """Get an ancestry list for versions."""
+        if topo_sorted:
+            # Not needed for basic joins
+            raise NotImplementedError(self.get_ancestry)
+        # get a graph of all the mentioned versions:
+        # Little ugly - basically copied from KnitIndex, but don't want to
+        # accidentally incorporate too much of that index's code.
+        graph = {}
+        pending = set(versions)
+        cache = self._by_version
+        while pending:
+            version = pending.pop()
+            # trim ghosts
+            try:
+                parents = [p for p in cache[version][2] if p in cache]
+            except KeyError:
+                raise RevisionNotPresent(version, self)
+            # if not completed and not a ghost
+            pending.update([p for p in parents if p not in graph])
+            graph[version] = parents
+        return graph.keys()
+
+    def get_options(self, version_id):
+        """Return a string representing options.
+
+        e.g. ['foo', 'bar']
+        """
+        return self._by_version[version_id][0]
+
+    def get_parents_with_ghosts(self, version_id):
+        """Return parents of specified version with ghosts."""
+        return self._by_version[version_id][2]
+
+    def get_position(self, version_id):
+        """Return details needed to access the version.
+        
+        _StreamAccess has the data as a big array, so we return slice
+        coordinates into that (index_memos are opaque outside the index
+        and the matching access class).
+
+        :return: a tuple (None, start, end).
+        """
+        start, end = self._by_version[version_id][1]
+        return None, start, end
+
+    def get_versions(self):
+        """Get all the versions in the stream."""
+        return self._by_version.keys()
+
+    def iter_parents(self, version_ids):
+        """Iterate through the parents for many version ids.
+
+        :param version_ids: An iterable yielding version_ids.
+        :return: An iterator that yields (version_id, parents). Requested 
+            version_ids not present in the versioned file are simply skipped.
+            The order is undefined, allowing for different optimisations in
+            the underlying implementation.
+        """
+        return [(version, self._by_version[version][2]) for 
+            version in version_ids]
+
+
 class _KnitData(object):
     """Manage extraction of data from a KnitAccess, caching and decompressing.
     
@@ -2316,7 +2442,8 @@
             if None in version_ids:
                 version_ids.remove(None)
     
-            self.source_ancestry = set(self.source.get_ancestry(version_ids))
+            self.source_ancestry = set(self.source.get_ancestry(version_ids,
+                topo_sorted=False))
             this_versions = set(self.target._index.get_versions())
             # XXX: For efficiency we should not look at the whole index,
             #      we only need to consider the referenced revisions - they

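To make the arithmetic in _StreamIndex concrete: the data_list entries are
(version_id, options, length, parents) tuples, and the index accumulates the
lengths into byte offsets that _StreamAccess later uses to slice its buffered
data. A standalone sketch (plain Python for illustration, not bzrlib code):

    data_list = [('text-a', ['fulltext'], 127, []),
                 ('text-b', ['line-delta'], 128, ['text-a'])]
    by_version = {}
    pos = 0
    for key, options, length, parents in data_list:
        # same accumulation as _StreamIndex.__init__ above
        by_version[key] = (options, (pos, pos + length), parents)
        pos += length
    # get_position('text-b') would return (None, 127, 255), and
    # _StreamAccess.get_raw_records would then yield data[127:255].
    assert by_version['text-b'] == (['line-delta'], (127, 255), ['text-a'])
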
=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py	2007-11-26 22:33:25 +0000
+++ b/bzrlib/tests/test_knit.py	2007-11-30 00:54:36 +0000
@@ -47,6 +47,8 @@
     _KnitIndex,
     _PackAccess,
     PlainKnitContent,
+    _StreamAccess,
+    _StreamIndex,
     WeaveToKnit,
     KnitSequenceMatcher,
     )
@@ -1132,6 +1134,38 @@
         self.assertTrue(k.has_version('text-1'))
         self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
+    def test_newline_empty_lines(self):
+        # ensure that ["\n"] round trips ok.
+        knit = self.make_test_knit()
+        knit.add_lines('a', [], ["\n"])
+        knit.add_lines_with_ghosts('b', [], ["\n"])
+        self.assertEqual(["\n"], knit.get_lines('a'))
+        self.assertEqual(["\n"], knit.get_lines('b'))
+        self.assertEqual(['fulltext'], knit._index.get_options('a'))
+        self.assertEqual(['fulltext'], knit._index.get_options('b'))
+        knit.add_lines('c', ['a'], ["\n"])
+        knit.add_lines_with_ghosts('d', ['b'], ["\n"])
+        self.assertEqual(["\n"], knit.get_lines('c'))
+        self.assertEqual(["\n"], knit.get_lines('d'))
+        self.assertEqual(['line-delta'], knit._index.get_options('c'))
+        self.assertEqual(['line-delta'], knit._index.get_options('d'))
+
+    def test_empty_lines(self):
+        # bizarrely, [] is not listed as having no-eol. 
+        knit = self.make_test_knit()
+        knit.add_lines('a', [], [])
+        knit.add_lines_with_ghosts('b', [], [])
+        self.assertEqual([], knit.get_lines('a'))
+        self.assertEqual([], knit.get_lines('b'))
+        self.assertEqual(['fulltext'], knit._index.get_options('a'))
+        self.assertEqual(['fulltext'], knit._index.get_options('b'))
+        knit.add_lines('c', ['a'], [])
+        knit.add_lines_with_ghosts('d', ['b'], [])
+        self.assertEqual([], knit.get_lines('c'))
+        self.assertEqual([], knit.get_lines('d'))
+        self.assertEqual(['line-delta'], knit._index.get_options('c'))
+        self.assertEqual(['line-delta'], knit._index.get_options('d'))
+
     def test_knit_reload(self):
         # test that the content in a reloaded knit is correct
         k = self.make_test_knit()
@@ -1748,6 +1782,17 @@
             knit1.transport.get_bytes(knit1._index._filename),
             knit2.transport.get_bytes(knit2._index._filename))
 
+    def assertKnitValuesEqual(self, left, right):
+        """Assert that the texts and graph of left and right are the same."""
+        self.assertEqual(set(left.versions()), set(right.versions()))
+        for version in left.versions():
+            self.assertEqual(left.get_parents_with_ghosts(version),
+                right.get_parents_with_ghosts(version))
+            self.assertEqual(left.get_lines(version),
+                right.get_lines(version))
+            self.assertEqual(left.annotate(version),
+                right.annotate(version))
+
     def test_insert_data_stream_empty(self):
         """Inserting a data stream with no records should not put any data into
         the knit.
@@ -1768,12 +1813,23 @@
         source = self.make_test_knit(name='source')
         source.add_lines('text-a', [], split_lines(TEXT_1))
         data_stream = source.get_data_stream(['text-a'])
-        
         target = self.make_test_knit(name='target')
         target.insert_data_stream(data_stream)
-        
         self.assertKnitFilesEqual(source, target)
 
+    def test_insert_data_stream_annotated_unannotated(self):
+        """Inserting an annotated datastream to an unannotated knit works."""
+        # case one - full texts.
+        source = self.make_test_knit(name='source', annotate=True)
+        target = self.make_test_knit(name='target', annotate=False)
+        source.add_lines('text-a', [], split_lines(TEXT_1))
+        target.insert_data_stream(source.get_data_stream(['text-a']))
+        self.assertKnitValuesEqual(source, target)
+        # case two - deltas.
+        source.add_lines('text-b', ['text-a'], split_lines(TEXT_2))
+        target.insert_data_stream(source.get_data_stream(['text-b']))
+        self.assertKnitValuesEqual(source, target)
+
     def test_insert_data_stream_records_already_present(self):
         """Insert a data stream where some records are alreday present in the
         target, and some not.  Only the new records are inserted.
@@ -1855,21 +1911,56 @@
         self.assertRaises(
             errors.KnitCorrupt, target.insert_data_stream, data_stream)
 
-    def test_insert_data_stream_incompatible_format(self):
+    def test_insert_data_stream_unknown_format(self):
         """A data stream in a different format to the target knit cannot be
         inserted.
 
-        It will raise KnitDataStreamIncompatible.
+        It will raise KnitDataStreamUnknown because the fallback code will fail
+        to make a knit. In future we may need KnitDataStreamIncompatible again,
+        for more exotic cases.
         """
         data_stream = ('fake-format-signature', [], lambda _: '')
         target = self.make_test_knit(name='target')
         self.assertRaises(
-            errors.KnitDataStreamIncompatible,
+            errors.KnitDataStreamUnknown,
             target.insert_data_stream, data_stream)
 
     #  * test that a stream of "already present version, then new version"
     #    inserts correctly.
 
+
+    def assertMadeStreamKnit(self, source_knit, versions, target_knit):
+        """Assert that a knit made from a stream is as expected."""
+        a_stream = source_knit.get_data_stream(versions)
+        expected_data = a_stream[2](None)
+        a_stream = source_knit.get_data_stream(versions)
+        a_knit = target_knit._knit_from_datastream(a_stream)
+        self.assertEqual(source_knit.factory.__class__,
+            a_knit.factory.__class__)
+        self.assertIsInstance(a_knit._data._access, _StreamAccess)
+        self.assertIsInstance(a_knit._index, _StreamIndex)
+        self.assertEqual(a_knit._index.data_list, a_stream[1])
+        self.assertEqual(a_knit._data._access.data, expected_data)
+        self.assertEqual(a_knit.filename, target_knit.filename)
+        self.assertEqual(a_knit.transport, target_knit.transport)
+
+    def test__knit_from_data_stream_empty(self):
+        """Create a knit object from a datastream."""
+        annotated = self.make_test_knit(name='source', annotate=True)
+        plain = self.make_test_knit(name='target', annotate=False)
+        # case 1: annotated source
+        self.assertMadeStreamKnit(annotated, [], annotated)
+        self.assertMadeStreamKnit(annotated, [], plain)
+        # case 2: plain source
+        self.assertMadeStreamKnit(plain, [], annotated)
+        self.assertMadeStreamKnit(plain, [], plain)
+
+    def test__knit_from_data_stream_unknown_format(self):
+        annotated = self.make_test_knit(name='source', annotate=True)
+        self.assertRaises(errors.KnitDataStreamUnknown,
+            annotated._knit_from_datastream, ("unknown", None, None))
+
+
 TEXT_1 = """\
 Banana cup cakes:
 
@@ -2719,3 +2810,154 @@
         # will fail and we'll adjust it to handle that case correctly, rather
         # than allowing an over-read that is bogus.
         self.assertEqual(expected_length, len(stream[2](-1)))
+
+
+class Test_StreamIndex(KnitTests):
+
+    def get_index(self, knit, stream):
+        """Get a _StreamIndex from knit and stream."""
+        return knit._knit_from_datastream(stream)._index
+
+    def assertIndexVersions(self, knit, versions):
+        """Check that the _StreamIndex versions are those of the stream."""
+        index = self.get_index(knit, knit.get_data_stream(versions))
+        self.assertEqual(set(index.get_versions()), set(versions))
+        # check we didn't get duplicates
+        self.assertEqual(len(index.get_versions()), len(versions))
+
+    def assertIndexAncestry(self, knit, ancestry_versions, versions, result):
+        """Check the result of a get_ancestry call on knit."""
+        index = self.get_index(knit, knit.get_data_stream(versions))
+        self.assertEqual(result, index.get_ancestry(ancestry_versions, False))
+
+    def assertIterParents(self, knit, versions, result):
+        """Check the result of an iter_parents call on knit."""
+        index = self.get_index(knit, knit.get_data_stream(versions))
+        self.assertEqual(result, index.iter_parents(versions))
+
+    def assertGetOptions(self, knit, version, options):
+        index = self.get_index(knit, knit.get_data_stream(version))
+        self.assertEqual(options, index.get_options(version))
+
+    def assertGetPosition(self, knit, versions, version, result):
+        index = self.get_index(knit, knit.get_data_stream(versions))
+        self.assertEqual(result, index.get_position(version))
+
+    def assertGetParentsWithGhosts(self, knit, versions, version, parents):
+        index = self.get_index(knit, knit.get_data_stream(versions))
+        self.assertEqual(parents, index.get_parents_with_ghosts(version))
+
+    def make_knit_with_4_versions_2_dags(self):
+        knit = self.make_test_knit()
+        knit.add_lines('a', [], ["foo"])
+        knit.add_lines('b', [], [])
+        knit.add_lines('c', ['b', 'a'], [])
+        knit.add_lines_with_ghosts('d', ['e', 'f'], [])
+        return knit
+
+    def test_versions(self):
+        """The versions of a StreamIndex are those of the datastream."""
+        knit = self.make_knit_with_4_versions_2_dags()
+        # ask for most permutations, which catches bugs like falling back to the
+        # target knit, or showing ghosts, etc.
+        self.assertIndexVersions(knit, [])
+        self.assertIndexVersions(knit, ['a'])
+        self.assertIndexVersions(knit, ['b'])
+        self.assertIndexVersions(knit, ['c'])
+        self.assertIndexVersions(knit, ['d'])
+        self.assertIndexVersions(knit, ['a', 'b'])
+        self.assertIndexVersions(knit, ['b', 'c'])
+        self.assertIndexVersions(knit, ['a', 'c'])
+        self.assertIndexVersions(knit, ['a', 'b', 'c'])
+        self.assertIndexVersions(knit, ['a', 'b', 'c', 'd'])
+
+    def test_construct(self):
+        """Constructing a StreamIndex generates index data."""
+        data_list = [('text-a', ['fulltext'], 127, []),
+            ('text-b', ['option'], 128, ['text-c'])]
+        index = _StreamIndex(data_list)
+        self.assertEqual({'text-a':(['fulltext'], (0, 127), []),
+            'text-b':(['option'], (127, 127 + 128), ['text-c'])},
+            index._by_version)
+
+    def test_get_ancestry(self):
+        knit = self.make_knit_with_4_versions_2_dags()
+        self.assertIndexAncestry(knit, ['a'], ['a'], ['a'])
+        self.assertIndexAncestry(knit, ['b'], ['b'], ['b'])
+        self.assertIndexAncestry(knit, ['c'], ['c'], ['c'])
+        self.assertIndexAncestry(knit, ['c'], ['a', 'b', 'c'],
+            {'a':[], 'b':[], 'c':[]}.keys())
+        self.assertIndexAncestry(knit, ['c', 'd'], ['a', 'b', 'c', 'd'],
+            {'a':[], 'b':[], 'c':[], 'd':[]}.keys())
+
+    def test_iter_parents(self):
+        knit = self.make_knit_with_4_versions_2_dags()
+        self.assertIterParents(knit, ['a'], [('a', [])])
+        self.assertIterParents(knit, ['a', 'b'], [('a', []), ('b', [])])
+        self.assertIterParents(knit, ['a', 'b', 'c'],
+            [('a', []), ('b', []), ('c', ['b', 'a'])])
+        self.assertIterParents(knit, ['a', 'b', 'c', 'd'],
+            [('a', []), ('b', []), ('c', ['b', 'a']), ('d', ['e', 'f'])])
+
+    def test_get_options(self):
+        knit = self.make_knit_with_4_versions_2_dags()
+        self.assertGetOptions(knit, 'a', ['no-eol', 'fulltext'])
+        self.assertGetOptions(knit, 'c', ['line-delta'])
+
+    def test_get_parents_with_ghosts(self):
+        knit = self.make_knit_with_4_versions_2_dags()
+        self.assertGetParentsWithGhosts(knit, ['a'], 'a', [])
+        self.assertGetParentsWithGhosts(knit, ['c'], 'c', ['b', 'a'])
+
+    def test_get_position(self):
+        knit = self.make_knit_with_4_versions_2_dags()
+        # get_position returns (index(can be None), start, end) for
+        # _StreamAccess to use.
+        self.assertGetPosition(knit, ['a'], 'a', (None, 0, 78))
+        self.assertGetPosition(knit, ['a', 'c'], 'c', (None, 78, 156))
+
+
+class Test_StreamAccess(KnitTests):
+
+    def get_index_access(self, knit, stream):
+        """Get a _StreamAccess from knit and stream."""
+        knit = knit._knit_from_datastream(stream)
+        return knit._index, knit._data._access
+
+    def assertGetRawRecords(self, knit, versions):
+        index, access = self.get_index_access(knit,
+            knit.get_data_stream(versions))
+        # check that every version asked for can be obtained from the resulting
+        # access object.
+        # batch
+        memos = []
+        for version in versions:
+            memos.append(knit._index.get_position(version))
+        original = {}
+        for version, data in zip(
+            versions, knit._data._access.get_raw_records(memos)):
+            original[version] = data
+        memos = []
+        for version in versions:
+            memos.append(index.get_position(version))
+        streamed = {}
+        for version, data in zip(versions, access.get_raw_records(memos)):
+            streamed[version] = data
+        self.assertEqual(original, streamed)
+        # individually
+        for version in versions:
+            data = list(access.get_raw_records(
+                [index.get_position(version)]))[0]
+            self.assertEqual(original[version], data)
+
+    def make_knit_with_two_versions(self):
+        knit = self.make_test_knit()
+        knit.add_lines('a', [], ["foo"])
+        knit.add_lines('b', [], ["bar"])
+        return knit
+
+    def test_get_raw_records(self):
+        knit = self.make_knit_with_two_versions()
+        self.assertGetRawRecords(knit, ['a', 'b'])
+        self.assertGetRawRecords(knit, ['a'])
+        self.assertGetRawRecords(knit, ['b'])

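End to end, the change means a data stream taken from an annotated knit can
now be inserted into a plain knit (and vice versa). A minimal usage sketch in
the style of the tests above; make_test_knit, assertKnitValuesEqual,
split_lines and TEXT_1 are the test-suite helpers used in this file, and the
snippet assumes it runs inside a KnitTests case:

    source = self.make_test_knit(name='source', annotate=True)   # annotated
    target = self.make_test_knit(name='target', annotate=False)  # plain
    source.add_lines('text-a', [], split_lines(TEXT_1))
    # insert_data_stream now falls back to _knit_from_datastream() + join()
    # when the format signatures differ, instead of raising
    # KnitDataStreamIncompatible.
    target.insert_data_stream(source.get_data_stream(['text-a']))
    self.assertKnitValuesEqual(source, target)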

