Rev 3353: Functional get_record_stream interface tests covering full interface. in http://people.ubuntu.com/~robertc/baz2.0/versioned_files

Robert Collins robertc at robertcollins.net
Wed Apr 16 05:07:48 BST 2008


At http://people.ubuntu.com/~robertc/baz2.0/versioned_files

------------------------------------------------------------
revno: 3353
revision-id: robertc at robertcollins.net-20080416040740-g3cjhoaez95b7gsr
parent: robertc at robertcollins.net-20080411015017-0vrwo9xe9td0wvl2
committer: Robert Collins <robertc at robertcollins.net>
branch nick: data_stream_revamp
timestamp: Wed 2008-04-16 14:07:40 +1000
message:
  Functional get_record_stream interface tests covering full interface.
modified:
  bzrlib/errors.py               errors.py-20050309040759-20512168c4e14fbd
  bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
  bzrlib/tests/test_errors.py    test_errors.py-20060210110251-41aba2deddf936a8
  bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
  bzrlib/tests/test_versionedfile.py test_versionedfile.py-20060222045249-db45c9ed14a1c2e5
  bzrlib/versionedfile.py        versionedfile.py-20060222045106-5039c71ee3b65490
  bzrlib/weave.py                knit.py-20050627021749-759c29984154256b
  doc/developers/repository-stream.txt repositorystream.txt-20080410222511-nh6b9bvscvcerh48-1
=== modified file 'bzrlib/errors.py'
--- a/bzrlib/errors.py	2008-04-07 10:34:57 +0000
+++ b/bzrlib/errors.py	2008-04-16 04:07:40 +0000
@@ -562,6 +562,18 @@
         PathError.__init__(self, base, reason)
 
 
+class UnavailableRepresentation(InternalBzrError):
+
+    _fmt = ("The encoding '%(wanted)s' is not available for key %(key)s which "
+        "is encoded as '%(native)s'.")
+
+    def __init__(self, key, wanted, native):
+        InternalBzrError.__init__(self)
+        self.wanted = wanted
+        self.native = native
+        self.key = key
+
+
 class UnknownHook(BzrError):
 
     _fmt = "The %(type)s hook '%(hook)s' is unknown in this version of bzrlib."

=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py	2008-04-09 23:35:55 +0000
+++ b/bzrlib/knit.py	2008-04-16 04:07:40 +0000
@@ -117,7 +117,11 @@
 from bzrlib.tsort import topo_sort
 from bzrlib.tuned_gzip import GzipFile, bytes_to_gzip
 import bzrlib.ui
-from bzrlib.versionedfile import VersionedFile, InterVersionedFile
+from bzrlib.versionedfile import (
+    ContentFactory,
+    InterVersionedFile,
+    VersionedFile,
+    )
 import bzrlib.weave
 
 
@@ -138,6 +142,51 @@
 INDEX_SUFFIX = '.kndx'
 
 
+class KnitContentFactory(ContentFactory):
+    """Content factory for streaming from knits.
+    
+    :seealso ContentFactory:
+    """
+
+    def __init__(self, version, parents, build_details, sha1, raw_record,
+        annotated, knit=None):
+        """Create a KnitContentFactory for version.
+        
+        :param version: The version.
+        :param parents: The parents.
+        :param build_details: The build details as returned from
+            get_build_details.
+        :param sha1: The sha1 expected from the full text of this object.
+        :param raw_record: The bytes of the knit data from disk.
+        :param annotated: True if the raw data is annotated.
+        """
+        ContentFactory.__init__(self)
+        self.sha1 = sha1
+        self.key = (version,)
+        self.parents = tuple((parent,) for parent in parents)
+        if build_details[0] == 'line-delta':
+            kind = 'delta'
+        else:
+            kind = 'ft'
+        if annotated:
+            annotated_kind = 'annotated-'
+        else:
+            annotated_kind = ''
+        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
+        self._raw_record = raw_record
+        self._build_details = build_details
+        self._knit = knit
+
+    def get_bytes_as(self, storage_kind):
+        if storage_kind == self.storage_kind:
+            return self._raw_record
+        if storage_kind == 'fulltext' and self._knit is not None:
+            return self._knit.get_text(self.key[0])
+        else:
+            raise errors.UnavailableRepresentation(self.key, storage_kind,
+                self.storage_kind)
+
+
 class KnitContent(object):
     """Content of a knit version to which deltas can be applied."""
 
@@ -699,7 +748,7 @@
         # know the length of all the data a-priori.
         raw_datum = []
         result_version_list = []
