Rev 5: nograph tests completely passing. in http://people.ubuntu.com/~robertc/baz2.0/plugins/groupcompress/trunk
Robert Collins
robertc at robertcollins.net
Tue Jul 8 05:51:02 BST 2008
At http://people.ubuntu.com/~robertc/baz2.0/plugins/groupcompress/trunk
------------------------------------------------------------
revno: 5
revision-id: robertc at robertcollins.net-20080708045101-i45nqjdifdquuyhc
parent: robertc at robertcollins.net-20080707085630-wjx40bv0izfau7gk
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Tue 2008-07-08 14:51:01 +1000
message:
nograph tests completely passing.
modified:
groupcompress.py groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
=== modified file 'groupcompress.py'
--- a/groupcompress.py 2008-07-07 08:56:30 +0000
+++ b/groupcompress.py 2008-07-08 04:51:01 +0000
@@ -17,9 +17,14 @@
"""Core compression logic for compressing streams of related files."""
+from cStringIO import StringIO
+import zlib
+
from bzrlib import (
annotate,
+ debug,
diff,
+ errors,
graph as _mod_graph,
pack,
patiencediff,
@@ -35,16 +40,24 @@
)
from bzrlib.plugins.index2.repofmt import InMemoryBTree
from bzrlib.versionedfile import (
+ adapter_registry,
+ AbsentContentFactory,
FulltextContentFactory,
VersionedFiles,
)
-def parse(lines):
+def parse(line_list):
result = []
- lines = iter(lines)
+ lines = iter(line_list)
next = lines.next
- print next(), next()
+ label_line = lines.next()
+ sha1_line = lines.next()
+ if (not label_line.startswith('label: ') or
+ not sha1_line.startswith('sha1: ')):
+ raise AssertionError("bad text record %r" % line_list)
+ label = tuple(label_line[7:-1].split('\x00'))
+ sha1 = sha1_line[6:-1]
for header in lines:
op = header[0]
numbers = header[2:]
@@ -54,7 +67,7 @@
else:
contents = [next() for i in xrange(numbers[0])]
result.append((op, None, numbers[0], contents))
- return result
+ return label, sha1, result
def apply_delta(basis, delta):
"""Apply delta to this object to become new_version_id."""
@@ -295,22 +308,29 @@
# double handling for now. Make it work until then.
bytes = ''.join(lines)
record = FulltextContentFactory(key, parents, None, bytes)
- sha1 = self._insert_record_stream([record]).next()
+ sha1 = list(self._insert_record_stream([record], random_id=random_id))[0]
return sha1, len(bytes), None
def annotate(self, key):
"""See VersionedFiles.annotate."""
graph = Graph(self)
+ parent_map = self.get_parent_map([key])
+ if not parent_map:
+ raise errors.RevisionNotPresent(key, self)
+ if parent_map[key] is not None:
+ search = graph._make_breadth_first_searcher([key])
+ keys = set()
+ while True:
+ try:
+ present, ghosts = search.next_with_ghosts()
+ except StopIteration:
+ break
+ keys.update(present)
+ parent_map = self.get_parent_map(keys)
+ else:
+ keys = [key]
+ parent_map = {key:()}
head_cache = _mod_graph.FrozenHeadsCache(graph)
- search = graph._make_breadth_first_searcher([key])
- keys = set()
- while True:
- try:
- present, ghosts = search.next_with_ghosts()
- except StopIteration:
- break
- keys.update(present)
- parent_map = self.get_parent_map(keys)
parent_cache = {}
reannotate = annotate.reannotate
for record in self.get_record_stream(keys, 'topological', True):
@@ -321,6 +341,12 @@
reannotate(parent_lines, fulltext, key, None, head_cache))
return parent_cache[key]
+ def check(self, progress_bar=None):
+ """See VersionedFiles.check()."""
+ keys = self.keys()
+ for record in self.get_record_stream(keys, 'unordered', True):
+ record.get_bytes_as('fulltext')
+
def _check_add(self, key, lines, random_id, check_content):
"""check that version_id and lines are safe to add."""
version_id = key[-1]
@@ -335,6 +361,91 @@
self._check_lines_not_unicode(lines)
self._check_lines_are_lines(lines)
+ def get_parent_map(self, keys):
+ """Get a map of the parents of keys.
+
+ :param keys: The keys to look up parents for.
+ :return: A mapping from keys to parents. Absent keys are absent from
+ the mapping.
+ """
+ result = {}
+ sources = [self._index]
+ source_results = []
+ missing = set(keys)
+ for source in sources:
+ if not missing:
+ break
+ new_result = source.get_parent_map(missing)
+ source_results.append(new_result)
+ result.update(new_result)
+ missing.difference_update(set(new_result))
+ return result
+
+ def get_record_stream(self, keys, ordering, include_delta_closure):
+ """Get a stream of records for keys.
+
+ :param keys: The keys to include.
+ :param ordering: Either 'unordered' or 'topological'. A topologically
+ sorted stream has compression parents strictly before their
+ children.
+ :param include_delta_closure: If True then the closure across any
+ compression parents will be included (in the opaque data).
+ :return: An iterator of ContentFactory objects, each of which is only
+ valid until the iterator is advanced.
+ """
+ # keys might be a generator
+ keys = set(keys)
+ if not keys:
+ return
+ if not self._index.has_graph:
+ # Cannot topologically order when no graph has been stored.
+ ordering = 'unordered'
+ # Cheap: iterate
+ locations = self._index.get_build_details(keys)
+ if ordering == 'topological':
+ # would be better to not globally sort initially but instead
+ # start with one key, recurse to its oldest parent, then grab
+ # everything in the same group, etc.
+ parent_map = dict((key, details[2]) for key, details in
+ locations.iteritems())
+ present_keys = topo_sort(parent_map)
+ # Now group by source:
+ else:
+ present_keys = locations.keys()
+ absent_keys = keys.difference(set(locations))
+ for key in absent_keys:
+ yield AbsentContentFactory(key)
+ for key in present_keys:
+ index_memo, _, parents, (method, _) = locations[key]
+ # read
+ read_memo = index_memo[0:3]
+ zdata = self._access.get_raw_records([read_memo]).next()
+ # decompress
+ plain = zlib.decompress(zdata)
+ # parse
+ delta_lines = split_lines(plain[index_memo[3]:index_memo[4]])
+ label, sha1, delta = parse(delta_lines)
+ if label != key:
+ raise AssertionError("wrong key: %r, wanted %r" % (label, key))
+ basis = plain[:index_memo[3]]
+ basis = StringIO(basis).readlines()
+ #basis = split_lines(plain[:last_end])
+ lines = apply_delta(basis, delta)
+ bytes = ''.join(lines)
+ yield FulltextContentFactory(key, parents, sha1, bytes)
+
+ def get_sha1s(self, keys):
+ """See VersionedFiles.get_sha1s()."""
+ result = {}
+ for record in self.get_record_stream(keys, 'unordered', True):
+ if record.sha1 is not None:
+ result[record.key] = record.sha1
+ else:
+ if record.storage_kind != 'absent':
+ result[record.key] = sha_string(record.get_bytes_as(
+ 'fulltext'))
+ return result
+
def insert_record_stream(self, stream):
"""Insert a record stream into this container.
@@ -342,9 +453,10 @@
:return: None
:seealso VersionedFiles.get_record_stream:
"""
- self._insert_record_stream(stream)
+ for _ in self._insert_record_stream(stream):
+ pass
- def _insert_record_stream(self, stream):
+ def _insert_record_stream(self, stream, random_id=False):
"""Internal core to insert a record stream into this container.
This helper function has a different interface than insert_record_stream
@@ -355,13 +467,95 @@
:seealso insert_record_stream:
:seealso add_lines:
"""
+ def get_adapter(adapter_key):
+ try:
+ return adapters[adapter_key]
+ except KeyError:
+ adapter_factory = adapter_registry.get(adapter_key)
+ adapter = adapter_factory(self)
+ adapters[adapter_key] = adapter
+ return adapter
+ adapters = {}
compressor = GroupCompressor(self._delta)
# This will go up to fulltexts for gc to gc fetching, which isn't
# ideal.
+ keys_to_add = []
+ basis_end = 0
for record in stream:
+ # Raise an error when a record is missing.
+ if record.storage_kind == 'absent':
+ raise errors.RevisionNotPresent([record.key], self)
+ elif record.storage_kind == 'fulltext':
+ bytes = record.get_bytes_as('fulltext')
+ else:
+ adapter_key = record.storage_kind, 'fulltext'
+ adapter = get_adapter(adapter_key)
+ bytes = adapter.get_bytes(record,
+ record.get_bytes_as(record.storage_kind))
found_sha1, end_point = compressor.compress(record.key,
- split_lines(record.get_bytes_as('fulltext')), record.sha1)
+ split_lines(bytes), record.sha1)
yield found_sha1
+ keys_to_add.append((record.key, '%d %d' % (basis_end, end_point),
+ (record.parents,)))
+ basis_end = end_point
+ compressed = zlib.compress(''.join(compressor.lines))
+ index, start, length = self._access.add_raw_records(
+ [(None, len(compressed))], compressed)[0]
+ nodes = []
+ for key, reads, refs in keys_to_add:
+ nodes.append((key, "%d %d %s" % (start, length, reads), refs))
+ self._index.add_records(nodes, random_id=random_id)
+
+ def iter_lines_added_or_present_in_keys(self, keys, pb=None):
+ """Iterate over the lines in the versioned files from keys.
+
+ This may return lines from other keys. Each item the returned
+ iterator yields is a tuple of a line and a text version that that line
+ is present in (not introduced in).
+
+ Ordering of results is in whatever order is most suitable for the
+ underlying storage format.
+
+ If a progress bar is supplied, it may be used to indicate progress.
+ The caller is responsible for cleaning up progress bars (because this
+ is an iterator).
+
+ NOTES:
+ * Lines are normalised by the underlying store: they will all have \n
+ terminators.
+ * Lines are returned in arbitrary order.
+
+ :return: An iterator over (line, key).
+ """
+ if pb is None:
+ pb = progress.DummyProgress()
+ keys = set(keys)
+ total = len(keys)
+ # we don't care about inclusions, the caller cares.
+ # but we need to setup a list of records to visit.
+ # we need key, position, length
+ for key_idx, record in enumerate(self.get_record_stream(keys,
+ 'unordered', True)):
+ # XXX: todo - optimise to use less than full texts.
+ key = record.key
+ pb.update('Walking content.', key_idx, total)
+ if record.storage_kind == 'absent':
+ raise errors.RevisionNotPresent(record.key, self)
+ lines = split_lines(record.get_bytes_as('fulltext'))
+ for line in lines:
+ yield line, key
+ pb.update('Walking content.', total, total)
+
+ def keys(self):
+ """See VersionedFiles.keys."""
+ if 'evil' in debug.debug_flags:
+ trace.mutter_callsite(2, "keys scales with size of history")
+ sources = [self._index]
+ result = set()
+ for source in sources:
+ result.update(source.keys())
+ return result
+
class _GCGraphIndex(object):
"""Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
@@ -389,13 +583,159 @@
if deltas and not parents:
# XXX: TODO: Delta tree and parent graph should be conceptually
# separate.
- raise KnitCorrupt(self, "Cannot do delta compression without "
+ raise errors.KnitCorrupt(self, "Cannot do delta compression without "
"parent tracking.")
self.has_graph = parents
self._is_locked = is_locked
+ def add_records(self, records, random_id=False):
+ """Add multiple records to the index.
+
+ This function does not insert data into the immutable GraphIndex
+ backing this index; instead it prepares data for insertion by
+ the caller, checks that it is safe to insert, and then calls
+ self._add_callback with the prepared GraphIndex nodes.
+
+ :param records: a list of tuples:
+ (key, options, access_memo, parents).
+ :param random_id: If True the ids being added were randomly generated
+ and no check for existence will be performed.
+ """
+ if not self._add_callback:
+ raise errors.ReadOnlyError(self)
+ # we hope there are no repositories with inconsistent parentage
+ # anymore.
+
+ changed = False
+ keys = {}
+ for (key, value, refs) in records:
+ if not self._parents:
+ if refs:
+ for ref in refs:
+ if ref:
+ raise errors.KnitCorrupt(self,
+ "attempt to add node with parents "
+ "in parentless index.")
+ refs = ()
+ changed = True
+ keys[key] = (value, refs)
+ # check for dups
+ if not random_id:
+ present_nodes = self._get_entries(keys)
+ for (index, key, value, node_refs) in present_nodes:
+ if node_refs != keys[key][1]:
+ raise errors.KnitCorrupt(self, "inconsistent details in add_records"
+ ": %s %s" % ((value, node_refs), keys[key]))
+ del keys[key]
+ changed = True
+ if changed:
+ result = []
+ if self._parents:
+ for key, (value, node_refs) in keys.iteritems():
+ result.append((key, value, node_refs))
+ else:
+ for key, (value, node_refs) in keys.iteritems():
+ result.append((key, value))
+ records = result
+ self._add_callback(records)
+
+ def _check_read(self):
+ """raise if reads are not permitted."""
+ if not self._is_locked():
+ raise errors.ObjectNotLocked(self)
+
def _check_write_ok(self):
"""Assert if writes are not permitted."""
if not self._is_locked():
raise errors.ObjectNotLocked(self)
+ def _get_entries(self, keys, check_present=False):
+ """Get the entries for keys.
+
+ :param keys: An iterable of index key tuples.
+ """
+ keys = set(keys)
+ found_keys = set()
+ if self._parents:
+ for node in self._graph_index.iter_entries(keys):
+ yield node
+ found_keys.add(node[1])
+ else:
+ # adapt parentless index to the rest of the code.
+ for node in self._graph_index.iter_entries(keys):
+ yield node[0], node[1], node[2], ()
+ found_keys.add(node[1])
+ if check_present:
+ missing_keys = keys.difference(found_keys)
+ if missing_keys:
+ raise errors.RevisionNotPresent(missing_keys.pop(), self)
+
+ def get_parent_map(self, keys):
+ """Get a map of the parents of keys.
+
+ :param keys: The keys to look up parents for.
+ :return: A mapping from keys to parents. Absent keys are absent from
+ the mapping.
+ """
+ self._check_read()
+ nodes = self._get_entries(keys)
+ result = {}
+ if self._parents:
+ for node in nodes:
+ result[node[1]] = node[3][0]
+ else:
+ for node in nodes:
+ result[node[1]] = None
+ return result
+
+ def get_build_details(self, keys):
+ """Get the various build details for keys.
+
+ Ghosts are omitted from the result.
+
+ :param keys: An iterable of keys.
+ :return: A dict of key:
+ (index_memo, compression_parent, parents, record_details).
+ index_memo
+ opaque structure to pass to read_records to extract the raw
+ data
+ compression_parent
+ Content that this record is built upon, may be None
+ parents
+ Logical parents of this node
+ record_details
+ extra information about the content which needs to be passed to
+ Factory.parse_record
+ """
+ self._check_read()
+ result = {}
+ entries = self._get_entries(keys, False)
+ for entry in entries:
+ key = entry[1]
+ if not self._parents:
+ parents = None
+ else:
+ parents = entry[3][0]
+ value = entry[2]
+ method = 'group'
+ result[key] = (self._node_to_position(entry),
+ None, parents, (method, None))
+ return result
+
+ def keys(self):
+ """Get all the keys in the collection.
+
+ The keys are not ordered.
+ """
+ self._check_read()
+ return [node[1] for node in self._graph_index.iter_all_entries()]
+
+ def _node_to_position(self, node):
+ """Convert an index value to position details."""
+ bits = node[2].split(' ')
+ # It would be nice not to read the entire gzip.
+ start = int(bits[0])
+ stop = int(bits[1])
+ basis_end = int(bits[2])
+ delta_end = int(bits[3])
+ return node[0], start, stop, basis_end, delta_end
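
As a rough guide to how these new code paths fit together, the sketch below (not part of the commit; the helper name and the sample key and texts are invented for illustration) drives them through the ordinary VersionedFiles API, assuming a GroupCompressVersionedFiles instance has already been built with a _GCGraphIndex and a pack access object:

def exercise_groupcompress(vf):
    # `vf` is assumed to be an already-constructed
    # GroupCompressVersionedFiles; its setup is not shown in this diff.
    key = ('file-id', 'rev-1')
    # add_lines() funnels through _insert_record_stream(), which
    # compresses the text into the current group and records
    # "start length basis_end delta_end" against the key in the index.
    vf.add_lines(key, (), ['first line\n', 'second line\n'])
    # get_record_stream() fetches the raw group, zlib-decompresses it,
    # parses the "label:"/"sha1:" record and applies the delta to the
    # basis to rebuild a fulltext.
    for record in vf.get_record_stream([key], 'unordered', True):
        if record.storage_kind != 'absent':
            text = record.get_bytes_as('fulltext')
    # The other methods added in this revision:
    print vf.get_sha1s([key])       # {key: sha1 of the stored text}
    print vf.get_parent_map([key])  # {key: ()}
    vf.check()                      # extracts every text as a sanity check

The four integers in each index value are what _node_to_position() turns back into the index_memo that get_record_stream() uses to slice the decompressed group.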