Rev 2: We now pass the versionedfiles interface tests. in http://bzr.arbash-meinel.com/plugins/xdelta_repo

John Arbash Meinel john at arbash-meinel.com
Fri Feb 20 19:58:42 GMT 2009


At http://bzr.arbash-meinel.com/plugins/xdelta_repo

------------------------------------------------------------
revno: 2
revision-id: john at arbash-meinel.com-20090220195818-r2u0w445n9qa9cmx
parent: john at arbash-meinel.com-20090220185346-ezbwh3bm1o3skyda
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: xdelta_repo
timestamp: Fri 2009-02-20 13:58:18 -0600
message:
  We now pass the versionedfiles interface tests.
  
  Thanks tremendously to groupcompress, which gave me an easy source for copy & paste.
  All of the implementations are pretty generic. Shouldn't they really be on
  the VersionedFiles base class?
-------------- next part --------------
=== modified file '__init__.py'
--- a/__init__.py	2009-02-20 18:53:46 +0000
+++ b/__init__.py	2009-02-20 19:58:18 +0000
@@ -18,27 +18,34 @@
 """Use xdelta as the compression methodology for bzr storage.
 """
 
-
-
-from bzrlib.bzrdir import format_registry
-from bzrlib.repository import format_registry as repo_registry
-format_registry.register_metadir('xd',
-    'bzrlib.plugins.xdelta_repo.repofmt.RepositoryFormatPackXDelta',
-    help='pack-0.92 with btree index and xdelta. '
-        'Please read '
-        'http://doc.bazaar-vcs.org/latest/developers/development-repo.html '
-        'before use.',
-    branch_format='bzrlib.branch.BzrBranchFormat7',
-    tree_format='bzrlib.workingtree.WorkingTreeFormat5',
-    hidden=False,
-    experimental=True,
-    )
-
-repo_registry.register_lazy(
-    'Bazaar development format - btree+xdelta (needs bzr.dev from 1.13)\n',
-    'bzrlib.plugins.groupcompress.repofmt',
-    'RepositoryFormatPackXDelta',
-    )
+try:
+    import xd3
+except ImportError:
+    import sys
+    sys.stderr.write('Unable to import xd3, xdelta_repo storage'
+                     ' will be disabled\n')
+else:
+    # Only register the format when xd3 is importable.  Ideally we wouldn't
+    # need to import xd3 up front just to decide whether to register things.
+    from bzrlib.bzrdir import format_registry
+    from bzrlib.repository import format_registry as repo_registry
+    format_registry.register_metadir('xd',
+        'bzrlib.plugins.xdelta_repo.repofmt.RepositoryFormatPackXDelta',
+        help='pack-0.92 with btree index and xdelta. '
+            'Please read '
+            'http://doc.bazaar-vcs.org/latest/developers/development-repo.html '
+            'before use.',
+        branch_format='bzrlib.branch.BzrBranchFormat7',
+        tree_format='bzrlib.workingtree.WorkingTreeFormat5',
+        hidden=False,
+        experimental=True,
+        )
+
+    repo_registry.register_lazy(
+        'Bazaar development format - btree+xdelta (needs bzr.dev from 1.13)\n',
+        'bzrlib.plugins.xdelta_repo.repofmt',
+        'RepositoryFormatPackXDelta',
+        )
 
 
 def load_tests(standard_tests, module, loader):

=== modified file 'tests/test_xdelta_repo.py'
--- a/tests/test_xdelta_repo.py	2009-02-20 18:53:46 +0000
+++ b/tests/test_xdelta_repo.py	2009-02-20 19:58:18 +0000
@@ -15,3 +15,33 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 """Direct tests for xdelta_repo."""
+
+from bzrlib import tests
+
+from bzrlib.plugins.xdelta_repo import xdelta_repo
+
+
+# Adapt all TestVersionedFiles for the xdelta repo implementation
+def load_tests(standard_tests, module, loader):
+    """Run the VersionedFiles interface tests against xdelta storage too."""
+    from bzrlib.tests.test_versionedfile import TestVersionedFiles
+    interface_tests = loader.loadTestsFromTestCase(TestVersionedFiles)
+    # Parameters for the single 'xdelta' scenario applied to those tests.
+    scenario_params = {
+        'cleanup': xdelta_repo.cleanup_pack_group,
+        'factory': xdelta_repo.make_pack_factory(False, False, 1),
+        'graph': False,
+        'key_length': 1,
+        # ??? (the original author was unsure about this value)
+        'support_partial_insertion': False,
+        }
+    adapter = tests.TestScenarioApplier()
+    adapter.scenarios = [('xdelta', scenario_params)]
+    tests.adapt_tests(interface_tests, adapter, standard_tests)
+    return standard_tests
+
+
+class TestXDeltaVersionedFile(tests.TestCaseWithTransport):
+    """Placeholder case; its only test does nothing yet."""
+    def test_pass(self):
+        pass