-        for (version_id, raw_data), \
+        for (version_id, raw_data, _), \
             (version_id2, options, _, parents) in \
             izip(self._data.read_records_iter_raw(copy_queue_records),
                  temp_version_list):
@@ -717,6 +766,43 @@
                 return pseudo_file.read(length)
         return (self.get_format_signature(), result_version_list, read)
 
+    def get_record_stream(self, versions, ordering, include_delta_closure):
+        """Get a stream of records for versions.
+
+        :param versions: The version ids to include, as plain strings. Each
+            yielded factory's key is the tuple (version,).
+        :param ordering: Either 'unordered' or 'topological'. A topologically
+            sorted stream has compression parents strictly before their
+            children.
+        :param include_delta_closure: If True then the closure across any
+            compression parents will be included (in the opaque data).
+        :return: An iterator of ContentFactory objects, each of which is only
+            valid until the iterator is advanced.
+        """
+        if include_delta_closure:
+            # Nb: what we should do is plan the data to stream to allow
+            # reconstruction of all the texts without excessive buffering,
+            # including re-sending common bases as needed. This makes the most
+            # sense when we start serialising these streams though, so for now
+            # we just fall back to individual text construction behind the
+            # abstraction barrier.
+            knit = self
+        else:
+            knit = None
+        # Double index lookups here : need a unified api ?
+        parent_map = self.get_parent_map(versions)
+        position_map = self._get_components_positions(versions)
+        if ordering == 'topological':
+            versions = topo_sort(parent_map)
+        # Pair each version with its index memo so its raw record can be read.
+        records = [(version, position_map[version][1]) for version in versions]
+        record_map = {}
+        for version, raw_data, sha1 in \
+                self._data.read_records_iter_raw(records):
+            (record_details, index_memo, _) = position_map[version]
+            yield KnitContentFactory(version, parent_map[version],
+                record_details, sha1, raw_data, self.factory.annotated, knit)
+
     def _extract_blocks(self, version_id, source, target):
         if self._index.get_method(version_id) != 'line-delta':
             return None
@@ -2478,6 +2564,9 @@
 
         This unpacks enough of the text record to validate the id is
         as expected but thats all.
+
+        Each item the iterator yields is (version_id, bytes,
+        sha1_of_full_text).
         """
         # setup an iterator of the external records:
         # uses readv so nice and fast we hope.
@@ -2492,7 +2581,7 @@
             # validate the header
             df, rec = self._parse_record_header(version_id, data)
             df.close()
-            yield version_id, data
+            yield version_id, data, rec[3]
 
     def read_records_iter(self, records):
         """Read text records from data file and yield result.
@@ -2657,7 +2746,7 @@
             total = len(version_list)
             raw_datum = []
             raw_records = []
-            for (version_id, raw_data), \
+            for (version_id, raw_data, _), \
                 (version_id2, options, parents) in \
                 izip(self.source._data.read_records_iter_raw(copy_queue_records),
                      copy_queue):

=== modified file 'bzrlib/tests/test_errors.py'
--- a/bzrlib/tests/test_errors.py	2008-04-04 05:59:43 +0000
+++ b/bzrlib/tests/test_errors.py	2008-04-16 04:07:40 +0000
@@ -205,6 +205,12 @@
             "the currently open request.",
             str(error))
 
