Rev 5: nograph tests completely passing. in http://people.ubuntu.com/~robertc/baz2.0/plugins/groupcompress/trunk

Robert Collins robertc at robertcollins.net
Tue Jul 8 05:51:02 BST 2008


At http://people.ubuntu.com/~robertc/baz2.0/plugins/groupcompress/trunk

------------------------------------------------------------
revno: 5
revision-id: robertc at robertcollins.net-20080708045101-i45nqjdifdquuyhc
parent: robertc at robertcollins.net-20080707085630-wjx40bv0izfau7gk
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Tue 2008-07-08 14:51:01 +1000
message:
  nograph tests completely passing.
modified:
  groupcompress.py               groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
=== modified file 'groupcompress.py'
--- a/groupcompress.py	2008-07-07 08:56:30 +0000
+++ b/groupcompress.py	2008-07-08 04:51:01 +0000
@@ -17,9 +17,14 @@
 
 """Core compression logic for compressing streams of related files."""
 
+from cStringIO import StringIO
+import zlib
+
 from bzrlib import (
     annotate,
+    debug,
     diff,
+    errors,
     graph as _mod_graph,
     pack,
     patiencediff,
@@ -35,16 +40,24 @@
     )
 from bzrlib.plugins.index2.repofmt import InMemoryBTree
 from bzrlib.versionedfile import (
+    adapter_registry,
+    AbsentContentFactory,
     FulltextContentFactory,
     VersionedFiles,
     )
 
 
-def parse(lines):
+def parse(line_list):
     result = []
-    lines = iter(lines)
+    lines = iter(line_list)
     next = lines.next
-    print next(), next()
+    label_line = lines.next()
+    sha1_line = lines.next()
+    if (not label_line.startswith('label: ') or
+        not sha1_line.startswith('sha1: ')):
+        raise AssertionError("bad text record %r %r" % (label_line, sha1_line))
+    label = tuple(label_line[7:-1].split('\x00'))
+    sha1 = sha1_line[6:-1]
     for header in lines:
         op = header[0]
         numbers = header[2:]
@@ -54,7 +67,7 @@
         else:
             contents = [next() for i in xrange(numbers[0])]
             result.append((op, None, numbers[0], contents))
-    return result
+    return label, sha1, result
 
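
For orientation: a minimal sketch of the record framing that parse()
expects, reconstructed from the visible code. The branch that parses the
opcode numbers is elided in this hunk, so only the label/sha1 header
handling is shown, and the record content here is hypothetical.

    record = ['label: file-id\x00rev-1\n', 'sha1: 0a4d...\n']
    label_line, sha1_line = record[0], record[1]
    assert label_line.startswith('label: ')
    assert sha1_line.startswith('sha1: ')
    key = tuple(label_line[7:-1].split('\x00'))  # -> ('file-id', 'rev-1')
    sha1 = sha1_line[6:-1]                       # hex digest string
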
 def apply_delta(basis, delta):
     """Apply delta to this object to become new_version_id."""
@@ -295,22 +308,29 @@
         # double handling for now. Make it work until then.
         bytes = ''.join(lines)
         record = FulltextContentFactory(key, parents, None, bytes)
-        sha1 = self._insert_record_stream([record]).next()
+        sha1 = list(self._insert_record_stream([record], random_id=random_id))[0]
         return sha1, len(bytes), None
 
     def annotate(self, key):
         """See VersionedFiles.annotate."""
         graph = Graph(self)
+        parent_map = self.get_parent_map([key])
+        if not parent_map:
+            raise errors.RevisionNotPresent(key, self)
+        if parent_map[key] is not None:
+            search = graph._make_breadth_first_searcher([key])
+            keys = set()
+            while True:
+                try:
+                    present, ghosts = search.next_with_ghosts()
+                except StopIteration:
+                    break
+                keys.update(present)
+            parent_map = self.get_parent_map(keys)
+        else:
+            keys = [key]
+            parent_map = {key:()}
         head_cache = _mod_graph.FrozenHeadsCache(graph)
-        search = graph._make_breadth_first_searcher([key])
-        keys = set()
-        while True:
-            try:
-                present, ghosts = search.next_with_ghosts()
-            except StopIteration:
-                break
-            keys.update(present)
-        parent_map = self.get_parent_map(keys)
         parent_cache = {}
         reannotate = annotate.reannotate
         for record in self.get_record_stream(keys, 'topological', True):
@@ -321,6 +341,12 @@
                 reannotate(parent_lines, fulltext, key, None, head_cache))
         return parent_cache[key]
 
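
The searcher loop in annotate() above amounts to a transitive-closure
walk over get_parent_map. A plain-dict sketch of the same idea (bzrlib's
breadth-first searcher also reports ghosts; this toy version just skips
keys with no entry):

    parent_map = {('f', 'r2'): (('f', 'r1'),), ('f', 'r1'): ()}
    keys, pending = set(), [('f', 'r2')]
    while pending:
        key = pending.pop()
        if key in keys or key not in parent_map:
            continue  # already visited, or a ghost
        keys.add(key)
        pending.extend(parent_map[key])
    assert keys == set([('f', 'r2'), ('f', 'r1')])
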
+    def check(self, progress_bar=None):
+        """See VersionedFiles.check()."""
+        keys = self.keys()
+        for record in self.get_record_stream(keys, 'unordered', True):
+            record.get_bytes_as('fulltext')
+
     def _check_add(self, key, lines, random_id, check_content):
         """check that version_id and lines are safe to add."""
         version_id = key[-1]
@@ -335,6 +361,91 @@
             self._check_lines_not_unicode(lines)
             self._check_lines_are_lines(lines)
 
+    def get_parent_map(self, keys):
+        """Get a map of the parents of keys.
+
+        :param keys: The keys to look up parents for.
+        :return: A mapping from keys to parents. Absent keys are absent from
+            the mapping.
+        """
+        result = {}
+        sources = [self._index]
+        source_results = []
+        missing = set(keys)
+        for source in sources:
+            if not missing:
+                break
+            new_result = source.get_parent_map(missing)
+            source_results.append(new_result)
+            result.update(new_result)
+            missing.difference_update(set(new_result))
+        return result
+
+    def get_record_stream(self, keys, ordering, include_delta_closure):
+        """Get a stream of records for keys.
+
+        :param keys: The keys to include.
+        :param ordering: Either 'unordered' or 'topological'. A topologically
+            sorted stream has compression parents strictly before their
+            children.
+        :param include_delta_closure: If True then the closure across any
+            compression parents will be included (in the opaque data).
+        :return: An iterator of ContentFactory objects, each of which is only
+            valid until the iterator is advanced.
+        """
+        # keys might be a generator
+        keys = set(keys)
+        if not keys:
+            return
+        if not self._index.has_graph:
+            # Cannot topological order when no graph has been stored.
+            ordering = 'unordered'
+        # Cheap: iterate
+        locations = self._index.get_build_details(keys)
+        if ordering == 'topological':
+            # would be better to not globally sort initially but instead
+            # start with one key, recurse to its oldest parent, then grab
+            # everything in the same group, etc.
+            parent_map = dict((key, details[2]) for key, details in
+                locations.iteritems())
+            present_keys = topo_sort(parent_map)
+            # Now group by source:
+        else:
+            present_keys = locations.keys()
+        absent_keys = keys.difference(set(locations))
+        for key in absent_keys:
+            yield AbsentContentFactory(key)
+        for key in present_keys:
+            index_memo, _, parents, (method, _) = locations[key]
+            # read
+            read_memo = index_memo[0:3]
+            zdata = self._access.get_raw_records([read_memo]).next()
+            # decompress
+            plain = zlib.decompress(zdata)
+            # parse
+            delta_lines = split_lines(plain[index_memo[3]:index_memo[4]])
+            label, sha1, delta = parse(delta_lines)
+            if label != key:
+                raise AssertionError("wrong key: %r, wanted %r" % (label, key))
+            basis = plain[:index_memo[3]]
+            basis = StringIO(basis).readlines()
+            lines = apply_delta(basis, delta)
+            bytes = ''.join(lines)
+            yield FulltextContentFactory(key, parents, sha1, bytes)
+            
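
To make the extraction arithmetic concrete: each group is one zlib blob
whose decompressed form is a basis region followed by per-record delta
regions, and index_memo carries (index, start, length, basis_end,
delta_end). A hedged sketch with hypothetical offsets and an assumed
'i,N' insert opcode:

    import zlib

    group_plain = 'basis text\n' + 'label: f\x00r1\nsha1: ...\ni,1\nnew\n'
    zdata = zlib.compress(group_plain)

    index_memo = (None, 0, len(zdata), 11, len(group_plain))
    plain = zlib.decompress(zdata)
    basis = plain[:index_memo[3]]                     # 'basis text\n'
    delta_lines = plain[index_memo[3]:index_memo[4]]  # this record's delta
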
+    def get_sha1s(self, keys):
+        """See VersionedFiles.get_sha1s()."""
+        result = {}
+        for record in self.get_record_stream(keys, 'unordered', True):
+            if record.sha1 is not None:
+                result[record.key] = record.sha1
+            else:
+                if record.storage_kind != 'absent':
+                    result[record.key] = sha_string(record.get_bytes_as(
+                        'fulltext'))
+        return result
+
     def insert_record_stream(self, stream):
         """Insert a record stream into this container.
 
@@ -342,9 +453,10 @@
         :return: None
         :seealso VersionedFiles.get_record_stream:
         """
-        self._insert_record_stream(stream)
+        for _ in self._insert_record_stream(stream):
+            pass
 
-    def _insert_record_stream(self, stream):
+    def _insert_record_stream(self, stream, random_id=False):
         """Internal core to insert a record stream into this container.
 
         This helper function has a different interface than insert_record_stream
@@ -355,13 +467,95 @@
         :seealso insert_record_stream:
         :seealso add_lines:
         """
+        def get_adapter(adapter_key):
+            try:
+                return adapters[adapter_key]
+            except KeyError:
+                adapter_factory = adapter_registry.get(adapter_key)
+                adapter = adapter_factory(self)
+                adapters[adapter_key] = adapter
+                return adapter
+        adapters = {}
         compressor = GroupCompressor(self._delta)
         # This will go up to fulltexts for gc to gc fetching, which isn't
         # ideal.
+        keys_to_add = []
+        basis_end = 0
         for record in stream:
+            # Raise an error when a record is missing.
+            if record.storage_kind == 'absent':
+                raise errors.RevisionNotPresent(record.key, self)
+            elif record.storage_kind == 'fulltext':
+                bytes = record.get_bytes_as('fulltext')
+            else:
+                adapter_key = record.storage_kind, 'fulltext'
+                adapter = get_adapter(adapter_key)
+                bytes = adapter.get_bytes(record,
+                    record.get_bytes_as(record.storage_kind))
             found_sha1, end_point = compressor.compress(record.key,
-                split_lines(record.get_bytes_as('fulltext')), record.sha1)
+                split_lines(bytes), record.sha1)
             yield found_sha1
+            keys_to_add.append((record.key, '%d %d' % (basis_end, end_point),
+                (record.parents,)))
+            basis_end = end_point
+        compressed = zlib.compress(''.join(compressor.lines))
+        index, start, length = self._access.add_raw_records(
+            [(None, len(compressed))], compressed)[0]
+        nodes = []
+        for key, reads, refs in keys_to_add:
+            nodes.append((key, "%d %d %s" % (start, length, reads), refs))
+        self._index.add_records(nodes, random_id=random_id)
+
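
The index value written by the loop above is the space-joined string
'<start> <length> <basis_end> <delta_end>'; _GCGraphIndex._node_to_position
at the end of this patch splits it apart again. A round-trip sketch with
hypothetical numbers:

    start, length = 0, 4096          # from add_raw_records
    basis_end, end_point = 120, 512  # delta region inside the plain group
    value = "%d %d %s" % (start, length, '%d %d' % (basis_end, end_point))
    assert value == '0 4096 120 512'
    assert [int(bit) for bit in value.split(' ')] == [0, 4096, 120, 512]
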
+    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
+        """Iterate over the lines in the versioned files from keys.
+
+        This may return lines from other keys. Each item the returned
+        iterator yields is a tuple of a line and a text version that that line
+        is present in (not introduced in).
+
+        Ordering of results is in whatever order is most suitable for the
+        underlying storage format.
+
+        If a progress bar is supplied, it may be used to indicate progress.
+        The caller is responsible for cleaning up progress bars (because this
+        is an iterator).
+
+        NOTES:
+         * Lines are normalised by the underlying store: they will all have \n
+           terminators.
+         * Lines are returned in arbitrary order.
+
+        :return: An iterator over (line, key).
+        """
+        if pb is None:
+            pb = progress.DummyProgress()
+        keys = set(keys)
+        total = len(keys)
+        # we don't care about inclusions, the caller cares.
+        # but we need to setup a list of records to visit.
+        # we need key, position, length
+        for key_idx, record in enumerate(self.get_record_stream(keys,
+            'unordered', True)):
+            # XXX: todo - optimise to use less than full texts.
+            key = record.key
+            pb.update('Walking content.', key_idx, total)
+            if record.storage_kind == 'absent':
+                raise errors.RevisionNotPresent(record.key, self)
+            lines = split_lines(record.get_bytes_as('fulltext'))
+            for line in lines:
+                yield line, key
+        pb.update('Walking content.', total, total)
+
+    def keys(self):
+        """See VersionedFiles.keys."""
+        if 'evil' in debug.debug_flags:
+            trace.mutter_callsite(2, "keys scales with size of history")
+        sources = [self._index]
+        result = set()
+        for source in sources:
+            result.update(source.keys())
+        return result
+
 
 class _GCGraphIndex(object):
     """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
@@ -389,13 +583,159 @@
         if deltas and not parents:
             # XXX: TODO: Delta tree and parent graph should be conceptually
             # separate.
-            raise KnitCorrupt(self, "Cannot do delta compression without "
+            raise errors.KnitCorrupt(self, "Cannot do delta compression without "
                 "parent tracking.")
         self.has_graph = parents
         self._is_locked = is_locked
 
+    def add_records(self, records, random_id=False):
+        """Add multiple records to the index.
+        
+        This function does not insert data into the immutable GraphIndex
+        backing this index; instead it prepares data for insertion by
+        the caller, checks that it is safe to insert, and then calls
+        self._add_callback with the prepared GraphIndex nodes.
+
+        :param records: a list of tuples:
+                         (key, value, refs).
+        :param random_id: If True the ids being added were randomly generated
+            and no check for existence will be performed.
+        """
+        if not self._add_callback:
+            raise errors.ReadOnlyError(self)
+        # we hope there are no repositories with inconsistent parentage
+        # anymore.
+
+        changed = False
+        keys = {}
+        for (key, value, refs) in records:
+            if not self._parents:
+                if refs:
+                    for ref in refs:
+                        if ref:
+                            raise errors.KnitCorrupt(self,
+                                "attempt to add node with parents "
+                                "in parentless index.")
+                    refs = ()
+                    changed = True
+            keys[key] = (value, refs)
+        # check for dups
+        if not random_id:
+            present_nodes = self._get_entries(keys)
+            for (index, key, value, node_refs) in present_nodes:
+                if node_refs != keys[key][1]:
+                    raise errors.KnitCorrupt(self, "inconsistent details in add_records"
+                        ": %s %s" % ((value, node_refs), keys[key]))
+                del keys[key]
+                changed = True
+        if changed:
+            result = []
+            if self._parents:
+                for key, (value, node_refs) in keys.iteritems():
+                    result.append((key, value, node_refs))
+            else:
+                for key, (value, node_refs) in keys.iteritems():
+                    result.append((key, value))
+            records = result
+        self._add_callback(records)
+        
+    def _check_read(self):
+        """raise if reads are not permitted."""
+        if not self._is_locked():
+            raise errors.ObjectNotLocked(self)
+
     def _check_write_ok(self):
         """Assert if writes are not permitted."""
         if not self._is_locked():
             raise errors.ObjectNotLocked(self)
 
+    def _get_entries(self, keys, check_present=False):
+        """Get the entries for keys.
+        
+        :param keys: An iterable of index key tuples.
+        """
+        keys = set(keys)
+        found_keys = set()
+        if self._parents:
+            for node in self._graph_index.iter_entries(keys):
+                yield node
+                found_keys.add(node[1])
+        else:
+            # adapt parentless index to the rest of the code.
+            for node in self._graph_index.iter_entries(keys):
+                yield node[0], node[1], node[2], ()
+                found_keys.add(node[1])
+        if check_present:
+            missing_keys = keys.difference(found_keys)
+            if missing_keys:
+                raise errors.RevisionNotPresent(missing_keys.pop(), self)
+
+    def get_parent_map(self, keys):
+        """Get a map of the parents of keys.
+
+        :param keys: The keys to look up parents for.
+        :return: A mapping from keys to parents. Absent keys are absent from
+            the mapping.
+        """
+        self._check_read()
+        nodes = self._get_entries(keys)
+        result = {}
+        if self._parents:
+            for node in nodes:
+                result[node[1]] = node[3][0]
+        else:
+            for node in nodes:
+                result[node[1]] = None
+        return result
+
+    def get_build_details(self, keys):
+        """Get the various build details for keys.
+
+        Ghosts are omitted from the result.
+
+        :param keys: An iterable of keys.
+        :return: A dict of key:
+            (index_memo, compression_parent, parents, record_details).
+            index_memo
+                opaque structure to pass to read_records to extract the raw
+                data
+            compression_parent
+                Content that this record is built upon, may be None
+            parents
+                Logical parents of this node
+            record_details
+                extra information about the content which needs to be passed to
+                Factory.parse_record
+        """
+        self._check_read()
+        result = {}
+        entries = self._get_entries(keys, False)
+        for entry in entries:
+            key = entry[1]
+            if not self._parents:
+                parents = None
+            else:
+                parents = entry[3][0]
+            value = entry[2]
+            method = 'group'
+            result[key] = (self._node_to_position(entry),
+                           None, parents, (method, None))
+        return result
+    
+    def keys(self):
+        """Get all the keys in the collection.
+        
+        The keys are not ordered.
+        """
+        self._check_read()
+        return [node[1] for node in self._graph_index.iter_all_entries()]
+    
+    def _node_to_position(self, node):
+        """Convert an index value to position details."""
+        bits = node[2].split(' ')
+        # It would be nice not to read the entire compressed group.
+        start = int(bits[0])
+        length = int(bits[1])
+        basis_end = int(bits[2])
+        delta_end = int(bits[3])
+        return node[0], start, length, basis_end, delta_end
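
Taken together, a hedged sketch of how the API added in this revision is
exercised; constructing the store is outside this patch, so vf is assumed
to be a GroupCompressVersionedFiles wired to a parent-tracking
_GCGraphIndex and a pack access object:

    # vf construction is not shown in this patch (assumption).
    key = ('file-id', 'rev-1')
    vf.add_lines(key, (), ['hello\n', 'world\n'])
    for record in vf.get_record_stream([key], 'unordered', True):
        assert record.get_bytes_as('fulltext') == 'hello\nworld\n'
    assert vf.get_parent_map([key]) == {key: ()}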
