Rev 2: We now pass the versionedfiles interface tests. in http://bzr.arbash-meinel.com/plugins/xdelta_repo
John Arbash Meinel
john at arbash-meinel.com
Fri Feb 20 19:58:42 GMT 2009
At http://bzr.arbash-meinel.com/plugins/xdelta_repo
------------------------------------------------------------
revno: 2
revision-id: john at arbash-meinel.com-20090220195818-r2u0w445n9qa9cmx
parent: john at arbash-meinel.com-20090220185346-ezbwh3bm1o3skyda
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: xdelta_repo
timestamp: Fri 2009-02-20 13:58:18 -0600
message:
We now pass the versionedfiles interface tests.
Thanks tremendously to groupcompress, which gave me an easy source for copy & paste.
All of the implementations are pretty generic. Shouldn't they really be on
the VersionedFiles base class?
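
For context, the storage format here boils down to entropy-coding one small
labelled blob per text with xd3 (no delta source is wired up yet). Roughly,
as a sketch distilled from _insert_record_stream and _parse in the diff below
(not code taken from the commit itself):

    import xd3
    from bzrlib import osutils

    key, text = ('file-id', 'rev-1'), 'hello world\n'
    header = 'xdelta3\nlabel: %s\nsha1: %s\n' % (
        '\x00'.join(key), osutils.sha_string(text))
    blob = header + text
    # source=None means plain entropy coding rather than an actual delta
    comp = xd3.encode_memory(blob, source=None, flags=xd3.SEC_DJW)
    assert xd3.decode_memory(comp, len(blob), source=None) == blob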
-------------- next part --------------
=== modified file '__init__.py'
--- a/__init__.py 2009-02-20 18:53:46 +0000
+++ b/__init__.py 2009-02-20 19:58:18 +0000
@@ -18,27 +18,34 @@
"""Use xdelta as the compression methodology for bzr storage.
"""
-
-
-from bzrlib.bzrdir import format_registry
-from bzrlib.repository import format_registry as repo_registry
-format_registry.register_metadir('xd',
- 'bzrlib.plugins.xdelta_repo.repofmt.RepositoryFormatPackXDelta',
- help='pack-0.92 with btree index and xdelta. '
- 'Please read '
- 'http://doc.bazaar-vcs.org/latest/developers/development-repo.html '
- 'before use.',
- branch_format='bzrlib.branch.BzrBranchFormat7',
- tree_format='bzrlib.workingtree.WorkingTreeFormat5',
- hidden=False,
- experimental=True,
- )
-
-repo_registry.register_lazy(
- 'Bazaar development format - btree+xdelta (needs bzr.dev from 1.13)\n',
- 'bzrlib.plugins.groupcompress.repofmt',
- 'RepositoryFormatPackXDelta',
- )
+try:
+ import xd3
+except ImportError:
+ import sys
+ sys.stderr.write('Unable to import xd3, xdelta_repo storage'
+ ' will be disabled\n')
+else:
+ # Ideally we wouldn't need to import xd3 just to decide whether to
+ # register the formats.
+ from bzrlib.bzrdir import format_registry
+ from bzrlib.repository import format_registry as repo_registry
+ format_registry.register_metadir('xd',
+ 'bzrlib.plugins.xdelta_repo.repofmt.RepositoryFormatPackXDelta',
+ help='pack-0.92 with btree index and xdelta. '
+ 'Please read '
+ 'http://doc.bazaar-vcs.org/latest/developers/development-repo.html '
+ 'before use.',
+ branch_format='bzrlib.branch.BzrBranchFormat7',
+ tree_format='bzrlib.workingtree.WorkingTreeFormat5',
+ hidden=False,
+ experimental=True,
+ )
+
+ repo_registry.register_lazy(
+ 'Bazaar development format - btree+xdelta (needs bzr.dev from 1.13)\n',
+ 'bzrlib.plugins.xdelta_repo.repofmt',
+ 'RepositoryFormatPackXDelta',
+ )
def load_tests(standard_tests, module, loader):
=== modified file 'tests/test_xdelta_repo.py'
--- a/tests/test_xdelta_repo.py 2009-02-20 18:53:46 +0000
+++ b/tests/test_xdelta_repo.py 2009-02-20 19:58:18 +0000
@@ -15,3 +15,33 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Direct tests for xdelta_repo."""
+
+from bzrlib import tests
+
+from bzrlib.plugins.xdelta_repo import xdelta_repo
+
+
+# Adapt all TestVersionedFiles for the xdelta repo implementation
+def load_tests(standard_tests, module, loader):
+ from bzrlib.tests.test_versionedfile import TestVersionedFiles
+ vf_interface_tests = loader.loadTestsFromTestCase(TestVersionedFiles)
+ cleanup_pack_group = xdelta_repo.cleanup_pack_group
+ make_pack_factory = xdelta_repo.make_pack_factory
+ group_scenario = ('xdelta', {
+ 'cleanup':cleanup_pack_group,
+ 'factory':make_pack_factory(False, False, 1),
+ 'graph': False,
+ 'key_length':1,
+ 'support_partial_insertion': False, ### ???
+ }
+ )
+ applier = tests.TestScenarioApplier()
+ applier.scenarios = [group_scenario]
+ tests.adapt_tests(vf_interface_tests, applier, standard_tests)
+ return standard_tests
+
+
+class TestXDeltaVersionedFile(tests.TestCaseWithTransport):
+
+ def test_pass(self):
+ pass
=== modified file 'xdelta_repo.py'
--- a/xdelta_repo.py 2009-02-20 18:53:46 +0000
+++ b/xdelta_repo.py 2009-02-20 19:58:18 +0000
@@ -14,5 +14,547 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-""""""
+"""Implementations of XDelta repository objects"""
+
+import xd3
+
+from bzrlib import (
+ annotate,
+ btree_index,
+ debug,
+ errors,
+ graph as _mod_graph,
+ knit,
+ osutils,
+ pack,
+ progress,
+ trace,
+ tsort,
+ versionedfile,
+ )
+from bzrlib.repofmt import pack_repo
+
+
+def _parse(bytes):
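+ """Split an uncompressed blob into (label, sha1, text).
+
+ The layout written by _insert_record_stream is three header lines
+ ('xdelta3', 'label: <key parts joined by NUL>', 'sha1: <hex digest>')
+ followed by the raw text bytes.
+ """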
+ header, label_line, sha1_line, text_bytes = bytes.split('\n', 3)
+ if header != 'xdelta3':
+ raise errors.KnitCorrupt("invalid header bytes: %r"
+ % (bytes[:20],))
+ if (not label_line.startswith('label: ')
+ or not sha1_line.startswith('sha1: ')):
+ raise errors.KnitCorrupt("invalid header value")
+ label = tuple(label_line[7:].split('\x00'))
+ sha1 = sha1_line[6:]
+ return label, sha1, text_bytes
+
+
+class _XDeltaIndex(object):
+ """Similar to _KnitIndex, overlay on top of a GraphIndex."""
+
+ def __init__(self, graph_index, is_locked, parents=True,
+ add_callback=None):
+ """Construct a _XDeltaIndex on a graph_index.
+
+ :param graph_index: An implementation of bzrlib.index.GraphIndex.
+ :param is_locked: A callback to check whether the object should answer
+ queries.
+ :param parents: If True, record knits parents, if not do not record
+ parents.
+ :param add_callback: If not None, allow additions to the index and call
+ this callback with a list of added GraphIndex nodes:
+ [(node, value, node_refs), ...]
+ """
+ self._add_callback = add_callback
+ self._graph_index = graph_index
+ self._parents = parents
+ self.has_graph = parents
+ self._is_locked = is_locked
+
+ def add_records(self, records, random_id=False):
+ """Add multiple records to the index.
+
+ This function does not insert data into the Immutable GraphIndex
+ backing the KnitGraphIndex, instead it prepares data for insertion by
+ the caller and checks that it is safe to insert then calls
+ self._add_callback with the prepared GraphIndex nodes.
+
+ :param records: a list of tuples:
+ (key, options, access_memo, parents).
+ :param random_id: If True the ids being added were randomly generated
+ and no check for existence will be performed.
+ """
+ if not self._add_callback:
+ raise errors.ReadOnlyError(self)
+ # we hope there are no repositories with inconsistent parentage
+ # anymore.
+
+ changed = False
+ keys = {}
+ for (key, value, refs) in records:
+ if not self._parents:
+ if refs:
+ for ref in refs:
+ if ref:
+ raise errors.KnitCorrupt(self,
+ "attempt to add node with parents "
+ "in parentless index.")
+ refs = ()
+ changed = True
+ keys[key] = (value, refs)
+ # check for dups
+ if not random_id:
+ present_nodes = self._get_entries(keys)
+ for (index, key, value, node_refs) in present_nodes:
+ if node_refs != keys[key][1]:
+ raise errors.KnitCorrupt(self,
+ "inconsistent details in add_records"
+ ": %s %s" % ((value, node_refs), keys[key]))
+ del keys[key]
+ changed = True
+ if changed:
+ result = []
+ if self._parents:
+ for key, (value, node_refs) in keys.iteritems():
+ result.append((key, value, node_refs))
+ else:
+ for key, (value, node_refs) in keys.iteritems():
+ result.append((key, value))
+ records = result
+ self._add_callback(records)
+
+ def _get_entries(self, keys, check_present=False):
+ """Get the entries for keys.
+
+ :param keys: An iterable of index key tuples.
+ """
+ keys = set(keys)
+ found_keys = set()
+ if self._parents:
+ for node in self._graph_index.iter_entries(keys):
+ yield node
+ found_keys.add(node[1])
+ else:
+ # adapt parentless index to the rest of the code.
+ for node in self._graph_index.iter_entries(keys):
+ yield node[0], node[1], node[2], ()
+ found_keys.add(node[1])
+ if check_present:
+ missing_keys = keys.difference(found_keys)
+ if missing_keys:
+ raise errors.RevisionNotPresent(missing_keys.pop(), self)
+
+ def get_build_details(self, keys):
+ """Get the various build details for these keys."""
+ # self._check_read()
+ result = {}
+ entries = self._get_entries(keys, False)
+ for entry in entries:
+ key = entry[1]
+ if not self._parents:
+ parents = None
+ else:
+ parents = entry[3][0]
+ value = entry[2]
+ method = 'group'
+ result[key] = (self._node_to_position(entry),
+ None, parents, (method, None))
+ return result
+
+ def get_parent_map(self, keys):
+ """Get a map of the parents of keys.
+
+ :param keys: The keys to look up parents for.
+ :return: A mapping from keys to parents. Absent keys are absent from
+ the mapping.
+ """
+ # self._check_read()
+ nodes = self._get_entries(keys)
+ result = {}
+ if self._parents:
+ for node in nodes:
+ result[node[1]] = node[3][0]
+ else:
+ for node in nodes:
+ result[node[1]] = None
+ return result
+
+ def keys(self):
+ """Get all the keys in the collection.
+
+ The keys are not ordered.
+ """
+ ## self._check_read()
+ return [node[1] for node in self._graph_index.iter_all_entries()]
+
+ def _node_to_position(self, node):
+ """Convert an index value to position details."""
+ bits = node[2].split(' ')
+ start = int(bits[0])
+ length = int(bits[1])
+ raw_bytes = int(bits[2])
+ return node[0], start, length, raw_bytes
+
+
+class XDeltaVersionedFiles(versionedfile.VersionedFiles):
+
+ def __init__(self, index, access, delta=True):
+ """Create a VersionedFiles object.
+
+ :param index: The index object storing access and graph data.
+ :param access: The access object storing raw data.
+ :param delta: Whether to delta compress or just entropy compress.
+ """
+ self._index = index
+ self._access = access
+ self._delta = delta
+
+ def add_lines(self, key, parents, lines, parent_texts=None,
+ left_matching_blocks=None, nostore_sha=None, random_id=False,
+ check_content=True):
+ """Add a text to the store.
+
+ :param key: The key tuple of the text to add.
+ :param parents: The parents key tuples of the text to add.
+ :param lines: A list of lines. Each line must be a bytestring. And all
+ of them except the last must be terminated with \n and contain no
+ other \n's. The last line may either contain no \n's or a single
+ terminating \n. If the lines list does not meet this constraint the add
+ routine may error or may succeed - but you will be unable to read
+ the data back accurately. (Checking the lines have been split
+ correctly is expensive and extremely unlikely to catch bugs so it
+ is not done at runtime unless check_content is True.)
+ :param parent_texts: An optional dictionary containing the opaque
+ representations of some or all of the parents of version_id to
+ allow delta optimisations. VERY IMPORTANT: the texts must be those
+ returned by add_lines or data corruption can be caused.
+ :param left_matching_blocks: a hint about which areas are common
+ between the text and its left-hand-parent. The format is
+ the SequenceMatcher.get_matching_blocks format.
+ :param nostore_sha: Raise ExistingContent and do not add the lines to
+ the versioned file if the digest of the lines matches this.
+ :param random_id: If True a random id has been selected rather than
+ an id determined by some deterministic process such as a converter
+ from a foreign VCS. When True the backend may choose not to check
+ for uniqueness of the resulting key within the versioned file, so
+ this should only be done when the result is expected to be unique
+ anyway.
+ :param check_content: If True, the lines supplied are verified to be
+ bytestrings that are correctly formed lines.
+ :return: The text sha1, the number of bytes in the text, and an opaque
+ representation of the inserted version which can be provided
+ back to future add_lines calls in the parent_texts dictionary.
+ """
+ # self._index._check_write_ok()
+ self._check_add(key, lines, random_id, check_content)
+ if parents is None:
+ # The caller might pass None if there is no graph data, but kndx
+ # indexes can't directly store that, so we give them
+ # an empty tuple instead.
+ parents = ()
+ # double handling for now. Make it work until then.
+ length = sum(map(len, lines))
+ record = versionedfile.ChunkedContentFactory(key, parents, None, lines)
+ sha1 = list(self._insert_record_stream([record], random_id=random_id))[0]
+ return sha1, length, None
+
+ def annotate(self, key):
+ """See VersionedFiles.annotate."""
+ g = _mod_graph.Graph(self)
+ parent_map = self.get_parent_map([key])
+ if not parent_map:
+ raise errors.RevisionNotPresent(key, self)
+ if parent_map[key] is not None:
+ search = g._make_breadth_first_searcher([key])
+ keys = set()
+ while True:
+ try:
+ present, ghosts = search.next_with_ghosts()
+ except StopIteration:
+ break
+ keys.update(present)
+ parent_map = self.get_parent_map(keys)
+ else:
+ keys = [key]
+ parent_map = {key:()}
+ head_cache = _mod_graph.FrozenHeadsCache(g)
+ parent_cache = {}
+ reannotate = annotate.reannotate
+ for record in self.get_record_stream(keys, 'topological', True):
+ key = record.key
+ chunks = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
+ parent_lines = [parent_cache[parent] for parent in parent_map[key]]
+ parent_cache[key] = list(
+ reannotate(parent_lines, chunks, key, None, head_cache))
+ return parent_cache[key]
+
+ def _check_add(self, key, lines, random_id, check_content):
+ """check that version_id and lines are safe to add."""
+ # XXX: Can we just use the base implementation?
+ version_id = key[-1]
+ if version_id is not None:
+ if osutils.contains_whitespace(version_id):
+ raise errors.InvalidRevisionId(version_id, self)
+ self.check_not_reserved_id(version_id)
+ # TODO: If random_id==False and the key is already present, we should
+ # probably check that the existing content is identical to what is
+ # being inserted, and otherwise raise an exception. This would make
+ # the bundle code simpler.
+ if check_content:
+ self._check_lines_not_unicode(lines)
+ self._check_lines_are_lines(lines)
+
+ def check(self, progress_bar=None):
+ """See VersionedFiles.check()."""
+ keys = self.keys()
+ for record in self.get_record_stream(keys, 'unordered', True):
+ record.get_bytes_as('fulltext')
+
+ def get_parent_map(self, keys):
+ """Get a map of the parents of keys.
+
+ :param keys: The keys to look up parents for.
+ :return: A mapping from keys to parents. Absent keys are absent from
+ the mapping.
+ """
+ result = {}
+ sources = [self._index]
+ source_results = []
+ missing = set(keys)
+ for source in sources:
+ if not missing:
+ break
+ new_result = source.get_parent_map(missing)
+ source_results.append(new_result)
+ result.update(new_result)
+ missing.difference_update(set(new_result))
+ # if self._unadded_refs:
+ # for key in missing:
+ # if key in self._unadded_refs:
+ # result[key] = self._unadded_refs[key]
+ return result
+
+ def get_record_stream(self, keys, ordering, include_delta_closure):
+ """Get a stream of records for keys.
+
+ :param keys: The keys to include.
+ :param ordering: Either 'unordered' or 'topological'. A topologically
+ sorted stream has compression parents strictly before their
+ children.
+ :param include_delta_closure: If True then the closure across any
+ compression parents will be included (in the opaque data).
+ :return: An iterator of ContentFactory objects, each of which is only
+ valid until the iterator is advanced.
+ """
+ # keys might be a generator
+ keys = set(keys)
+ if not keys:
+ return
+ if not self._index.has_graph:
+ # Cannot topological order when no graph has been stored.
+ ordering = 'unordered'
+ # Cheap: iterate
+ locations = self._index.get_build_details(keys)
+ # local_keys = frozenset(keys).intersection(set(self._unadded_refs))
+ # locations.update((key, None) for key in local_keys)
+ if ordering == 'topological':
+ # would be better to not globally sort initially but instead
+ # start with one key, recurse to its oldest parent, then grab
+ # everything in the same group, etc.
+ parent_map = dict((key, details[2]) for key, details in
+ locations.iteritems())
+ # for key in local_keys:
+ # parent_map[key] = self._unadded_refs[key]
+ present_keys = tsort.topo_sort(parent_map)
+ # Now group by source:
+ else:
+ # Group by the index memo
+ def get_index_memo(key):
+ # This is the group the bytes are stored in, followed by the
+ # location in the group
+ return locations[key][0]
+ present_keys = sorted(locations.iterkeys(), key=get_index_memo)
+ # We don't have an ordering for keys in the in-memory object, but
+ # lets process the in-memory ones first.
+ # local = list(local_keys)
+ # present_keys = list(local_keys) + present_keys
+ absent_keys = keys.difference(set(locations))
+ for key in absent_keys:
+ yield versionedfile.AbsentContentFactory(key)
+ for key in present_keys:
+ # if key in self._unadded_refs:
+ # lines, sha1 = self._compressor.extract(key)
+ # parents = self._unadded_refs[key]
+ # else:
+ index_memo, _, parents, (method, _) = locations[key]
+ read_memo = index_memo[0:3]
+ enc_data = self._access.get_raw_records([read_memo]).next()
+ total_bytes_length = index_memo[3]
+ bytes = xd3.decode_memory(enc_data, total_bytes_length, source=None)
+ label, sha1, text_bytes = _parse(bytes)
+ if not bytes.startswith('xdelta3\n'):
+ raise errors.KnitCorrupt(self,
+ "invalid header bytes: %r" % (bytes[:20],))
+ if label != key:
+ raise AssertionError("wrong key: %r, wanted %r" % (label, key))
+ if osutils.sha_string(text_bytes) != sha1:
+ raise AssertionError('sha1 sum did not match')
+ yield versionedfile.FulltextContentFactory(key, parents, sha1,
+ text_bytes)
+
+ def get_sha1s(self, keys):
+ """See VersionedFiles.get_sha1s()."""
+ result = {}
+ for record in self.get_record_stream(keys, 'unordered', True):
+ if record.sha1 is not None:
+ result[record.key] = record.sha1
+ else:
+ if record.storage_kind != 'absent':
+ result[record.key] = osutils.sha_string(
+ record.get_bytes_as('fulltext'))
+ return result
+
+ def insert_record_stream(self, stream):
+ """Insert a record stream into this container.
+
+ :param stream: A stream of records to insert.
+ :return: None
+ :seealso VersionedFiles.get_record_stream:
+ """
+ for _ in self._insert_record_stream(stream):
+ pass
+
+ def _insert_record_stream(self, stream, random_id=False):
+ """Internal core to insert a record stream into this container.
+
+ This helper function has a different interface than insert_record_stream
+ to allow add_lines to be minimal, but still return the needed data.
+
+ :param stream: A stream of records to insert.
+ :return: An iterator over the sha1 of the inserted records.
+ :seealso insert_record_stream:
+ :seealso add_lines:
+ """
+ def get_adapter(adapter_key):
+ try:
+ return adapters[adapter_key]
+ except KeyError:
+ adapter_factory = versionedfile.adapter_registry.get(
+ adapter_key)
+ adapter = adapter_factory(self)
+ adapters[adapter_key] = adapter
+ return adapter
+ adapters = {}
+
+ for record in stream:
+ if record.storage_kind == 'absent':
+ raise errors.RevisionNotPresent([record.key], self)
+ elif record.storage_kind in ('chunked', 'fulltext'):
+ bytes = record.get_bytes_as('fulltext')
+ else:
+ adapter_key = record.storage_kind, 'fulltext'
+ adapter = get_adapter(adapter_key)
+ bytes = adapter.get_bytes(record)
+ sha1 = osutils.sha_string(bytes)
+ header = 'xdelta3\nlabel: %s\nsha1: %s\n' % (
+ '\x00'.join(record.key), sha1)
+ total_bytes = header + bytes
+ total_bytes_length = len(total_bytes)
+ # XXX: Eventually we'll want to include 'source'
+ comp_bytes = xd3.encode_memory(total_bytes, source=None,
+ flags=xd3.SEC_DJW)
+ index, start, length = self._access.add_raw_records(
+ [(None, len(comp_bytes))], comp_bytes)[0]
+ # For now, we have no compression parents
+ nodes = [(record.key, "%d %d %d" % (start, length,
+ total_bytes_length),
+ (record.parents, ()))]
+ self._index.add_records(nodes)
+ yield sha1
+
+ def iter_lines_added_or_present_in_keys(self, keys, pb=None):
+ """Iterate over the lines in the versioned files from keys.
+
+ This may return lines from other keys. Each item the returned
+ iterator yields is a tuple of a line and a text version that that line
+ is present in (not introduced in).
+
+ Ordering of results is in whatever order is most suitable for the
+ underlying storage format.
+
+ If a progress bar is supplied, it may be used to indicate progress.
+ The caller is responsible for cleaning up progress bars (because this
+ is an iterator).
+
+ NOTES:
+ * Lines are normalised by the underlying store: they will all have \n
+ terminators.
+ * Lines are returned in arbitrary order.
+
+ :return: An iterator over (line, key).
+ """
+ if pb is None:
+ pb = progress.DummyProgress()
+ keys = set(keys)
+ total = len(keys)
+ # we don't care about inclusions, the caller cares.
+ # but we need to setup a list of records to visit.
+ # we need key, position, length
+ for key_idx, record in enumerate(self.get_record_stream(keys,
+ 'unordered', True)):
+ # XXX: todo - optimise to use less than full texts.
+ key = record.key
+ pb.update('Walking content.', key_idx, total)
+ if record.storage_kind == 'absent':
+ raise errors.RevisionNotPresent(record.key, self)
+ lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
+ for line in lines:
+ yield line, key
+ pb.update('Walking content.', total, total)
+
+ def keys(self):
+ """See VersionedFiles.keys."""
+ if 'evil' in debug.debug_flags:
+ trace.mutter_callsite(2, "keys scales with size of history")
+ sources = [self._index]
+ result = set()
+ for source in sources:
+ result.update(source.keys())
+ return result
+
+
+def make_pack_factory(graph, delta, keylength):
+ """Create a factory for creating a pack based xdelta compression.
+
+ This is only functional enough to run interface tests, it doesn't try to
+ provide a full pack environment.
+
+ :param graph: Store a graph.
+ :param delta: Delta compress contents.
+ :param keylength: How long should keys be.
+ """
+ def factory(transport):
+ parents = graph or delta
+ ref_length = 0
+ if graph:
+ ref_length += 1
+ graph_index = btree_index.BTreeBuilder(reference_lists=ref_length,
+ key_elements=keylength)
+ stream = transport.open_write_stream('newpack')
+ writer = pack.ContainerWriter(stream.write)
+ writer.begin()
+ # index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
+ # add_callback=graph_index.add_nodes)
+ index = _XDeltaIndex(graph_index, lambda:True, parents=parents,
+ add_callback=graph_index.add_nodes)
+ access = knit._DirectPackAccess({})
+ access.set_writer(writer, graph_index, (transport, 'newpack'))
+ result = XDeltaVersionedFiles(index, access, delta)
+ result.stream = stream
+ result.writer = writer
+ return result
+ return factory
+
+
+def cleanup_pack_group(versioned_files):
+ versioned_files.writer.end()
+ versioned_files.stream.close()
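
As a quick smoke test outside the test suite, the factory can also be driven
by hand. A minimal sketch, mirroring what the adapted interface tests do (it
assumes a writable scratch directory and a throwaway key; it is not code from
this commit):

    from bzrlib.transport import get_transport
    from bzrlib.plugins.xdelta_repo import xdelta_repo

    factory = xdelta_repo.make_pack_factory(graph=False, delta=False, keylength=1)
    vf = factory(get_transport('.'))    # writes a pack file named 'newpack' here
    try:
        vf.add_lines(('rev-1',), (), ['some text\n'])
        record = vf.get_record_stream([('rev-1',)], 'unordered', True).next()
        assert record.get_bytes_as('fulltext') == 'some text\n'
    finally:
        xdelta_repo.cleanup_pack_group(vf)  # ends the writer and closes the stream

Running the full adapted suite should then be a matter of something like
'bzr selftest -s bzrlib.plugins.xdelta_repo' (the exact selftest selector is
from memory, not from this commit).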