Rev 3054: * Operations pulling data from a smart server where the underlying in http://people.ubuntu.com/~robertc/baz2.0/knit.datastreamjoin
Robert Collins
robertc at robertcollins.net
Fri Nov 30 00:55:01 GMT 2007
At http://people.ubuntu.com/~robertc/baz2.0/knit.datastreamjoin
------------------------------------------------------------
revno: 3054
revision-id:robertc at robertcollins.net-20071130005436-0qmx32hyti0jz0y6
parent: robertc at robertcollins.net-20071129222449-r3r2er12d2p70wmy
committer: Robert Collins <robertc at robertcollins.net>
branch nick: knit.datastreamjoin
timestamp: Fri 2007-11-30 11:54:36 +1100
message:
* Operations pulling data from a smart server where the underlying
repositories are respectively annotated and unannotated will now work.
(Robert Collins, #165106).
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
=== modified file 'NEWS'
--- a/NEWS 2007-11-29 18:41:01 +0000
+++ b/NEWS 2007-11-30 00:54:36 +0000
@@ -149,6 +149,10 @@
* Obsolete packs are now cleaned up by pack and autopack operations.
(Robert Collins, #153789)
+ * Operations pulling data from a smart server where the underlying
+ repositories are respectively annotated and unannotated will now work.
+ (Robert Collins, #165106).
+
* Reconcile now shows progress bars. (Robert Collins, #159351)
* ``RemoteBranch`` was not initializing ``self._revision_id_to_revno_map``
=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py 2007-11-29 00:22:51 +0000
+++ b/bzrlib/knit.py 2007-11-30 00:54:36 +0000
@@ -96,7 +96,6 @@
KnitError,
InvalidRevisionId,
KnitCorrupt,
- KnitDataStreamIncompatible,
KnitHeaderError,
RevisionNotPresent,
RevisionAlreadyPresent,
@@ -738,8 +737,10 @@
"""
if format != self.get_format_signature():
trace.mutter('incompatible format signature inserting to %r', self)
- raise KnitDataStreamIncompatible(
- format, self.get_format_signature())
+ source = self._knit_from_datastream(
+ (format, data_list, reader_callable))
+ self.join(source)
+ return
for version_id, options, length, parents in data_list:
if self.has_version(version_id):
@@ -787,6 +788,28 @@
[(version_id, options, parents, length)],
reader_callable(length))
+ def _knit_from_datastream(self, (format, data_list, reader_callable)):
+ """Create a knit object from a data stream.
+
+ This method exists to allow conversion of data streams that do not
+ match the signature of this knit. Generally it will be slower and use
+ more memory to use this method to insert data, but it will work.
+
+ :seealso: get_data_stream for details on datastreams.
+ :return: A knit versioned file which can be used to join the datastream
+ into self.
+ """
+ if format == "knit-plain":
+ factory = KnitPlainFactory()
+ elif format == "knit-annotated":
+ factory = KnitAnnotateFactory()
+ else:
+ raise errors.KnitDataStreamUnknown(format)
+ access = _StreamAccess(reader_callable)
+ index = _StreamIndex(data_list)
+ return KnitVersionedFile(self.filename, self.transport,
+ factory=factory, index=index, access_method=access)
+
def versions(self):
"""See VersionedFile.versions."""
if 'evil' in debug.debug_flags:
@@ -1512,9 +1535,9 @@
return 'line-delta'
def get_options(self, version_id):
- """Return a string represention options.
+ """Return a list representing options.
- e.g. foo,bar
+ e.g. ['foo', 'bar']
"""
return self._cache[version_id][1]
@@ -1748,9 +1771,9 @@
raise RevisionNotPresent(version_id, self)
def get_options(self, version_id):
- """Return a string represention options.
+ """Return a list representing options.
- e.g. foo,bar
+ e.g. ['foo', 'bar']
"""
node = self._get_node(version_id)
if not self._deltas:
@@ -2022,6 +2045,109 @@
self.write_index = index
+class _StreamAccess(object):
+ """A Knit Access object that provides data from a datastream."""
+
+ def __init__(self, reader_callable):
+ """Create a _StreamAccess object.
+
+ :param reader_callable: The reader_callable from the datastream.
+ This is called to buffer all the data immediately, for
+ random access.
+ """
+ self.data = reader_callable(None)
+
+ def get_raw_records(self, memos_for_retrieval):
+ """Get the raw bytes for some records.
+
+ :param memos_for_retrieval: An iterable containing the (index, pos,
+ length) memo for retrieving the bytes. The .knit method ignores
+ the index as there is always only a single file.
+ :return: An iterator over the bytes of the records.
+ """
+ # use a generator for memory friendliness
+ for _, start, end in memos_for_retrieval:
+ yield self.data[start:end]
+
+
+class _StreamIndex(object):
+ """A Knit Index object that uses the data map from a datastream."""
+
+ def __init__(self, data_list):
+ """Create a _StreamIndex object.
+
+ :param data_list: The data_list from the datastream.
+ """
+ self.data_list = data_list
+ self._by_version = {}
+ pos = 0
+ for key, options, length, parents in data_list:
+ self._by_version[key] = options, (pos, pos + length), parents
+ pos += length
+
+ def get_ancestry(self, versions, topo_sorted):
+ """Get an ancestry list for versions."""
+ if topo_sorted:
+ # Not needed for basic joins
+ raise NotImplementedError(self.get_ancestry)
+ # get a graph of all the mentioned versions:
+ # Little ugly - basically copied from KnitIndex, but don't want to
+ # accidentally incorporate too much of that index's code.
+ graph = {}
+ pending = set(versions)
+ cache = self._by_version
+ while pending:
+ version = pending.pop()
+ # trim ghosts
+ try:
+ parents = [p for p in cache[version][2] if p in cache]
+ except KeyError:
+ raise RevisionNotPresent(version, self)
+ # if not completed and not a ghost
+ pending.update([p for p in parents if p not in graph])
+ graph[version] = parents
+ return graph.keys()
+
+ def get_options(self, version_id):
+ """Return a list representing options.
+
+ e.g. ['foo', 'bar']
+ """
+ return self._by_version[version_id][0]
+
+ def get_parents_with_ghosts(self, version_id):
+ """Return parents of specified version with ghosts."""
+ return self._by_version[version_id][2]
+
+ def get_position(self, version_id):
+ """Return details needed to access the version.
+
+ _StreamAccess has the data as a big array, so we return slice
+ coordinates into that (as index_memo's are opaque outside the
+ index and matching access class).
+
+ :return: a tuple (None, start, end).
+ """
+ start, end = self._by_version[version_id][1]
+ return None, start, end
+
+ def get_versions(self):
+ """Get all the versions in the stream."""
+ return self._by_version.keys()
+
+ def iter_parents(self, version_ids):
+ """Iterate through the parents for many version ids.
+
+ :param version_ids: An iterable yielding version_ids.
+ :return: An iterator that yields (version_id, parents). Requested
+ version_ids not present in the versioned file are simply skipped.
+ The order is undefined, allowing for different optimisations in
+ the underlying implementation.
+ """
+ return [(version, self._by_version[version][2]) for
+ version in version_ids]
+
+
class _KnitData(object):
"""Manage extraction of data from a KnitAccess, caching and decompressing.
@@ -2316,7 +2442,8 @@
if None in version_ids:
version_ids.remove(None)
- self.source_ancestry = set(self.source.get_ancestry(version_ids))
+ self.source_ancestry = set(self.source.get_ancestry(version_ids,
+ topo_sorted=False))
this_versions = set(self.target._index.get_versions())
# XXX: For efficiency we should not look at the whole index,
# we only need to consider the referenced revisions - they
=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py 2007-11-26 22:33:25 +0000
+++ b/bzrlib/tests/test_knit.py 2007-11-30 00:54:36 +0000
@@ -47,6 +47,8 @@
_KnitIndex,
_PackAccess,
PlainKnitContent,
+ _StreamAccess,
+ _StreamIndex,
WeaveToKnit,
KnitSequenceMatcher,
)
@@ -1132,6 +1134,38 @@
self.assertTrue(k.has_version('text-1'))
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
+ def test_newline_empty_lines(self):
+ # ensure that ["\n"] round trips ok.
+ knit = self.make_test_knit()
+ knit.add_lines('a', [], ["\n"])
+ knit.add_lines_with_ghosts('b', [], ["\n"])
+ self.assertEqual(["\n"], knit.get_lines('a'))
+ self.assertEqual(["\n"], knit.get_lines('b'))
+ self.assertEqual(['fulltext'], knit._index.get_options('a'))
+ self.assertEqual(['fulltext'], knit._index.get_options('b'))
+ knit.add_lines('c', ['a'], ["\n"])
+ knit.add_lines_with_ghosts('d', ['b'], ["\n"])
+ self.assertEqual(["\n"], knit.get_lines('c'))
+ self.assertEqual(["\n"], knit.get_lines('d'))
+ self.assertEqual(['line-delta'], knit._index.get_options('c'))
+ self.assertEqual(['line-delta'], knit._index.get_options('d'))
+
+ def test_empty_lines(self):
+ # bizarrely, [] is not listed as having no-eol.
+ knit = self.make_test_knit()
+ knit.add_lines('a', [], [])
+ knit.add_lines_with_ghosts('b', [], [])
+ self.assertEqual([], knit.get_lines('a'))
+ self.assertEqual([], knit.get_lines('b'))
+ self.assertEqual(['fulltext'], knit._index.get_options('a'))
+ self.assertEqual(['fulltext'], knit._index.get_options('b'))
+ knit.add_lines('c', ['a'], [])
+ knit.add_lines_with_ghosts('d', ['b'], [])
+ self.assertEqual([], knit.get_lines('c'))
+ self.assertEqual([], knit.get_lines('d'))
+ self.assertEqual(['line-delta'], knit._index.get_options('c'))
+ self.assertEqual(['line-delta'], knit._index.get_options('d'))
+
def test_knit_reload(self):
# test that the content in a reloaded knit is correct
k = self.make_test_knit()
@@ -1748,6 +1782,17 @@
knit1.transport.get_bytes(knit1._index._filename),
knit2.transport.get_bytes(knit2._index._filename))
+ def assertKnitValuesEqual(self, left, right):
+ """Assert that the texts and graph of left and right are the same."""
+ self.assertEqual(set(left.versions()), set(right.versions()))
+ for version in left.versions():
+ self.assertEqual(left.get_parents_with_ghosts(version),
+ right.get_parents_with_ghosts(version))
+ self.assertEqual(left.get_lines(version),
+ right.get_lines(version))
+ self.assertEqual(left.annotate(version),
+ right.annotate(version))
+
def test_insert_data_stream_empty(self):
"""Inserting a data stream with no records should not put any data into
the knit.
@@ -1768,12 +1813,23 @@
source = self.make_test_knit(name='source')
source.add_lines('text-a', [], split_lines(TEXT_1))
data_stream = source.get_data_stream(['text-a'])
-
target = self.make_test_knit(name='target')
target.insert_data_stream(data_stream)
-
self.assertKnitFilesEqual(source, target)
+ def test_insert_data_stream_annotated_unannotated(self):
+ """Inserting an annotated datastream to an unannotated knit works."""
+ # case one - full texts.
+ source = self.make_test_knit(name='source', annotate=True)
+ target = self.make_test_knit(name='target', annotate=False)
+ source.add_lines('text-a', [], split_lines(TEXT_1))
+ target.insert_data_stream(source.get_data_stream(['text-a']))
+ self.assertKnitValuesEqual(source, target)
+ # case two - deltas.
+ source.add_lines('text-b', ['text-a'], split_lines(TEXT_2))
+ target.insert_data_stream(source.get_data_stream(['text-b']))
+ self.assertKnitValuesEqual(source, target)
+
def test_insert_data_stream_records_already_present(self):
"""Insert a data stream where some records are alreday present in the
target, and some not. Only the new records are inserted.
@@ -1855,21 +1911,56 @@
self.assertRaises(
errors.KnitCorrupt, target.insert_data_stream, data_stream)
- def test_insert_data_stream_incompatible_format(self):
+ def test_insert_data_stream_unknown_format(self):
"""A data stream in a different format to the target knit cannot be
inserted.
- It will raise KnitDataStreamIncompatible.
+ It will raise KnitDataStreamUnknown because the fallback code will fail
+ to make a knit. In future we may need KnitDataStreamIncompatible again,
+ for more exotic cases.
"""
data_stream = ('fake-format-signature', [], lambda _: '')
target = self.make_test_knit(name='target')
self.assertRaises(
- errors.KnitDataStreamIncompatible,
+ errors.KnitDataStreamUnknown,
target.insert_data_stream, data_stream)
# * test that a stream of "already present version, then new version"
# inserts correctly.
+
+ def assertMadeStreamKnit(self, source_knit, versions, target_knit):
+ """Assert that a knit made from a stream is as expected."""
+ a_stream = source_knit.get_data_stream(versions)
+ expected_data = a_stream[2](None)
+ a_stream = source_knit.get_data_stream(versions)
+ a_knit = target_knit._knit_from_datastream(a_stream)
+ self.assertEqual(source_knit.factory.__class__,
+ a_knit.factory.__class__)
+ self.assertIsInstance(a_knit._data._access, _StreamAccess)
+ self.assertIsInstance(a_knit._index, _StreamIndex)
+ self.assertEqual(a_knit._index.data_list, a_stream[1])
+ self.assertEqual(a_knit._data._access.data, expected_data)
+ self.assertEqual(a_knit.filename, target_knit.filename)
+ self.assertEqual(a_knit.transport, target_knit.transport)
+
+ def test__knit_from_data_stream_empty(self):
+ """Create a knit object from a datastream."""
+ annotated = self.make_test_knit(name='source', annotate=True)
+ plain = self.make_test_knit(name='target', annotate=False)
+ # case 1: annotated source
+ self.assertMadeStreamKnit(annotated, [], annotated)
+ self.assertMadeStreamKnit(annotated, [], plain)
+ # case 2: plain source
+ self.assertMadeStreamKnit(plain, [], annotated)
+ self.assertMadeStreamKnit(plain, [], plain)
+
+ def test__knit_from_data_stream_unknown_format(self):
+ annotated = self.make_test_knit(name='source', annotate=True)
+ self.assertRaises(errors.KnitDataStreamUnknown,
+ annotated._knit_from_datastream, ("unknown", None, None))
+
+
TEXT_1 = """\
Banana cup cakes:
@@ -2719,3 +2810,154 @@
# will fail and we'll adjust it to handle that case correctly, rather
# than allowing an over-read that is bogus.
self.assertEqual(expected_length, len(stream[2](-1)))
+
+
+class Test_StreamIndex(KnitTests):
+
+ def get_index(self, knit, stream):
+ """Get a _StreamIndex from knit and stream."""
+ return knit._knit_from_datastream(stream)._index
+
+ def assertIndexVersions(self, knit, versions):
+ """Check that the _StreamIndex versions are those of the stream."""
+ index = self.get_index(knit, knit.get_data_stream(versions))
+ self.assertEqual(set(index.get_versions()), set(versions))
+ # check we didn't get duplicates
+ self.assertEqual(len(index.get_versions()), len(versions))
+
+ def assertIndexAncestry(self, knit, ancestry_versions, versions, result):
+ """Check the result of a get_ancestry call on knit."""
+ index = self.get_index(knit, knit.get_data_stream(versions))
+ self.assertEqual(result, index.get_ancestry(ancestry_versions, False))
+
+ def assertIterParents(self, knit, versions, result):
+ """Check the result of an iter_parents call on knit."""
+ index = self.get_index(knit, knit.get_data_stream(versions))
+ self.assertEqual(result, index.iter_parents(versions))
+
+ def assertGetOptions(self, knit, version, options):
+ index = self.get_index(knit, knit.get_data_stream(version))
+ self.assertEqual(options, index.get_options(version))
+
+ def assertGetPosition(self, knit, versions, version, result):
+ index = self.get_index(knit, knit.get_data_stream(versions))
+ self.assertEqual(result, index.get_position(version))
+
+ def assertGetParentsWithGhosts(self, knit, versions, version, parents):
+ index = self.get_index(knit, knit.get_data_stream(versions))
+ self.assertEqual(parents, index.get_parents_with_ghosts(version))
+
+ def make_knit_with_4_versions_2_dags(self):
+ knit = self.make_test_knit()
+ knit.add_lines('a', [], ["foo"])
+ knit.add_lines('b', [], [])
+ knit.add_lines('c', ['b', 'a'], [])
+ knit.add_lines_with_ghosts('d', ['e', 'f'], [])
+ return knit
+
+ def test_versions(self):
+ """The versions of a StreamIndex are those of the datastream."""
+ knit = self.make_knit_with_4_versions_2_dags()
+ # ask for most permutations, which catches bugs like falling back to the
+ # target knit, or showing ghosts, etc.
+ self.assertIndexVersions(knit, [])
+ self.assertIndexVersions(knit, ['a'])
+ self.assertIndexVersions(knit, ['b'])
+ self.assertIndexVersions(knit, ['c'])
+ self.assertIndexVersions(knit, ['d'])
+ self.assertIndexVersions(knit, ['a', 'b'])
+ self.assertIndexVersions(knit, ['b', 'c'])
+ self.assertIndexVersions(knit, ['a', 'c'])
+ self.assertIndexVersions(knit, ['a', 'b', 'c'])
+ self.assertIndexVersions(knit, ['a', 'b', 'c', 'd'])
+
+ def test_construct(self):
+ """Constructing a StreamIndex generates index data."""
+ data_list = [('text-a', ['fulltext'], 127, []),
+ ('text-b', ['option'], 128, ['text-c'])]
+ index = _StreamIndex(data_list)
+ self.assertEqual({'text-a':(['fulltext'], (0, 127), []),
+ 'text-b':(['option'], (127, 127 + 128), ['text-c'])},
+ index._by_version)
+
+ def test_get_ancestry(self):
+ knit = self.make_knit_with_4_versions_2_dags()
+ self.assertIndexAncestry(knit, ['a'], ['a'], ['a'])
+ self.assertIndexAncestry(knit, ['b'], ['b'], ['b'])
+ self.assertIndexAncestry(knit, ['c'], ['c'], ['c'])
+ self.assertIndexAncestry(knit, ['c'], ['a', 'b', 'c'],
+ {'a':[], 'b':[], 'c':[]}.keys())
+ self.assertIndexAncestry(knit, ['c', 'd'], ['a', 'b', 'c', 'd'],
+ {'a':[], 'b':[], 'c':[], 'd':[]}.keys())
+
+ def test_iter_parents(self):
+ knit = self.make_knit_with_4_versions_2_dags()
+ self.assertIterParents(knit, ['a'], [('a', [])])
+ self.assertIterParents(knit, ['a', 'b'], [('a', []), ('b', [])])
+ self.assertIterParents(knit, ['a', 'b', 'c'],
+ [('a', []), ('b', []), ('c', ['b', 'a'])])
+ self.assertIterParents(knit, ['a', 'b', 'c', 'd'],
+ [('a', []), ('b', []), ('c', ['b', 'a']), ('d', ['e', 'f'])])
+
+ def test_get_options(self):
+ knit = self.make_knit_with_4_versions_2_dags()
+ self.assertGetOptions(knit, 'a', ['no-eol', 'fulltext'])
+ self.assertGetOptions(knit, 'c', ['line-delta'])
+
+ def test_get_parents_with_ghosts(self):
+ knit = self.make_knit_with_4_versions_2_dags()
+ self.assertGetParentsWithGhosts(knit, ['a'], 'a', [])
+ self.assertGetParentsWithGhosts(knit, ['c'], 'c', ['b', 'a'])
+
+ def test_get_position(self):
+ knit = self.make_knit_with_4_versions_2_dags()
+ # get_position returns (index(can be None), start, end) for
+ # _StreamAccess to use.
+ self.assertGetPosition(knit, ['a'], 'a', (None, 0, 78))
+ self.assertGetPosition(knit, ['a', 'c'], 'c', (None, 78, 156))
+
+
+class Test_StreamAccess(KnitTests):
+
+ def get_index_access(self, knit, stream):
+ """Get a _StreamAccess from knit and stream."""
+ knit = knit._knit_from_datastream(stream)
+ return knit._index, knit._data._access
+
+ def assertGetRawRecords(self, knit, versions):
+ index, access = self.get_index_access(knit,
+ knit.get_data_stream(versions))
+ # check that every version asked for can be obtained from the resulting
+ # access object.
+ # batch
+ memos = []
+ for version in versions:
+ memos.append(knit._index.get_position(version))
+ original = {}
+ for version, data in zip(
+ versions, knit._data._access.get_raw_records(memos)):
+ original[version] = data
+ memos = []
+ for version in versions:
+ memos.append(index.get_position(version))
+ streamed = {}
+ for version, data in zip(versions, access.get_raw_records(memos)):
+ streamed[version] = data
+ self.assertEqual(original, streamed)
+ # individually
+ for version in versions:
+ data = list(access.get_raw_records(
+ [index.get_position(version)]))[0]
+ self.assertEqual(original[version], data)
+
+ def make_knit_with_two_versions(self):
+ knit = self.make_test_knit()
+ knit.add_lines('a', [], ["foo"])
+ knit.add_lines('b', [], ["bar"])
+ return knit
+
+ def test_get_raw_records(self):
+ knit = self.make_knit_with_two_versions()
+ self.assertGetRawRecords(knit, ['a', 'b'])
+ self.assertGetRawRecords(knit, ['a'])
+ self.assertGetRawRecords(knit, ['b'])
More information about the bazaar-commits
mailing list