+    def test_unavailable_representation(self):
+        error = errors.UnavailableRepresentation(('key',), "mpdiff", "fulltext")
+        self.assertEqualDiff("The encoding 'mpdiff' is not available for key "
+            "('key',) which is encoded as 'fulltext'.",
+            str(error))
+
     def test_unknown_hook(self):
         error = errors.UnknownHook("branch", "foo")
         self.assertEqualDiff("The branch hook 'foo' is unknown in this version"

=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py	2008-04-09 23:35:55 +0000
+++ b/bzrlib/tests/test_knit.py	2008-04-16 04:07:40 +0000
@@ -370,7 +370,7 @@
         self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
 
         raw_contents = list(data.read_records_iter_raw(records))
-        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
+        self.assertEqual([('rev-id-1', gz_txt, sha1sum)], raw_contents)
 
     def test_not_enough_lines(self):
         sha1sum = sha.new('foo\n').hexdigest()
@@ -387,7 +387,7 @@
 
         # read_records_iter_raw won't detect that sort of mismatch/corruption
         raw_contents = list(data.read_records_iter_raw(records))
-        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
+        self.assertEqual([('rev-id-1', gz_txt, sha1sum)], raw_contents)
 
     def test_too_many_lines(self):
         sha1sum = sha.new('foo\nbar\n').hexdigest()
@@ -405,7 +405,7 @@
 
         # read_records_iter_raw won't detect that sort of mismatch/corruption
         raw_contents = list(data.read_records_iter_raw(records))
-        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
+        self.assertEqual([('rev-id-1', gz_txt, sha1sum)], raw_contents)
 
     def test_mismatched_version_id(self):
         sha1sum = sha.new('foo\nbar\n').hexdigest()
@@ -1056,7 +1056,7 @@
         """
         index_memo = knit._index.get_position(version_id)
         record = (version_id, index_memo)
-        [(_, expected_content)] = list(knit._data.read_records_iter_raw([record]))
+        [(_, expected_content, _)] = list(knit._data.read_records_iter_raw([record]))
         self.assertEqual(expected_content, candidate_content)
 
 

=== modified file 'bzrlib/tests/test_versionedfile.py'
--- a/bzrlib/tests/test_versionedfile.py	2008-04-08 21:41:15 +0000
+++ b/bzrlib/tests/test_versionedfile.py	2008-04-16 04:07:40 +0000
@@ -87,6 +87,114 @@
         f = self.reopen_file(create=True)
         verify_file(f)
 
+    def test_get_record_stream_empty(self):
+        """get_record_stream is a replacement for get_data_stream."""
+        f = self.get_file()
+        entries = f.get_record_stream([], 'unordered', False)
+        self.assertEqual([], list(entries))
+
+    def get_diamond_vf(self):
+        """Get a diamond graph to exercise deltas and merges."""
+        f = self.get_file()
+        parents = {
+            'origin': (),
+            'base': (('origin',),),
+            'left': (('base',),),
+            'right': (('base',),),
+            'merged': (('left',), ('right',)),
+            }
+        # insert a diamond graph to exercise deltas and merges.
+        f.add_lines('origin', [], [])
+        f.add_lines('base', ['origin'], ['base\n'])
+        f.add_lines('left', ['base'], ['base\n', 'left\n'])
+        f.add_lines('right', ['base'],
+            ['base\n', 'right\n'])
+        f.add_lines('merged', ['left', 'right'],
+            ['base\n', 'left\n', 'right\n', 'merged\n'])
+        return f, parents
+
+    def assertValidStorageKind(self, storage_kind):
+        """Assert that storage_kind is a valid storage_kind."""
+        self.assertSubset([storage_kind],
+            ['mpdiff', 'knit-annotated-ft', 'knit-annotated-delta',
+             'knit-ft', 'knit-delta', 'fulltext', 'knit-annotated-ft-gz',
+             'knit-annotated-delta-gz', 'knit-ft-gz', 'knit-delta-gz'])
+
+    def capture_stream(self, f, entries, on_seen, parents):
+        """Capture a stream for testing."""
+        for factory in entries:
+            on_seen(factory.key)
+            self.assertValidStorageKind(factory.storage_kind)
+            self.assertEqual(f.get_sha1s([factory.key[0]])[0], factory.sha1)
+            self.assertEqual(parents[factory.key[0]], factory.parents)
+            self.assertIsInstance(factory.get_bytes_as(factory.storage_kind),
+                str)
+
+    def test_get_record_stream_interface(self):
+        """each item in a stream has to provide a regular interface."""
+        f, parents = self.get_diamond_vf()
+        entries = f.get_record_stream(['merged', 'left', 'right', 'base'],
+            'unordered', False)
+        seen = set()
+        self.capture_stream(f, entries, seen.add, parents)
+        self.assertEqual(set([('base',), ('left',), ('right',), ('merged',)]),
+            seen)
+
+    def test_get_record_stream_interface_ordered(self):
+        """each item in a stream has to provide a regular interface."""
+        f, parents = self.get_diamond_vf()
+        entries = f.get_record_stream(['merged', 'left', 'right', 'base'],
+            'topological', False)
+        seen = []
+        self.capture_stream(f, entries, seen.append, parents)
+        self.assertSubset([tuple(seen)],
+            (
+             (('base',), ('left',), ('right',), ('merged',)),
+             (('base',), ('right',), ('left',), ('merged',)),
+            ))
+
+    def test_get_record_stream_interface_ordered_with_delta_closure(self):
+        """each item in a stream has to provide a regular interface."""
+        f, parents = self.get_diamond_vf()
+        entries = f.get_record_stream(['merged', 'left', 'right', 'base'],
+            'topological', True)
+        seen = []
+        for factory in entries:
+            seen.append(factory.key)
+            self.assertValidStorageKind(factory.storage_kind)
+            self.assertEqual(f.get_sha1s([factory.key[0]])[0], factory.sha1)
+            self.assertEqual(parents[factory.key[0]], factory.parents)
+            self.assertEqual(f.get_text(factory.key[0]),
+                factory.get_bytes_as('fulltext'))
+            self.assertIsInstance(factory.get_bytes_as(factory.storage_kind),
+                str)
+        self.assertSubset([tuple(seen)],
+            (
+             (('base',), ('left',), ('right',), ('merged',)),
+             (('base',), ('right',), ('left',), ('merged',)),
+            ))
+
+    def test_get_record_stream_unknown_storage_kind_raises(self):
+        """Asking for a storage kind that the stream cannot supply raises."""
+        f, parents = self.get_diamond_vf()
+        entries = f.get_record_stream(['merged', 'left', 'right', 'base'],
+            'unordered', False)
+        # We track the contents because we should be able to try, fail a
+        # particular kind and then ask for one that works and continue.
+        seen = set()
+        for factory in entries:
+            seen.add(factory.key)
+            self.assertValidStorageKind(factory.storage_kind)
+            self.assertEqual(f.get_sha1s([factory.key[0]])[0], factory.sha1)
+            self.assertEqual(parents[factory.key[0]], factory.parents)
+            # currently no stream emits mpdiff
+            self.assertRaises(errors.UnavailableRepresentation,
+                factory.get_bytes_as, 'mpdiff')
+            self.assertIsInstance(factory.get_bytes_as(factory.storage_kind),
+                str)
+        self.assertEqual(set([('base',), ('left',), ('right',), ('merged',)]),
+            seen)
+
     def test_adds_with_parent_texts(self):
         f = self.get_file()
         parent_texts = {}

=== modified file 'bzrlib/versionedfile.py'
--- a/bzrlib/versionedfile.py	2008-04-09 00:34:54 +0000
+++ b/bzrlib/versionedfile.py	2008-04-16 04:07:40 +0000
@@ -41,6 +41,29 @@
 from bzrlib.textmerge import TextMerge
 
 
+class ContentFactory(object):
+    """Abstract interface for insertion and retrieval from a VersionedFile.
+    
+    :ivar sha1: None, or the sha1 of the content fulltext.
+    :ivar storage_kind: The native storage kind of this factory. One of
+        'mpdiff', 'knit-annotated-ft', 'knit-annotated-delta', 'knit-ft',
+        'knit-delta', 'fulltext', 'knit-annotated-ft-gz',
+        'knit-annotated-delta-gz', 'knit-ft-gz', 'knit-delta-gz'.
+    :ivar key: The key of this content. Each key is a tuple with a single
+        string in it.
+    :ivar parents: A tuple of parent keys for self.key. If the object has
+        no parent information, None (as opposed to () for an empty list of
+        parents).
+        """
+
+    def __init__(self):
+        """Create a ContentFactory."""
+        self.sha1 = None
+        self.storage_kind = None
+        self.key = None
+        self.parents = None
+
+
 class VersionedFile(object):
     """Versioned text file storage.
     
@@ -63,9 +86,20 @@
         """Copy this versioned file to name on transport."""
         raise NotImplementedError(self.copy_to)
 
-    def versions(self):
-        """Return a unsorted list of versions."""
-        raise NotImplementedError(self.versions)
+    def get_record_stream(self, versions, ordering, include_delta_closure):
+        """Get a stream of records for versions.
+
+        :param versions: The version ids to include, as plain strings. Each
+            yielded factory's key is the tuple (version,).
+        :param ordering: Either 'unordered' or 'topological'. A topologically
+            sorted stream has compression parents strictly before their
+            children.
+        :param include_delta_closure: If True then the closure across any
+            compression parents will be included (in the opaque data).
+        :return: An iterator of ContentFactory objects, each of which is only
+            valid until the iterator is advanced.
+        """
+        raise NotImplementedError(self.get_record_stream)
 
     @deprecated_method(one_four)
     def has_ghost(self, version_id):

=== modified file 'bzrlib/weave.py'
--- a/bzrlib/weave.py	2008-04-09 00:34:54 +0000
+++ b/bzrlib/weave.py	2008-04-16 04:07:40 +0000
@@ -82,6 +82,7 @@
 from bzrlib.errors import (WeaveError, WeaveFormatError, WeaveParentMismatch,
         RevisionAlreadyPresent,
         RevisionNotPresent,
+        UnavailableRepresentation,
         WeaveRevisionAlreadyPresent,
         WeaveRevisionNotPresent,
         )
@@ -89,10 +90,37 @@
 from bzrlib.osutils import sha_strings
 import bzrlib.patiencediff
 from bzrlib.tsort import topo_sort
-from bzrlib.versionedfile import VersionedFile, InterVersionedFile
+from bzrlib.versionedfile import (
+    ContentFactory,
+    InterVersionedFile,
+    VersionedFile,
+    )
 from bzrlib.weavefile import _read_weave_v5, write_weave_v5
 
 
+class WeaveContentFactory(ContentFactory):
+    """Content factory for streaming from weaves.
+
+    :seealso ContentFactory:
+    """
+
+    def __init__(self, version, weave):
+        """Create a WeaveContentFactory for version from weave."""
+        ContentFactory.__init__(self)
+        self.sha1 = weave.get_sha1s([version])[0]
+        self.key = (version,)
+        parents = weave.get_parent_map([version])[version]
+        self.parents = tuple((parent,) for parent in parents)
+        self.storage_kind = 'fulltext'
+        self._weave = weave
+
+    def get_bytes_as(self, storage_kind):
+        if storage_kind == 'fulltext':
+            return self._weave.get_text(self.key[0])
+        else:
+            raise UnavailableRepresentation(self.key, storage_kind, 'fulltext')
+
+
 class Weave(VersionedFile):
     """weave - versioned text file storage.
     
@@ -263,6 +291,25 @@
 
     __contains__ = has_version
 
+    def get_record_stream(self, versions, ordering, include_delta_closure):
+        """Get a stream of records for versions.
+
+        :param versions: The version ids to include, as plain strings. Each
+            yielded factory's key is the tuple (version,).
+        :param ordering: Either 'unordered' or 'topological'. A topologically
+            sorted stream has compression parents strictly before their
+            children.
+        :param include_delta_closure: If True then the closure across any
+            compression parents will be included (in the opaque data).
+        :return: An iterator of ContentFactory objects, each of which is only
+            valid until the iterator is advanced.
+        """
+        if ordering == 'topological':
+            parents = self.get_parent_map(versions)
+            versions = topo_sort(parents)
+        for version in versions:
+            yield WeaveContentFactory(version, self)
+
     def get_parent_map(self, version_ids):
         """See VersionedFile.get_parent_map."""
         result = {}

=== modified file 'doc/developers/repository-stream.txt'
--- a/doc/developers/repository-stream.txt	2008-04-11 01:50:17 +0000
+++ b/doc/developers/repository-stream.txt	2008-04-16 04:07:40 +0000
@@ -155,30 +155,34 @@
 Each record has two attributes. One is ``key_prefix`` which is a tuple key
 prefix for the names of each of the bytestrings in the record. The other
 attribute is ``entries``, an iterator of the individual items in the
-record. Each item that the iterator yields is a two-tuple with a meta-data
-dict and the compressed bytestring data.
+record. Each item that the iterator yields is a factory which has metadata
+about the entry and the ability to return the compressed bytes. This
+factory can be decorated to provide other representations (for example,
+converting a compressed knit fulltext to a plain fulltext).
 
 In pseudocode::
 
   stream = repository.get_repository_stream(search, UNORDERED, False)
   for record in stream.iter_contents():
-      for metadata, bytes in record.entries:
+      for factory in record.entries:
+          compression = factory.storage_kind
           print "Object %s, compression type %s, %d bytes long." % (
-              record.key_prefix + metadata['key'],
-              metadata['storage_kind'], len(bytes))
+              record.key_prefix + factory.key,
+              compression, len(factory.get_bytes_as(compression)))
 
 This structure should allow stream adapters to be written which can coerce
 all records to the type of compression that a particular client needs. For
-instance, inserting into weaves requires fulltexts, so an adapter that
-applies knit records and extracts them to fulltexts will avoid weaves
-needing to know about all potential storage kinds. Likewise, inserting
-into knits would use an adapter that gives everything as either matching
-knit records or full texts.
-
-bytestring metadata
-~~~~~~~~~~~~~~~~~~~
-
-Valid keys in the metadata dict are:
+instance, inserting into weaves requires fulltexts, so a stream would be
+adapted for weaves by an adapter that takes a stream, and the target
+weave, and then uses the target weave to reconstruct full texts (which is
+all that the weave inserter would ask for). In a similar approach, a
+stream could internally delta compress many fulltexts and be able to
+answer both fulltext and compressed record requests without extra IO.
+
+factory metadata
+~~~~~~~~~~~~~~~~
+
+Valid attributes on the factory are:
  * sha1: Optional ascii representation of the sha1 of the bytestring (after
    delta reconstruction).
  * storage_kind: Required kind of storage compression that has been used
@@ -190,6 +194,7 @@
  * key: The key for this bytestring. Like each parent this is a tuple that
    should have the key_prefix prepended to it to give the unified
    repository key name.
+
 ..
    vim: ft=rst tw=74 ai
 




More information about the bazaar-commits mailing list