Rev 3353: Functional get_record_stream interface tests covering full interface. in http://people.ubuntu.com/~robertc/baz2.0/versioned_files
Robert Collins
robertc at robertcollins.net
Wed Apr 16 05:07:48 BST 2008
At http://people.ubuntu.com/~robertc/baz2.0/versioned_files
------------------------------------------------------------
revno: 3353
revision-id: robertc at robertcollins.net-20080416040740-g3cjhoaez95b7gsr
parent: robertc at robertcollins.net-20080411015017-0vrwo9xe9td0wvl2
committer: Robert Collins <robertc at robertcollins.net>
branch nick: data_stream_revamp
timestamp: Wed 2008-04-16 14:07:40 +1000
message:
Functional get_record_stream interface tests covering full interface.
modified:
bzrlib/errors.py errors.py-20050309040759-20512168c4e14fbd
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_errors.py test_errors.py-20060210110251-41aba2deddf936a8
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
bzrlib/tests/test_versionedfile.py test_versionedfile.py-20060222045249-db45c9ed14a1c2e5
bzrlib/versionedfile.py versionedfile.py-20060222045106-5039c71ee3b65490
bzrlib/weave.py knit.py-20050627021749-759c29984154256b
doc/developers/repository-stream.txt repositorystream.txt-20080410222511-nh6b9bvscvcerh48-1
=== modified file 'bzrlib/errors.py'
--- a/bzrlib/errors.py 2008-04-07 10:34:57 +0000
+++ b/bzrlib/errors.py 2008-04-16 04:07:40 +0000
@@ -562,6 +562,18 @@
PathError.__init__(self, base, reason)
+class UnavailableRepresentation(InternalBzrError):
+
+ _fmt = ("The encoding '%(wanted)s' is not available for key %(key)s which "
+ "is encoded as '%(native)s'.")
+
+ def __init__(self, key, wanted, native):
+ InternalBzrError.__init__(self)
+ self.wanted = wanted
+ self.native = native
+ self.key = key
+
+
class UnknownHook(BzrError):
_fmt = "The %(type)s hook '%(hook)s' is unknown in this version of bzrlib."
=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py 2008-04-09 23:35:55 +0000
+++ b/bzrlib/knit.py 2008-04-16 04:07:40 +0000
@@ -117,7 +117,11 @@
from bzrlib.tsort import topo_sort
from bzrlib.tuned_gzip import GzipFile, bytes_to_gzip
import bzrlib.ui
-from bzrlib.versionedfile import VersionedFile, InterVersionedFile
+from bzrlib.versionedfile import (
+ ContentFactory,
+ InterVersionedFile,
+ VersionedFile,
+ )
import bzrlib.weave
@@ -138,6 +142,51 @@
INDEX_SUFFIX = '.kndx'
+class KnitContentFactory(ContentFactory):
+ """Content factory for streaming from knits.
+
+ :seealso: ContentFactory
+ """
+
+ def __init__(self, version, parents, build_details, sha1, raw_record,
+ annotated, knit=None):
+ """Create a KnitContentFactory for version.
+
+ :param version: The version.
+ :param parents: The parents.
+ :param build_details: The build details as returned from
+ get_build_details.
+ :param sha1: The sha1 expected from the full text of this object.
+ :param raw_record: The bytes of the knit data from disk.
+ :param annotated: True if the raw data is annotated.
+ """
+ ContentFactory.__init__(self)
+ self.sha1 = sha1
+ self.key = (version,)
+ self.parents = tuple((parent,) for parent in parents)
+ if build_details[0] == 'line-delta':
+ kind = 'delta'
+ else:
+ kind = 'ft'
+ if annotated:
+ annotated_kind = 'annotated-'
+ else:
+ annotated_kind = ''
+ self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
+ self._raw_record = raw_record
+ self._build_details = build_details
+ self._knit = knit
+
+ def get_bytes_as(self, storage_kind):
+ if storage_kind == self.storage_kind:
+ return self._raw_record
+ if storage_kind == 'fulltext' and self._knit is not None:
+ return self._knit.get_text(self.key[0])
+ else:
+ raise errors.UnavailableRepresentation(self.key, storage_kind,
+ self.storage_kind)
+
+
class KnitContent(object):
"""Content of a knit version to which deltas can be applied."""
@@ -699,7 +748,7 @@
# know the length of all the data a-priori.
raw_datum = []
result_version_list = []
- for (version_id, raw_data), \
+ for (version_id, raw_data, _), \
(version_id2, options, _, parents) in \
izip(self._data.read_records_iter_raw(copy_queue_records),
temp_version_list):
@@ -717,6 +766,43 @@
return pseudo_file.read(length)
return (self.get_format_signature(), result_version_list, read)
+ def get_record_stream(self, versions, ordering, include_delta_closure):
+ """Get a stream of records for versions.
+
+ :param versions: The version ids to include. The factories yielded
+ use tuple keys of the form (version_id,).
+ :param ordering: Either 'unordered' or 'topological'. A topologically
+ sorted stream has compression parents strictly before their
+ children.
+ :param include_delta_closure: If True then the closure across any
+ compression parents will be included (in the opaque data).
+ :return: An iterator of ContentFactory objects, each of which is only
+ valid until the iterator is advanced.
+ """
+ if include_delta_closure:
+ # Nb: what we should do is plan the data to stream to allow
+ # reconstruction of all the texts without excessive buffering,
+ # including re-sending common bases as needed. This makes the most
+ # sense when we start serialising these streams though, so for now
+ # we just fall back to individual text construction behind the
+ # abstraction barrier.
+ knit = self
+ else:
+ knit = None
+ # Double index lookups here: need a unified API?
+ parent_map = self.get_parent_map(versions)
+ position_map = self._get_components_positions(versions)
+ if ordering == 'topological':
+ versions = topo_sort(parent_map)
+ # position_map values are (record_details, index_memo, next).
+ records = [(version, position_map[version][1]) for version in versions]
+ record_map = {}
+ for version, raw_data, sha1 in \
+ self._data.read_records_iter_raw(records):
+ (record_details, index_memo, _) = position_map[version]
+ yield KnitContentFactory(version, parent_map[version],
+ record_details, sha1, raw_data, self.factory.annotated, knit)
+
def _extract_blocks(self, version_id, source, target):
if self._index.get_method(version_id) != 'line-delta':
return None
@@ -2478,6 +2564,9 @@
This unpacks enough of the text record to validate the id is
as expected, but that's all.
+
+ Each item the iterator yields is (version_id, bytes,
+ sha1_of_full_text).
"""
# setup an iterator of the external records:
# uses readv so nice and fast we hope.
@@ -2492,7 +2581,7 @@
# validate the header
df, rec = self._parse_record_header(version_id, data)
df.close()
- yield version_id, data
+ yield version_id, data, rec[3]
def read_records_iter(self, records):
"""Read text records from data file and yield result.
@@ -2657,7 +2746,7 @@
total = len(version_list)
raw_datum = []
raw_records = []
- for (version_id, raw_data), \
+ for (version_id, raw_data, _), \
(version_id2, options, parents) in \
izip(self.source._data.read_records_iter_raw(copy_queue_records),
copy_queue):
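
As a rough sketch of driving the new knit stream (assuming ``knit`` is
an existing KnitVersionedFile containing the named versions, which are
hypothetical)::

    # Stream the versions in topological order without the delta
    # closure, reporting each record's native storage kind.
    for factory in knit.get_record_stream(
            ['base', 'left', 'right', 'merged'], 'topological', False):
        raw = factory.get_bytes_as(factory.storage_kind)
        print '%s: %s, %d bytes' % (
            factory.key, factory.storage_kind, len(raw))
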
=== modified file 'bzrlib/tests/test_errors.py'
--- a/bzrlib/tests/test_errors.py 2008-04-04 05:59:43 +0000
+++ b/bzrlib/tests/test_errors.py 2008-04-16 04:07:40 +0000
@@ -205,6 +205,12 @@
"the currently open request.",
str(error))
+ def test_unavailable_representation(self):
+ error = errors.UnavailableRepresentation(('key',), "mpdiff", "fulltext")
+ self.assertEqualDiff("The encoding 'mpdiff' is not available for key "
+ "('key',) which is encoded as 'fulltext'.",
+ str(error))
+
def test_unknown_hook(self):
error = errors.UnknownHook("branch", "foo")
self.assertEqualDiff("The branch hook 'foo' is unknown in this version"
=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py 2008-04-09 23:35:55 +0000
+++ b/bzrlib/tests/test_knit.py 2008-04-16 04:07:40 +0000
@@ -370,7 +370,7 @@
self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
raw_contents = list(data.read_records_iter_raw(records))
- self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
+ self.assertEqual([('rev-id-1', gz_txt, sha1sum)], raw_contents)
def test_not_enough_lines(self):
sha1sum = sha.new('foo\n').hexdigest()
@@ -387,7 +387,7 @@
# read_records_iter_raw won't detect that sort of mismatch/corruption
raw_contents = list(data.read_records_iter_raw(records))
- self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
+ self.assertEqual([('rev-id-1', gz_txt, sha1sum)], raw_contents)
def test_too_many_lines(self):
sha1sum = sha.new('foo\nbar\n').hexdigest()
@@ -405,7 +405,7 @@
# read_records_iter_raw won't detect that sort of mismatch/corruption
raw_contents = list(data.read_records_iter_raw(records))
- self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
+ self.assertEqual([('rev-id-1', gz_txt, sha1sum)], raw_contents)
def test_mismatched_version_id(self):
sha1sum = sha.new('foo\nbar\n').hexdigest()
@@ -1056,7 +1056,7 @@
"""
index_memo = knit._index.get_position(version_id)
record = (version_id, index_memo)
- [(_, expected_content)] = list(knit._data.read_records_iter_raw([record]))
+ [(_, expected_content, _)] = list(knit._data.read_records_iter_raw([record]))
self.assertEqual(expected_content, candidate_content)
=== modified file 'bzrlib/tests/test_versionedfile.py'
--- a/bzrlib/tests/test_versionedfile.py 2008-04-08 21:41:15 +0000
+++ b/bzrlib/tests/test_versionedfile.py 2008-04-16 04:07:40 +0000
@@ -87,6 +87,114 @@
f = self.reopen_file(create=True)
verify_file(f)
+ def test_get_record_stream_empty(self):
+ """get_record_stream is a replacement for get_data_stream."""
+ f = self.get_file()
+ entries = f.get_record_stream([], 'unordered', False)
+ self.assertEqual([], list(entries))
+
+ def get_diamond_vf(self):
+ """Get a diamond graph to exercise deltas and merges."""
+ f = self.get_file()
+ parents = {
+ 'origin': (),
+ 'base': (('origin',),),
+ 'left': (('base',),),
+ 'right': (('base',),),
+ 'merged': (('left',), ('right',)),
+ }
+ # insert a diamond graph to exercise deltas and merges.
+ f.add_lines('origin', [], [])
+ f.add_lines('base', ['origin'], ['base\n'])
+ f.add_lines('left', ['base'], ['base\n', 'left\n'])
+ f.add_lines('right', ['base'],
+ ['base\n', 'right\n'])
+ f.add_lines('merged', ['left', 'right'],
+ ['base\n', 'left\n', 'right\n', 'merged\n'])
+ return f, parents
+
+ def assertValidStorageKind(self, storage_kind):
+ """Assert that storage_kind is a valid storage_kind."""
+ self.assertSubset([storage_kind],
+ ['mpdiff', 'knit-annotated-ft', 'knit-annotated-delta',
+ 'knit-ft', 'knit-delta', 'fulltext', 'knit-annotated-ft-gz',
+ 'knit-annotated-delta-gz', 'knit-ft-gz', 'knit-delta-gz'])
+
+ def capture_stream(self, f, entries, on_seen, parents):
+ """Capture a stream for testing."""
+ for factory in entries:
+ on_seen(factory.key)
+ self.assertValidStorageKind(factory.storage_kind)
+ self.assertEqual(f.get_sha1s([factory.key[0]])[0], factory.sha1)
+ self.assertEqual(parents[factory.key[0]], factory.parents)
+ self.assertIsInstance(factory.get_bytes_as(factory.storage_kind),
+ str)
+
+ def test_get_record_stream_interface(self):
+ """each item in a stream has to provide a regular interface."""
+ f, parents = self.get_diamond_vf()
+ entries = f.get_record_stream(['merged', 'left', 'right', 'base'],
+ 'unordered', False)
+ seen = set()
+ self.capture_stream(f, entries, seen.add, parents)
+ self.assertEqual(set([('base',), ('left',), ('right',), ('merged',)]),
+ seen)
+
+ def test_get_record_stream_interface_ordered(self):
+ """each item in a stream has to provide a regular interface."""
+ f, parents = self.get_diamond_vf()
+ entries = f.get_record_stream(['merged', 'left', 'right', 'base'],
+ 'topological', False)
+ seen = []
+ self.capture_stream(f, entries, seen.append, parents)
+ self.assertSubset([tuple(seen)],
+ (
+ (('base',), ('left',), ('right',), ('merged',)),
+ (('base',), ('right',), ('left',), ('merged',)),
+ ))
+
+ def test_get_record_stream_interface_ordered_with_delta_closure(self):
+ """each item in a stream has to provide a regular interface."""
+ f, parents = self.get_diamond_vf()
+ entries = f.get_record_stream(['merged', 'left', 'right', 'base'],
+ 'topological', True)
+ seen = []
+ for factory in entries:
+ seen.append(factory.key)
+ self.assertValidStorageKind(factory.storage_kind)
+ self.assertEqual(f.get_sha1s([factory.key[0]])[0], factory.sha1)
+ self.assertEqual(parents[factory.key[0]], factory.parents)
+ self.assertEqual(f.get_text(factory.key[0]),
+ factory.get_bytes_as('fulltext'))
+ self.assertIsInstance(factory.get_bytes_as(factory.storage_kind),
+ str)
+ self.assertSubset([tuple(seen)],
+ (
+ (('base',), ('left',), ('right',), ('merged',)),
+ (('base',), ('right',), ('left',), ('merged',)),
+ ))
+
+ def test_get_record_stream_unknown_storage_kind_raises(self):
+ """Asking for a storage kind that the stream cannot supply raises."""
+ f, parents = self.get_diamond_vf()
+ entries = f.get_record_stream(['merged', 'left', 'right', 'base'],
+ 'unordered', False)
+ # We track the keys seen because we should be able to try a
+ # particular kind, fail, and then ask for one that works and continue.
+ seen = set()
+ for factory in entries:
+ seen.add(factory.key)
+ self.assertValidStorageKind(factory.storage_kind)
+ self.assertEqual(f.get_sha1s([factory.key[0]])[0], factory.sha1)
+ self.assertEqual(parents[factory.key[0]], factory.parents)
+ # currently no stream emits mpdiff
+ self.assertRaises(errors.UnavailableRepresentation,
+ factory.get_bytes_as, 'mpdiff')
+ self.assertIsInstance(factory.get_bytes_as(factory.storage_kind),
+ str)
+ self.assertEqual(set([('base',), ('left',), ('right',), ('merged',)]),
+ seen)
+
def test_adds_with_parent_texts(self):
f = self.get_file()
parent_texts = {}
=== modified file 'bzrlib/versionedfile.py'
--- a/bzrlib/versionedfile.py 2008-04-09 00:34:54 +0000
+++ b/bzrlib/versionedfile.py 2008-04-16 04:07:40 +0000
@@ -41,6 +41,29 @@
from bzrlib.textmerge import TextMerge
+class ContentFactory(object):
+ """Abstract interface for insertion and retrieval from a VersionedFile.
+
+ :ivar sha1: None, or the sha1 of the content fulltext.
+ :ivar storage_kind: The native storage kind of this factory. One of
+ 'mpdiff', 'knit-annotated-ft', 'knit-annotated-delta', 'knit-ft',
+ 'knit-delta', 'fulltext', 'knit-annotated-ft-gz',
+ 'knit-annotated-delta-gz', 'knit-ft-gz', 'knit-delta-gz'.
+ :ivar key: The key of this content. Each key is a tuple with a single
+ string in it.
+ :ivar parents: A tuple of parent keys for self.key. If the object has
+ no parent information, None (as opposed to () for an empty list of
+ parents).
+ """
+
+ def __init__(self):
+ """Create a ContentFactory."""
+ self.sha1 = None
+ self.storage_kind = None
+ self.key = None
+ self.parents = None
+
+
class VersionedFile(object):
"""Versioned text file storage.
@@ -63,9 +86,20 @@
"""Copy this versioned file to name on transport."""
raise NotImplementedError(self.copy_to)
- def versions(self):
- """Return a unsorted list of versions."""
- raise NotImplementedError(self.versions)
+ def get_record_stream(self, versions, ordering, include_delta_closure):
+ """Get a stream of records for versions.
+
+ :param versions: The version ids to include. The factories yielded
+ use tuple keys of the form (version_id,).
+ :param ordering: Either 'unordered' or 'topological'. A topologically
+ sorted stream has compression parents strictly before their
+ children.
+ :param include_delta_closure: If True then the closure across any
+ compression parents will be included (in the opaque data).
+ :return: An iterator of ContentFactory objects, each of which is only
+ valid until the iterator is advanced.
+ """
+ raise NotImplementedError(self.get_record_stream)
@deprecated_method(one_four)
def has_ghost(self, version_id):
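
To make the ContentFactory contract concrete, here is a minimal sketch
(a hypothetical in-memory, fulltext-only factory; it is not part of
the patch)::

    from bzrlib import errors
    from bzrlib.versionedfile import ContentFactory

    class InMemoryFulltextFactory(ContentFactory):
        """Serve a single fulltext held in memory."""

        def __init__(self, key, parents, sha1, text):
            ContentFactory.__init__(self)
            self.key = key
            self.parents = parents
            self.sha1 = sha1
            self.storage_kind = 'fulltext'
            self._text = text

        def get_bytes_as(self, storage_kind):
            if storage_kind == 'fulltext':
                return self._text
            raise errors.UnavailableRepresentation(
                self.key, storage_kind, 'fulltext')
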
=== modified file 'bzrlib/weave.py'
--- a/bzrlib/weave.py 2008-04-09 00:34:54 +0000
+++ b/bzrlib/weave.py 2008-04-16 04:07:40 +0000
@@ -82,6 +82,7 @@
from bzrlib.errors import (WeaveError, WeaveFormatError, WeaveParentMismatch,
RevisionAlreadyPresent,
RevisionNotPresent,
+ UnavailableRepresentation,
WeaveRevisionAlreadyPresent,
WeaveRevisionNotPresent,
)
@@ -89,10 +90,37 @@
from bzrlib.osutils import sha_strings
import bzrlib.patiencediff
from bzrlib.tsort import topo_sort
-from bzrlib.versionedfile import VersionedFile, InterVersionedFile
+from bzrlib.versionedfile import (
+ ContentFactory,
+ InterVersionedFile,
+ VersionedFile,
+ )
from bzrlib.weavefile import _read_weave_v5, write_weave_v5
+class WeaveContentFactory(ContentFactory):
+ """Content factory for streaming from weaves.
+
+ :seealso: ContentFactory
+ """
+
+ def __init__(self, version, weave):
+ """Create a WeaveContentFactory for version from weave."""
+ ContentFactory.__init__(self)
+ self.sha1 = weave.get_sha1s([version])[0]
+ self.key = (version,)
+ parents = weave.get_parent_map([version])[version]
+ self.parents = tuple((parent,) for parent in parents)
+ self.storage_kind = 'fulltext'
+ self._weave = weave
+
+ def get_bytes_as(self, storage_kind):
+ if storage_kind == 'fulltext':
+ return self._weave.get_text(self.key[0])
+ else:
+ raise UnavailableRepresentation(self.key, storage_kind, 'fulltext')
+
+
class Weave(VersionedFile):
"""weave - versioned text file storage.
@@ -263,6 +291,25 @@
__contains__ = has_version
+ def get_record_stream(self, versions, ordering, include_delta_closure):
+ """Get a stream of records for versions.
+
+ :param versions: The version ids to include. The factories yielded
+ use tuple keys of the form (version_id,).
+ :param ordering: Either 'unordered' or 'topological'. A topologically
+ sorted stream has compression parents strictly before their
+ children.
+ :param include_delta_closure: If True then the closure across any
+ compression parents will be included (in the opaque data).
+ :return: An iterator of ContentFactory objects, each of which is only
+ valid until the iterator is advanced.
+ """
+ if ordering == 'topological':
+ parents = self.get_parent_map(versions)
+ versions = topo_sort(parents)
+ for version in versions:
+ yield WeaveContentFactory(version, self)
+
def get_parent_map(self, version_ids):
"""See VersionedFile.get_parent_map."""
result = {}
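
Since weaves store only fulltexts, every factory a weave stream yields
answers 'fulltext' directly; a hypothetical round-trip (``w`` an
existing Weave containing the named versions)::

    for factory in w.get_record_stream(['v1', 'v2'], 'topological', False):
        # storage_kind is always 'fulltext' for WeaveContentFactory.
        text = factory.get_bytes_as('fulltext')
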
=== modified file 'doc/developers/repository-stream.txt'
--- a/doc/developers/repository-stream.txt 2008-04-11 01:50:17 +0000
+++ b/doc/developers/repository-stream.txt 2008-04-16 04:07:40 +0000
@@ -155,30 +155,34 @@
Each record has two attributes. One is ``key_prefix`` which is a tuple key
prefix for the names of each of the bytestrings in the record. The other
attribute is ``entries``, an iterator of the individual items in the
-record. Each item that the iterator yields is a two-tuple with a meta-data
-dict and the compressed bytestring data.
+record. Each item that the iterator yields is a factory which has metadata
+about the entry and the ability to return the compressed bytes. This
+factory can be decorated to allow obtaining different representations (for
+example from a compressed knit fulltext to a plain fulltext).
In pseudocode::
stream = repository.get_repository_stream(search, UNORDERED, False)
for record in stream.iter_contents():
- for metadata, bytes in record.entries:
+ for factory in record.entries:
+ compression = factory.storage_kind
print "Object %s, compression type %s, %d bytes long." % (
- record.key_prefix + metadata['key'],
- metadata['storage_kind'], len(bytes))
+ record.key_prefix + factory.key,
+ compression, len(factory.get_bytes_as(compression)))
This structure should allow stream adapters to be written which can coerce
all records to the type of compression that a particular client needs. For
-instance, inserting into weaves requires fulltexts, so an adapter that
-applies knit records and extracts them to fulltexts will avoid weaves
-needing to know about all potential storage kinds. Likewise, inserting
-into knits would use an adapter that gives everything as either matching
-knit records or full texts.
-
-bytestring metadata
-~~~~~~~~~~~~~~~~~~~
-
-Valid keys in the metadata dict are:
+instance, inserting into weaves requires fulltexts, so a stream would be
+adapted for weaves by an adapter that takes a stream and the target
+weave, then uses the target weave to reconstruct full texts (which is
+all the weave inserter would ask for). In a similar approach, a stream
+could internally delta-compress many fulltexts and be able to answer
+both fulltext and compressed record requests without extra IO.
+
+factory metadata
+~~~~~~~~~~~~~~~~
+
+Valid attributes on the factory are:
* sha1: Optional ascii representation of the sha1 of the bytestring (after
delta reconstruction).
* storage_kind: Required kind of storage compression that has been used
@@ -190,6 +194,7 @@
* key: The key for this bytestring. Like each parent this is a tuple that
should have the key_prefix prepended to it to give the unified
repository key name.
+
..
vim: ft=rst tw=74 ai
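
As a thought experiment on the adapter idea above (not part of the
patch; it assumes the stream was requested with
include_delta_closure=True so each factory can reconstruct its
fulltext)::

    def iter_fulltexts(stream):
        # Yield (key, fulltext) for every entry in a repository stream,
        # using only the interface described in the document above.
        for record in stream.iter_contents():
            for factory in record.entries:
                yield (record.key_prefix + factory.key,
                       factory.get_bytes_as('fulltext'))
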