=== modified file 'xdelta_repo.py'
--- a/xdelta_repo.py	2009-02-20 18:53:46 +0000
+++ b/xdelta_repo.py	2009-02-20 19:58:18 +0000
@@ -14,5 +14,547 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-""""""
+"""Implementations of XDelta repository objects"""
+
+import xd3
+
+from bzrlib import (
+    annotate,
+    btree_index,
+    debug,
+    errors,
+    graph as _mod_graph,
+    knit,
+    osutils,
+    pack,
+    versionedfile,
+    )
+from bzrlib.repofmt import pack_repo
+
+
+def _parse(bytes):
+    """Return (label, sha1, text_bytes) for a record, or raise KnitCorrupt."""
+    parts = bytes.split('\n', 3)
+    if len(parts) != 4 or parts[0] != 'xdelta3':
+        raise errors.KnitCorrupt('xdelta3 record',
+            "invalid header bytes: %r" % (bytes[:20],))
+    label_line, sha1_line, text_bytes = parts[1:]
+    if (not label_line.startswith('label: ')
+        or not sha1_line.startswith('sha1: ')):
+        raise errors.KnitCorrupt('xdelta3 record', "invalid header value")
+    return tuple(label_line[7:].split('\x00')), sha1_line[6:], text_bytes
+
+
+class _XDeltaIndex(object):
+    """Similar to _KnitIndex, overlay on top of a GraphIndex."""
+
+    def __init__(self, graph_index, is_locked, parents=True,
+                 add_callback=None):
+        """Construct a _XDeltaIndex on a graph_index.
+
+        :param graph_index: An implementation of bzrlib.index.GraphIndex.
+        :param is_locked: A callback, returns True if the index is locked and
+            thus usable (i.e. whether the object should answer queries).
+            It is stored but not consulted by any method of this class yet.
+        :param parents: If True, record parents for each key; if False the
+            index is parentless and any supplied parents are rejected.
+        :param add_callback: If not None, allow additions to the index and call
+            this callback with a list of added GraphIndex nodes:
+            [(key, value, node_refs), ...], or [(key, value), ...] once the
+            references have been stripped for a parentless index.
+        """
+        self._add_callback = add_callback
+        self._graph_index = graph_index
+        self._parents = parents
+        self.has_graph = parents
+        self._is_locked = is_locked
+
+    def add_records(self, records, random_id=False):
+        """Add multiple records to the index.
+
+        This function does not insert data into the immutable GraphIndex
+        backing this index; instead it checks that the records are safe to
+        insert and then calls self._add_callback with the prepared GraphIndex
+        nodes.  Raises ReadOnlyError if there is no add_callback.
+
+        :param records: a list of tuples:
+                         (key, value, refs).
+        :param random_id: If True the ids being added were randomly generated
+            and no check for existence will be performed.
+        """
+        if not self._add_callback:
+            raise errors.ReadOnlyError(self)
+        # we hope there are no repositories with inconsistent parentage
+        # anymore.
+
+        changed = False
+        keys = {}
+        for (key, value, refs) in records:
+            if not self._parents:
+                if refs:
+                    for ref in refs:
+                        if ref:
+                            raise errors.KnitCorrupt(self,
+                                "attempt to add node with parents "
+                                "in parentless index.")
+                    refs = ()
+                    changed = True
+            keys[key] = (value, refs)
+        # check for dups
+        if not random_id:
+            present_nodes = self._get_entries(keys)
+            for (index, key, value, node_refs) in present_nodes:
+                if node_refs != keys[key][1]:
+                    raise errors.KnitCorrupt(self,
+                        "inconsistent details in add_records"
+                        ": %s %s" % ((value, node_refs), keys[key]))
+                del keys[key]
+                changed = True
+        if changed:
+            result = []
+            if self._parents:
+                for key, (value, node_refs) in keys.iteritems():
+                    result.append((key, value, node_refs))
+            else:
+                for key, (value, node_refs) in keys.iteritems():
+                    result.append((key, value))
+            records = result
+        self._add_callback(records)
+
+    def _get_entries(self, keys, check_present=False):
+        """Yield an (index, key, value, refs) tuple for each key found.
+
+        :param keys: An iterable of index key tuples.
+        :param check_present: Raise RevisionNotPresent if a key is missing.
+        """
+        keys = set(keys)
+        found_keys = set()
+        if self._parents:
+            for node in self._graph_index.iter_entries(keys):
+                yield node
+                found_keys.add(node[1])
+        else:
+            # adapt parentless index to the rest of the code.
+            for node in self._graph_index.iter_entries(keys):
+                yield node[0], node[1], node[2], ()
+                found_keys.add(node[1])
+        if check_present:
+            missing_keys = keys.difference(found_keys)
+            if missing_keys:
+                raise errors.RevisionNotPresent(missing_keys.pop(), self)
+
+    def get_build_details(self, keys):
+        """Map each present key to its build details.
+
+        :return: {key: (position, None, parents, ('group', None))} where
+            position comes from _node_to_position and parents is None for
+            a parentless index.  Absent keys are omitted.
+        """
+        result = {}
+        for entry in self._get_entries(keys, False):
+            if not self._parents:
+                parents = None
+            else:
+                parents = entry[3][0]
+            result[entry[1]] = (self._node_to_position(entry),
+                                None, parents, ('group', None))
+        return result
+
+    def get_parent_map(self, keys):
+        """Get a map of the parents of keys.
+
+        :param keys: The keys to look up parents for.
+        :return: A mapping from keys to parents (None for every key when the
+            index is parentless). Absent keys are absent from the mapping.
+        """
+        nodes = self._get_entries(keys)
+        result = {}
+        if self._parents:
+            for node in nodes:
+                result[node[1]] = node[3][0]
+        else:
+            for node in nodes:
+                result[node[1]] = None
+        return result
+
+    def keys(self):
+        """Get all the keys in the collection.
+
+        The keys are not ordered.
+        """
+        # Walks every entry of the underlying graph index.
+        return [node[1] for node in self._graph_index.iter_all_entries()]
+
+    def _node_to_position(self, node):
+        """Convert an index node to (index, start, length, raw_bytes)."""
+        bits = node[2].split(' ')
+        start = int(bits[0])
+        length = int(bits[1])
+        raw_bytes = int(bits[2])
+        return node[0], start, length, raw_bytes
+
+
+class XDeltaVersionedFiles(versionedfile.VersionedFiles):
+    """VersionedFiles that stores every text as its own xdelta3 record.
+
+    Each record is encoded without a delta source, so there are no
+    compression parents.
+    """
+
+    def __init__(self, index, access, delta=True):
+        """Create a VersionedFiles object.
+
+        :param index: The index object storing access and graph data.
+        :param access: The access object storing raw data.
+        :param delta: Whether to delta compress or just entropy compress.
+            Stored as self._delta; nothing in this class reads it yet.
+        """
+        self._index = index
+        self._access = access
+        self._delta = delta
+
+    def add_lines(self, key, parents, lines, parent_texts=None,
+        left_matching_blocks=None, nostore_sha=None, random_id=False,
+        check_content=True):
+        """Add a text to the store.
+
+        NOTE: parent_texts, left_matching_blocks and nostore_sha are accepted
+        for interface compatibility but are not used by this implementation.
+
+        :param key: The key tuple of the text to add.
+        :param parents: The parents key tuples of the text to add.
+        :param lines: A list of lines. Each line must be a bytestring. And all
+            of them except the last must be terminated with \n and contain no
+            other \n's. The last line may either contain no \n's or a single
+            terminating \n. If the lines list does not meet this constraint
+            the add routine may error or may succeed - but you will be unable
+            to read the data back accurately. (Checking the lines have been
+            split correctly is expensive and extremely unlikely to catch bugs
+            so it is not done at runtime unless check_content is True.)
+        :param parent_texts: An optional dictionary containing the opaque
+            representations of some or all of the parents of version_id to
+            allow delta optimisations.  VERY IMPORTANT: the texts must be those
+            returned by add_lines or data corruption can be caused.
+        :param left_matching_blocks: a hint about which areas are common
+            between the text and its left-hand-parent.  The format is
+            the SequenceMatcher.get_matching_blocks format.
+        :param nostore_sha: Raise ExistingContent and do not add the lines to
+            the versioned file if the digest of the lines matches this.
+        :param random_id: If True a random id has been selected rather than
+            an id determined by some deterministic process such as a converter
+            from a foreign VCS. When True the backend may choose not to check
+            for uniqueness of the resulting key within the versioned file, so
+            this should only be done when the result is expected to be unique
+            anyway.
+        :param check_content: If True, the lines supplied are verified to be
+            bytestrings that are correctly formed lines.
+        :return: The text sha1, the number of bytes in the text, and None
+                 (no opaque representation of the inserted version is
+                 kept for reuse as parent_texts).
+        """
+        self._check_add(key, lines, random_id, check_content)
+        if parents is None:
+            # The caller might pass None if there is no graph data, but kndx
+            # indexes can't directly store that, so we give them
+            # an empty tuple instead.
+            parents = ()
+        # Double handling for now: the lines are re-joined by the inserter.
+        length = sum(map(len, lines))
+        record = versionedfile.ChunkedContentFactory(key, parents, None, lines)
+        sha1 = list(self._insert_record_stream([record], random_id=random_id))[0]
+        return sha1, length, None
+
+    def annotate(self, key):
+        """See VersionedFiles.annotate."""
+        g = _mod_graph.Graph(self)
+        parent_map = self.get_parent_map([key])
+        if not parent_map:
+            raise errors.RevisionNotPresent(key, self)
+        if parent_map[key] is not None:
+            search = g._make_breadth_first_searcher([key])
+            keys = set()
+            while True:
+                try:
+                    present, ghosts = search.next_with_ghosts()
+                except StopIteration:
+                    break
+                keys.update(present)
+            parent_map = self.get_parent_map(keys)
+        else:
+            keys = [key]
+            parent_map = {key:()}
+        head_cache = _mod_graph.FrozenHeadsCache(g)
+        parent_cache = {}
+        reannotate = annotate.reannotate
+        for record in self.get_record_stream(keys, 'topological', True):
+            key = record.key
+            chunks = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
+            parent_lines = [parent_cache[parent] for parent in parent_map[key]]
+            parent_cache[key] = list(
+                reannotate(parent_lines, chunks, key, None, head_cache))
+        return parent_cache[key]
+
+    def _check_add(self, key, lines, random_id, check_content):
+        """check that version_id and lines are safe to add."""
+        # XXX: Can we just use the base implementation?
+        version_id = key[-1]
+        if version_id is not None:
+            if osutils.contains_whitespace(version_id):
+                raise errors.InvalidRevisionId(version_id, self)
+        self.check_not_reserved_id(version_id)
+        # TODO: If random_id==False and the key is already present, we should
+        # probably check that the existing content is identical to what is
+        # being inserted, and otherwise raise an exception.  This would make
+        # the bundle code simpler.
+        if check_content:
+            self._check_lines_not_unicode(lines)
+            self._check_lines_are_lines(lines)
+
+    def check(self, progress_bar=None):
+        """See VersionedFiles.check()."""
+        keys = self.keys()
+        # Extracting every text makes get_record_stream verify label and sha1.
+        for record in self.get_record_stream(keys, 'unordered', True):
+            record.get_bytes_as('fulltext')
+
+    def get_parent_map(self, keys):
+        """Get a map of the parents of keys.
+
+        :param keys: The keys to look up parents for.
+        :return: A mapping from keys to parents. Absent keys are absent from
+            the mapping.
+        """
+        result = {}
+        sources = [self._index]
+        missing = set(keys)
+        for source in sources:
+            if not missing:
+                break
+            new_result = source.get_parent_map(missing)
+            result.update(new_result)
+            missing.difference_update(set(new_result))
+        return result
+
+    def get_record_stream(self, keys, ordering, include_delta_closure):
+        """Get a stream of records for keys.
+
+        :param keys: The keys to include.
+        :param ordering: Either 'unordered' or 'topological'. A topologically
+            sorted stream has compression parents strictly before their
+            children.  Forced to 'unordered' when the index has no graph.
+        :param include_delta_closure: If True then the closure across any
+            compression parents will be included (in the opaque data).
+            Not consulted here: every record is yielded as a fulltext.
+        :return: An iterator of ContentFactory objects, each of which is only
+            valid until the iterator is advanced.  Absent keys are yielded
+            first, as AbsentContentFactory objects.
+        :raises AssertionError: if a stored record's label or sha1 does not
+            match what was asked for.
+        """
+        # keys might be a generator
+        keys = set(keys)
+        if not keys:
+            return
+        if not self._index.has_graph:
+            # Cannot topological order when no graph has been stored.
+            ordering = 'unordered'
+        # Cheap: iterate
+        locations = self._index.get_build_details(keys)
+        if ordering == 'topological':
+            # Imported here because only this branch needs it.
+            from bzrlib.tsort import topo_sort
+            # would be better to not globally sort initially but instead
+            # start with one key, recurse to its oldest parent, then grab
+            # everything in the same group, etc.
+            parent_map = dict((key, details[2]) for key, details in
+                locations.iteritems())
+            present_keys = topo_sort(parent_map)
+        else:
+            # Group by the index memo
+            def get_index_memo(key):
+                # This is the group the bytes are stored in, followed by the
+                # location in the group
+                return locations[key][0]
+            present_keys = sorted(locations.iterkeys(), key=get_index_memo)
+        absent_keys = keys.difference(set(locations))
+        for key in absent_keys:
+            yield versionedfile.AbsentContentFactory(key)
+        for key in present_keys:
+            index_memo, _, parents, (method, _) = locations[key]
+            read_memo = index_memo[0:3]
+            enc_data = self._access.get_raw_records([read_memo]).next()
+            total_bytes_length = index_memo[3]
+            bytes = xd3.decode_memory(enc_data, total_bytes_length, source=None)
+            # _parse checks the 'xdelta3' header line, so no re-check here.
+            label, sha1, text_bytes = _parse(bytes)
+            if label != key:
+                raise AssertionError("wrong key: %r, wanted %r" % (label, key))
+            if osutils.sha_string(text_bytes) != sha1:
+                raise AssertionError('sha1 sum did not match')
+            yield versionedfile.FulltextContentFactory(key, parents, sha1,
+                                                       text_bytes)
+
+    def get_sha1s(self, keys):
+        """See VersionedFiles.get_sha1s()."""
+        result = {}
+        for record in self.get_record_stream(keys, 'unordered', True):
+            if record.sha1 is not None:
+                result[record.key] = record.sha1
+            elif record.storage_kind != 'absent':
+                result[record.key] = osutils.sha_string(
+                    record.get_bytes_as('fulltext'))
+        return result
+
+    def insert_record_stream(self, stream):
+        """Insert a record stream into this container.
+
+        :param stream: A stream of records to insert.
+        :return: None
+        :seealso VersionedFiles.get_record_stream:
+        """
+        for _ in self._insert_record_stream(stream):
+            pass
+
+    def _insert_record_stream(self, stream, random_id=False):
+        """Internal core to insert a record stream into this container.
+
+        This helper function has a different interface than insert_record_stream
+        to allow add_lines to be minimal, but still return the needed data.
+
+        :param stream: A stream of records to insert.
+        :param random_id: Passed on to the index; if True it skips its check
+            for keys that are already present.
+        :return: An iterator over the sha1 of the inserted records.
+        :raises RevisionNotPresent: if the stream contains an absent record.
+        :seealso insert_record_stream:
+        :seealso add_lines:
+        """
+        def get_adapter(adapter_key):
+            try:
+                return adapters[adapter_key]
+            except KeyError:
+                adapter_factory = versionedfile.adapter_registry.get(
+                                        adapter_key)
+                adapter = adapter_factory(self)
+                adapters[adapter_key] = adapter
+                return adapter
+        adapters = {}
+
+        for record in stream:
+            if record.storage_kind == 'absent':
+                raise errors.RevisionNotPresent([record.key], self)
+            elif record.storage_kind in ('chunked', 'fulltext'):
+                bytes = record.get_bytes_as('fulltext')
+            else:
+                adapter_key = record.storage_kind, 'fulltext'
+                adapter = get_adapter(adapter_key)
+                bytes = adapter.get_bytes(record)
+            sha1 = osutils.sha_string(bytes)
+            header = 'xdelta3\nlabel: %s\nsha1: %s\n' % (
+                '\x00'.join(record.key), sha1)
+            total_bytes = header + bytes
+            total_bytes_length = len(total_bytes)
+            # XXX: Eventually we'll want to include 'source'
+            comp_bytes = xd3.encode_memory(total_bytes, source=None,
+                                           flags=xd3.SEC_DJW)
+            index, start, length = self._access.add_raw_records(
+                [(None, len(comp_bytes))], comp_bytes)[0]
+            # For now, we have no compression parents
+            # NOTE(review): this emits two reference lists; confirm the
+            # index is built with a matching reference_lists count.
+            nodes = [(record.key, "%d %d %d" % (start, length,
+                                                total_bytes_length),
+                      (record.parents, ()))]
+            self._index.add_records(nodes, random_id=random_id)
+            yield sha1
+
+    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
+        """Iterate over the lines in the versioned files from keys.
+
+        This may return lines from other keys. Each item the returned
+        iterator yields is a tuple of a line and a text version that that line
+        is present in (not introduced in).
+
+        Ordering of results is in whatever order is most suitable for the
+        underlying storage format.
+
+        If a progress bar is supplied, it may be used to indicate progress.
+        The caller is responsible for cleaning up progress bars (because this
+        is an iterator).
+
+        NOTES:
+         * Lines are normalised by the underlying store: they will all have \n
+           terminators.
+         * Lines are returned in arbitrary order.
+
+        :return: An iterator over (line, key).
+        """
+        if pb is None:
+            from bzrlib import progress
+            pb = progress.DummyProgress()
+        keys = set(keys)
+        total = len(keys)
+        # we don't care about inclusions, the caller cares.
+        # but we need to setup a list of records to visit.
+        # we need key, position, length
+        for key_idx, record in enumerate(self.get_record_stream(keys,
+            'unordered', True)):
+            # XXX: todo - optimise to use less than full texts.
+            key = record.key
+            pb.update('Walking content.', key_idx, total)
+            if record.storage_kind == 'absent':
+                raise errors.RevisionNotPresent(record.key, self)
+            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
+            for line in lines:
+                yield line, key
+        pb.update('Walking content.', total, total)
+
+    def keys(self):
+        """See VersionedFiles.keys."""
+        if 'evil' in debug.debug_flags:
+            from bzrlib import trace
+            trace.mutter_callsite(2, "keys scales with size of history")
+        sources = [self._index]
+        result = set()
+        for source in sources:
+            result.update(source.keys())
+        return result
+
+
+def make_pack_factory(graph, delta, keylength):
+    """Build a factory producing pack-backed XDeltaVersionedFiles objects.
+
+    The result is just complete enough for the interface tests; it is not a
+    full pack environment.
+
+    :param graph: Store a graph (the index then keeps one reference list).
+    :param delta: Delta compress contents.
+    :param keylength: How long should keys be.
+    :return: A callable taking a transport and returning the versioned files
+        object, with its open stream and container writer attached as the
+        .stream and .writer attributes.
+    """
+    def factory(transport):
+        ref_lists = 0
+        if graph:
+            ref_lists = 1
+        builder = btree_index.BTreeBuilder(reference_lists=ref_lists,
+            key_elements=keylength)
+        out_stream = transport.open_write_stream('newpack')
+        container = pack.ContainerWriter(out_stream.write)
+        container.begin()
+        index = _XDeltaIndex(builder, lambda:True, parents=(graph or delta),
+                             add_callback=builder.add_nodes)
+        access = knit._DirectPackAccess({})
+        access.set_writer(container, builder, (transport, 'newpack'))
+        vf = XDeltaVersionedFiles(index, access, delta)
+        vf.stream = out_stream
+        vf.writer = container
+        return vf
+    return factory
+
+
+def cleanup_pack_group(versioned_files):
+    versioned_files.writer.end()    # end the container writer first ...
+    versioned_files.stream.close()  # ... then close the stream it wrote to
 



More information about the bazaar-commits mailing list