Rev 4011: Support delta_closure=True with NetworkRecordStream to transmit deltas over the wire when full text extraction is required on the far end. in http://people.ubuntu.com/~robertc/baz2.0/versioned_files.network

Robert Collins robertc at robertcollins.net
Tue Feb 17 06:21:57 GMT 2009


At http://people.ubuntu.com/~robertc/baz2.0/versioned_files.network

------------------------------------------------------------
revno: 4011
revision-id: robertc at robertcollins.net-20090217062152-8j02ev4tjhaidj54
parent: robertc at robertcollins.net-20090216000547-jzy3qm5ssb5nw5wg
committer: Robert Collins <robertc at robertcollins.net>
branch nick: VersionedFiles.NetworkRecordStream
timestamp: Tue 2009-02-17 17:21:52 +1100
message:
  Support delta_closure=True with NetworkRecordStream to transmit deltas over the wire when full text extraction is required on the far end.
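
The change, in short: a record stream requested with include_delta_closure=True
can now be serialised as a single 'knit-delta-closure' wire block (plus empty
'knit-delta-closure-ref' placeholders for the remaining keys) and rebuilt into
full texts on the far end. A minimal round-trip sketch, mirroring the new tests
below and assuming 'vf' is a KnitVersionedFiles that holds all of 'keys'
locally (so exactly one closure block is produced):

    from bzrlib import knit
    from bzrlib.versionedfile import network_bytes_to_kind_and_offset

    def roundtrip_full_texts(vf, keys):
        # Sender side: deltas plus enough context to rebuild full texts.
        stream = vf.get_record_stream(keys, 'topological', True)
        wire_blocks = [record.get_bytes_as(record.storage_kind)
                       for record in stream]
        # Receiver side: only the closure block carries bytes; the '-ref'
        # records serialise to the empty string.
        texts = {}
        for block in wire_blocks:
            if not block:
                continue
            kind, line_end = network_bytes_to_kind_and_offset(block)
            if kind == 'knit-delta-closure':
                for record in knit.knit_delta_closure_to_records(
                        kind, block, line_end):
                    texts[record.key] = record.get_bytes_as('fulltext')
        return texts
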
=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py	2009-02-15 22:46:41 +0000
+++ b/bzrlib/knit.py	2009-02-17 06:21:52 +0000
@@ -314,6 +314,61 @@
             self.storage_kind)
 
 
+class LazyKnitContentFactory(ContentFactory):
+    """A ContentFactory which can either generate full text or a wire form.
+
+    :seealso ContentFactory:
+    """
+
+    def __init__(self, key, parents, generator, first):
+        """Create a LazyKnitContentFactory.
+        
+        :param key: The key of the record.
+        :param parents: The parents of the record.
+        :param generator: A _ContentMapGenerator containing the record for this
+            key.
+        :param first: Is this the first content object returned from generator?
+            If it is, its storage kind is 'knit-delta-closure'; otherwise it
+            is 'knit-delta-closure-ref'.
+        """
+        self.key = key
+        self.parents = parents
+        self.sha1 = None
+        self._generator = generator
+        self.storage_kind = "knit-delta-closure"
+        if not first:
+            self.storage_kind = self.storage_kind + "-ref"
+        self._first = first
+
+    def get_bytes_as(self, storage_kind):
+        if storage_kind == self.storage_kind:
+            if self._first:
+                return self._generator._wire_bytes()
+            else:
+                # all the keys etc are contained in the bytes returned in the
+                # first record.
+                return ''
+        if storage_kind in ('chunked', 'fulltext'):
+            chunks = self._generator._get_one_work(self.key).text()
+            if storage_kind == 'chunked':
+                return chunks
+            else:
+                return ''.join(chunks)
+        raise errors.UnavailableRepresentation(self.key, storage_kind,
+            self.storage_kind)
+
+
+def knit_delta_closure_to_records(storage_kind, bytes, line_end):
+    """Convert a network record to a iterator over stream records.
+
+    :param storage_kind: The storage kind of the record.
+        Must be 'knit-delta-closure'.
+    :param bytes: The bytes of the record on the network.
+    """
+    generator = _NetworkContentMapGenerator(bytes, line_end)
+    return generator.get_record_stream()
+
+
 def knit_network_to_record(storage_kind, bytes, line_end):
     """Convert a network record to a record object.
 
@@ -342,8 +397,8 @@
     start = start + 1
     raw_record = bytes[start:]
     annotated = 'annotated' in storage_kind
-    return KnitContentFactory(key, parents, build_details, None, raw_record,
-        annotated, network_bytes=bytes)
+    return [KnitContentFactory(key, parents, build_details, None, raw_record,
+        annotated, network_bytes=bytes)]
 
 
 class KnitContent(object):
@@ -1045,87 +1100,8 @@
             if not self.get_parent_map([key]):
                 raise RevisionNotPresent(key, self)
             return cached_version
-        text_map, contents_map = self._get_content_maps([key])
-        return contents_map[key]
-
-    def _get_content_maps(self, keys, nonlocal_keys=None):
-        """Produce maps of text and KnitContents
-        
-        :param keys: The keys to produce content maps for.
-        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
-            which are known to not be in this knit, but rather in one of the
-            fallback knits.
-        :return: (text_map, content_map) where text_map contains the texts for
-            the requested versions and content_map contains the KnitContents.
-        """
-        # FUTURE: This function could be improved for the 'extract many' case
-        # by tracking each component and only doing the copy when the number of
-        # children than need to apply delta's to it is > 1 or it is part of the
-        # final output.
-        keys = list(keys)
-        multiple_versions = len(keys) != 1
-        record_map = self._get_record_map(keys, allow_missing=True)
-
-        text_map = {}
-        content_map = {}
-        final_content = {}
-        if nonlocal_keys is None:
-            nonlocal_keys = set()
-        else:
-            nonlocal_keys = frozenset(nonlocal_keys)
-        missing_keys = set(nonlocal_keys)
-        for source in self._fallback_vfs:
-            if not missing_keys:
-                break
-            for record in source.get_record_stream(missing_keys,
-                'unordered', True):
-                if record.storage_kind == 'absent':
-                    continue
-                missing_keys.remove(record.key)
-                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
-                text_map[record.key] = lines
-                content_map[record.key] = PlainKnitContent(lines, record.key)
-                if record.key in keys:
-                    final_content[record.key] = content_map[record.key]
-        for key in keys:
-            if key in nonlocal_keys:
-                # already handled
-                continue
-            components = []
-            cursor = key
-            while cursor is not None:
-                try:
-                    record, record_details, digest, next = record_map[cursor]
-                except KeyError:
-                    raise RevisionNotPresent(cursor, self)
-                components.append((cursor, record, record_details, digest))
-                cursor = next
-                if cursor in content_map:
-                    # no need to plan further back
-                    components.append((cursor, None, None, None))
-                    break
-
-            content = None
-            for (component_id, record, record_details,
-                 digest) in reversed(components):
-                if component_id in content_map:
-                    content = content_map[component_id]
-                else:
-                    content, delta = self._factory.parse_record(key[-1],
-                        record, record_details, content,
-                        copy_base_content=multiple_versions)
-                    if multiple_versions:
-                        content_map[component_id] = content
-
-            final_content[key] = content
-
-            # digest here is the digest from the last applied component.
-            text = content.text()
-            actual_sha = sha_strings(text)
-            if actual_sha != digest:
-                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
-            text_map[key] = text
-        return text_map, final_content
+        generator = _VFContentMapGenerator(self, [key])
+        return generator._get_content(key)
 
     def get_parent_map(self, keys):
         """Get a map of the graph parents of keys.
@@ -1164,7 +1140,7 @@
         
         :return: {key:(record, record_details, digest, next)}
             record
-                data returned from read_records
+                data returned from read_records (a KnitContent object)
             record_details
                 opaque information to pass to parse_record
             digest
@@ -1176,12 +1152,34 @@
         :param allow_missing: If some records are missing, rather than 
             error, just return the data that could be generated.
         """
+        raw_map = self._get_record_map_unparsed(keys,
+            allow_missing=allow_missing)
+        return self._raw_map_to_record_map(raw_map)
+
+    def _raw_map_to_record_map(self, raw_map):
+        """Parse the contents of _get_record_map_unparsed.
+        
+        :return: see _get_record_map.
+        """
+        result = {}
+        for key in raw_map:
+            data, record_details, next = raw_map[key]
+            content, digest = self._parse_record(key[-1], data)
+            result[key] = content, record_details, digest, next
+        return result
+
+    def _get_record_map_unparsed(self, keys, allow_missing=False):
+        """Get the raw data for reconstructing keys without parsing it.
+        
+        :return: A dict suitable for parsing via _raw_map_to_record_map.
+            key-> raw_bytes, (method, noeol), compression_parent
+        """
         # This retries the whole request if anything fails. Potentially we
         # could be a bit more selective. We could track the keys whose records
         # we have successfully found, and then only request the new records
         # from there. However, _get_components_positions grabs the whole build
         # chain, which means we'll likely try to grab the same records again
-        # anyway. Also, can the build chains change as part of a pack
+        # anyway. Also, can the build chains change as part of a pack
         # operation? We wouldn't want to end up with a broken chain.
         while True:
             try:
@@ -1191,11 +1189,11 @@
                 # n = next
                 records = [(key, i_m) for key, (r, i_m, n)
                                        in position_map.iteritems()]
-                record_map = {}
-                for key, record, digest in self._read_records_iter(records):
+                raw_record_map = {}
+                for key, data in self._read_records_iter_unchecked(records):
                     (record_details, index_memo, next) = position_map[key]
-                    record_map[key] = record, record_details, digest, next
-                return record_map
+                    raw_record_map[key] = data, record_details, next
+                return raw_record_map
             except errors.RetryWithNewPacks, e:
                 self._access.reload_or_raise(e)
 
@@ -1265,6 +1263,8 @@
         absent_keys = keys.difference(set(positions))
         # There may be more absent keys : if we're missing the basis component
         # and are trying to include the delta closure.
+        # XXX: We should not ever need to examine remote sources because we do
+        # not permit deltas across versioned files boundaries.
         if include_delta_closure:
             needed_from_fallback = set()
             # Build up reconstructable_keys dict.  key:True in this dict means
@@ -1347,11 +1347,10 @@
             for prefix, keys in prefix_split_keys.iteritems():
                 non_local = prefix_split_non_local_keys.get(prefix, [])
                 non_local = set(non_local)
-                text_map, _ = self._get_content_maps(keys, non_local)
-                for key in keys:
-                    lines = text_map.pop(key)
-                    yield ChunkedContentFactory(key, global_map[key], None,
-                                                lines)
+                generator = _VFContentMapGenerator(self, keys, non_local,
+                    global_map)
+                for record in generator.get_record_stream():
+                    yield record
         else:
             for source, keys in source_keys:
                 if source is parent_maps[0]:
@@ -1759,7 +1758,22 @@
         This unpacks enough of the text record to validate the id is
         as expected but thats all.
 
-        Each item the iterator yields is (key, bytes, sha1_of_full_text).
+        Each item the iterator yields is (key, bytes,
+            expected_sha1_of_full_text).
+        """
+        for key, data in self._read_records_iter_unchecked(records):
+            # validate the header (note that we can only use the suffix in
+            # current knit records).
+            df, rec = self._parse_record_header(key, data)
+            df.close()
+            yield key, data, rec[3]
+
+    def _read_records_iter_unchecked(self, records):
+        """Read text records from data file and yield raw data.
+
+        No validation is done.
+
+        Yields tuples of (key, data).
         """
         # setup an iterator of the external records:
         # uses readv so nice and fast we hope.
@@ -1771,11 +1785,7 @@
 
         for key, index_memo in records:
             data = raw_records.next()
-            # validate the header (note that we can only use the suffix in
-            # current knit records).
-            df, rec = self._parse_record_header(key, data)
-            df.close()
-            yield key, data, rec[3]
+            yield key, data
 
     def _record_to_data(self, key, digest, lines, dense_lines=None):
         """Convert key, digest, lines into a raw data block.
@@ -1825,6 +1835,314 @@
         return result
 
 
+class _ContentMapGenerator(object):
+    """Generate texts or expose raw deltas for a set of texts."""
+
+    def _get_content(self, key):
+        """Get the content object for key."""
+        if key in self.nonlocal_keys:
+            record = self.get_record_stream().next()
+            # Create a content object on the fly
+            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
+            return PlainKnitContent(lines, record.key)
+        else:
+            # local keys we can ask for directly
+            return self._get_one_work(key)
+
+    def get_record_stream(self):
+        """Get a record stream for the keys requested during __init__."""
+        for record in self._work():
+            yield record
+
+    def _work(self):
+        """Produce maps of text and KnitContents as dicts.
+        
+        :return: (text_map, content_map) where text_map contains the texts for
+            the requested versions and content_map contains the KnitContents.
+        """
+        # NB: By definition we never need to read remote sources unless texts
+        # are requested from them: we don't delta across stores - and we
+        # explicitly do not want to, to prevent data loss situations.
+        if self.global_map is None:
+            self.global_map = self.vf.get_parent_map(self.keys)
+        nonlocal_keys = self.nonlocal_keys
+
+        missing_keys = set(nonlocal_keys)
+        # Read from remote versioned file instances and provide to our caller.
+        for source in self.vf._fallback_vfs:
+            if not missing_keys:
+                break
+            # Loop over fallback repositories asking them for texts - ignore
+            # any missing from a particular fallback.
+            for record in source.get_record_stream(missing_keys,
+                'unordered', True):
+                if record.storage_kind == 'absent':
+                    # Not in this particular stream, may be in one of the
+                    # other fallback vfs objects.
+                    continue
+                missing_keys.remove(record.key)
+                yield record
+
+        self._raw_record_map = self.vf._get_record_map_unparsed(self.keys,
+            allow_missing=True)
+        first = True
+        for key in self.keys:
+            if key in self.nonlocal_keys:
+                continue
+            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
+            first = False
+
+    def _get_one_work(self, requested_key):
+        # Now, if we have calculated everything already, just return the
+        # desired text.
+        if requested_key in self._contents_map:
+            return self._contents_map[requested_key]
+        # To simplify things, parse everything at once - code that wants one text
+        # probably wants them all.
+        # FUTURE: This function could be improved for the 'extract many' case
+        # by tracking each component and only doing the copy when the number of
+        # children that need to apply deltas to it is > 1 or it is part of the
+        # final output.
+        multiple_versions = len(self.keys) != 1
+        if self._record_map is None:
+            self._record_map = self.vf._raw_map_to_record_map(
+                self._raw_record_map)
+        record_map = self._record_map
+        # raw_record_map is key -> raw_bytes, (method, noeol), compression_parent.
+        # All the records have been read and parsed at this point.
+        for key in self.keys:
+            if key in self.nonlocal_keys:
+                # already handled
+                continue
+            components = []
+            cursor = key
+            while cursor is not None:
+                try:
+                    record, record_details, digest, next = record_map[cursor]
+                except KeyError:
+                    raise RevisionNotPresent(cursor, self)
+                components.append((cursor, record, record_details, digest))
+                cursor = next
+                if cursor in self._contents_map:
+                    # no need to plan further back
+                    components.append((cursor, None, None, None))
+                    break
+
+            content = None
+            for (component_id, record, record_details,
+                 digest) in reversed(components):
+                if component_id in self._contents_map:
+                    content = self._contents_map[component_id]
+                else:
+                    content, delta = self._factory.parse_record(key[-1],
+                        record, record_details, content,
+                        copy_base_content=multiple_versions)
+                    if multiple_versions:
+                        self._contents_map[component_id] = content
+
+            # digest here is the digest from the last applied component.
+            text = content.text()
+            actual_sha = sha_strings(text)
+            if actual_sha != digest:
+                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
+        if multiple_versions:
+            return self._contents_map[requested_key]
+        else:
+            return content
+
+    def _wire_bytes(self):
+        """Get the bytes to put on the wire for 'key'.
+
+        The first collection of bytes asked for returns the serialised
+        raw_record_map and the additional details (key, parent) for key.
+        Subsequent calls return just the additional details (key, parent).
+        The wire storage_kind given for the first key is 'knit-delta-closure';
+        for subsequent keys it is 'knit-delta-closure-ref'.
+
+        :param key: A key from the content generator.
+        :return: Bytes to put on the wire.
+        """
+        lines = []
+        # kind marker for dispatch on the far side,
+        lines.append('knit-delta-closure')
+        # Annotated or not
+        if self.vf._factory.annotated:
+            lines.append('annotated')
+        else:
+            lines.append('')
+        # then the list of keys
+        lines.append('\t'.join(['\x00'.join(key) for key in self.keys
+            if key not in self.nonlocal_keys]))
+        # then the _raw_record_map in serialised form:
+        map_byte_list = []
+        # for each item in the map:
+        # 1 line with key
+        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
+        # one line with method
+        # one line with noeol
+        # one line with next ('' for None)
+        # one line with byte count of the record bytes
+        # the record bytes
+        for key, (record_bytes, (method, noeol), next) in \
+            self._raw_record_map.iteritems():
+            key_bytes = '\x00'.join(key)
+            parents = self.global_map.get(key, None)
+            if parents is None:
+                parent_bytes = 'None:'
+            else:
+                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
+            method_bytes = method
+            if noeol:
+                noeol_bytes = "T"
+            else:
+                noeol_bytes = "F"
+            if next:
+                next_bytes = '\x00'.join(next)
+            else:
+                next_bytes = ''
+            map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
+                key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
+                len(record_bytes), record_bytes))
+        map_bytes = ''.join(map_byte_list)
+        lines.append(map_bytes)
+        bytes = '\n'.join(lines)
+        return bytes
+
+
+class _VFContentMapGenerator(_ContentMapGenerator):
+    """Content map generator reading from a VersionedFiles object."""
+
+    def __init__(self, versioned_files, keys, nonlocal_keys=None,
+        global_map=None, raw_record_map=None):
+        """Create a _ContentMapGenerator.
+        
+        :param versioned_files: The versioned files that the texts are being
+            extracted from.
+        :param keys: The keys to produce content maps for.
+        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
+            which are known to not be in this knit, but rather in one of the
+            fallback knits.
+        :param global_map: The result of get_parent_map(keys) (or a supermap).
+            This is required if get_record_stream() is to be used.
+        :param raw_record_map: An unparsed raw record map to use for answering
+            contents.
+        """
+        # The vf to source data from
+        self.vf = versioned_files
+        # The keys desired
+        self.keys = list(keys)
+        # Keys known to be in fallback vfs objects
+        if nonlocal_keys is None:
+            self.nonlocal_keys = set()
+        else:
+            self.nonlocal_keys = frozenset(nonlocal_keys)
+        # Parents data for keys to be returned in get_record_stream
+        self.global_map = global_map
+        # The chunked lists for self.keys in text form
+        self._text_map = {}
+        # A cache of KnitContent objects used in extracting texts.
+        self._contents_map = {}
+        # All the knit records needed to assemble the requested keys as full
+        # texts.
+        self._record_map = None
+        if raw_record_map is None:
+            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
+                allow_missing=True)
+        else:
+            self._raw_record_map = raw_record_map
+        # the factory for parsing records
+        self._factory = self.vf._factory
+
+
+class _NetworkContentMapGenerator(_ContentMapGenerator):
+    """Content map generator sourced from a network stream."""
+
+    def __init__(self, bytes, line_end):
+        """Construct a _NetworkContentMapGenerator from a bytes block."""
+        self._bytes = bytes
+        self.global_map = {}
+        self._raw_record_map = {}
+        self._contents_map = {}
+        self._record_map = None
+        self.nonlocal_keys = []
+        # Get access to record parsing facilities
+        self.vf = KnitVersionedFiles(None, None)
+        start = line_end
+        # Annotated or not
+        line_end = bytes.find('\n', start)
+        line = bytes[start:line_end]
+        start = line_end + 1
+        if line == 'annotated':
+            self._factory = KnitAnnotateFactory()
+        else:
+            self._factory = KnitPlainFactory()
+        # list of keys to emit in get_record_stream
+        line_end = bytes.find('\n', start)
+        line = bytes[start:line_end]
+        start = line_end + 1
+        self.keys = [
+            tuple(segment.split('\x00')) for segment in line.split('\t')
+            if segment]
+        # now a loop until the end. XXX: It would be nice if this was just a
+        # bunch of the same records as get_record_stream(..., False) gives, but
+        # there is a decent sized gap stopping that at the moment.
+        end = len(bytes)
+        while start < end:
+            # 1 line with key
+            line_end = bytes.find('\n', start)
+            key = tuple(bytes[start:line_end].split('\x00'))
+            start = line_end + 1
+            # 1 line with parents (None: for None, '' for ())
+            line_end = bytes.find('\n', start)
+            line = bytes[start:line_end]
+            if line == 'None:':
+                parents = None
+            else:
+                parents = tuple(
+                    [tuple(segment.split('\x00')) for segment in line.split('\t')
+                     if segment])
+            self.global_map[key] = parents
+            start = line_end + 1
+            # one line with method
+            line_end = bytes.find('\n', start)
+            line = bytes[start:line_end]
+            method = line
+            start = line_end + 1
+            # one line with noeol
+            line_end = bytes.find('\n', start)
+            line = bytes[start:line_end]
+            noeol = line == "T"
+            start = line_end + 1
+            # one line with next ('' for None)
+            line_end = bytes.find('\n', start)
+            line = bytes[start:line_end]
+            if not line:
+                next = None
+            else:
+                next = tuple(bytes[start:line_end].split('\x00'))
+            start = line_end + 1
+            # one line with byte count of the record bytes
+            line_end = bytes.find('\n', start)
+            line = bytes[start:line_end]
+            count = int(line)
+            start = line_end + 1
+            # the record bytes
+            record_bytes = bytes[start:start+count]
+            start = start + count
+            # put it in the map
+            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
+
+    def get_record_stream(self):
+        """Get a record stream for for keys requested by the bytestream."""
+        first = True
+        for key in self.keys:
+            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
+            first = False
+
+    def _wire_bytes(self):
+        return self._bytes
+
+
 class _KndxIndex(object):
     """Manages knit index files
 

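For reference, the wire block assembled by _wire_bytes above is line oriented:
a kind marker, an annotated flag, a tab-separated key list, and then one
variable-length entry per record in the raw record map, concatenated with no
separator (the byte count tells the reader where each record's bytes end). A
small illustrative sketch, not bzrlib code, of encoding one such entry with
the same conventions ('None:' when the parents value is None, an empty line
for an empty parents tuple, 'T'/'F' for noeol, '' for no compression parent):

    def encode_closure_entry(key, parents, method, noeol, next, record_bytes):
        # Keys and parent keys are tuples of strings, joined with NULs.
        if parents is None:
            parent_bytes = 'None:'
        else:
            parent_bytes = '\t'.join('\x00'.join(p) for p in parents)
        return '%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
            '\x00'.join(key),                    # 1 line with the key
            parent_bytes,                        # 1 line with parents
            method,                              # e.g. 'fulltext' or 'line-delta'
            'T' if noeol else 'F',               # noeol flag
            '\x00'.join(next) if next else '',   # compression parent ('' for None)
            len(record_bytes),                   # byte count of the record bytes
            record_bytes)                        # the raw record bytes themselves
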
=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py	2009-01-22 21:32:15 +0000
+++ b/bzrlib/tests/test_knit.py	2009-02-17 06:21:52 +0000
@@ -42,6 +42,7 @@
     KnitSequenceMatcher,
     KnitVersionedFiles,
     PlainKnitContent,
+    _VFContentMapGenerator,
     _DirectPackAccess,
     _KndxIndex,
     _KnitGraphIndex,
@@ -63,6 +64,7 @@
 from bzrlib.versionedfile import (
     AbsentContentFactory,
     ConstantMapper,
+    network_bytes_to_kind_and_offset,
     RecordingVersionedFilesDecorator,
     )
 
@@ -1296,7 +1298,7 @@
 class TestBadShaError(KnitTests):
     """Tests for handling of sha errors."""
 
-    def test_exception_has_text(self):
+    def test_sha_exception_has_text(self):
         # having the failed text included in the error allows for recovery.
         source = self.make_test_knit()
         target = self.make_test_knit(name="target")
@@ -1313,7 +1315,8 @@
         target.insert_record_stream(
             source.get_record_stream([broken], 'unordered', False))
         err = self.assertRaises(errors.KnitCorrupt,
-            target.get_record_stream([broken], 'unordered', True).next)
+            target.get_record_stream([broken], 'unordered', True
+            ).next().get_bytes_as, 'chunked')
         self.assertEqual(['gam\n', 'bar\n'], err.content)
         # Test for formatting with live data
         self.assertStartsWith(str(err), "Knit ")
@@ -1936,7 +1939,10 @@
                 True).next()
             self.assertEqual(record.key, result[0])
             self.assertEqual(record.sha1, result[1])
-            self.assertEqual(record.storage_kind, result[2])
+            # We used to check that the storage kind matched, but actually it
+            # depends on whether it was sourced from the basis, or in a single
+            # group, because asking for full texts returns proxy objects to a
+            # _ContentMapGenerator object; so checking the kind is unneeded.
             self.assertEqual(record.get_bytes_as('fulltext'), result[3])
         # It's not strictly minimal, but it seems reasonable for now for it to
         # ask which fallbacks have which parents.
@@ -2173,3 +2179,65 @@
         self.assertEqual(set([key_left, key_right]), set(last_call[1]))
         self.assertEqual('unordered', last_call[2])
         self.assertEqual(True, last_call[3])
+
+
+class TestNetworkBehaviour(KnitTests):
+    """Tests for getting data out of/into knits over the network."""
+
+    def test_include_delta_closure_generates_a_knit_delta_closure(self):
+        vf = self.make_test_knit(name='test')
+        # put in three texts, giving ft, delta, delta
+        vf.add_lines(('base',), (), ['base\n', 'content\n'])
+        vf.add_lines(('d1',), (('base',),), ['d1\n'])
+        vf.add_lines(('d2',), (('d1',),), ['d2\n'])
+        # But heuristics could interfere, so check what happened:
+        self.assertEqual(['knit-ft-gz', 'knit-delta-gz', 'knit-delta-gz'],
+            [record.storage_kind for record in
+             vf.get_record_stream([('base',), ('d1',), ('d2',)],
+                'topological', False)])
+        # generate a stream of just the deltas with include_delta_closure=True,
+        # serialise to the network, and check that we get a delta closure on the wire.
+        stream = vf.get_record_stream([('d1',), ('d2',)], 'topological', True)
+        netb = [record.get_bytes_as(record.storage_kind) for record in stream]
+        # The first bytes should be a memo from _ContentMapGenerator, and the
+        # second bytes should be empty (because it's an API proxy, not
+        # something for wire serialisation).
+        self.assertEqual('', netb[1])
+        bytes = netb[0]
+        kind, line_end = network_bytes_to_kind_and_offset(bytes)
+        self.assertEqual('knit-delta-closure', kind)
+
+
+class TestContentMapGenerator(KnitTests):
+    """Tests for ContentMapGenerator"""
+
+    def test_get_record_stream_gives_records(self):
+        vf = self.make_test_knit(name='test')
+        # put in three texts, giving ft, delta, delta
+        vf.add_lines(('base',), (), ['base\n', 'content\n'])
+        vf.add_lines(('d1',), (('base',),), ['d1\n'])
+        vf.add_lines(('d2',), (('d1',),), ['d2\n'])
+        keys = [('d1',), ('d2',)]
+        generator = _VFContentMapGenerator(vf, keys,
+            global_map=vf.get_parent_map(keys))
+        for record in generator.get_record_stream():
+            if record.key == ('d1',):
+                self.assertEqual('d1\n', record.get_bytes_as('fulltext'))
+            else:
+                self.assertEqual('d2\n', record.get_bytes_as('fulltext'))
+
+    def test_get_record_stream_kinds_are_raw(self):
+        vf = self.make_test_knit(name='test')
+        # put in three texts, giving ft, delta, delta
+        vf.add_lines(('base',), (), ['base\n', 'content\n'])
+        vf.add_lines(('d1',), (('base',),), ['d1\n'])
+        vf.add_lines(('d2',), (('d1',),), ['d2\n'])
+        keys = [('base',), ('d1',), ('d2',)]
+        generator = _VFContentMapGenerator(vf, keys,
+            global_map=vf.get_parent_map(keys))
+        kinds = {('base',): 'knit-delta-closure',
+            ('d1',): 'knit-delta-closure-ref',
+            ('d2',): 'knit-delta-closure-ref',
+            }
+        for record in generator.get_record_stream():
+            self.assertEqual(kinds[record.key], record.storage_kind)

=== modified file 'bzrlib/tests/test_versionedfile.py'
--- a/bzrlib/tests/test_versionedfile.py	2009-02-16 00:05:47 +0000
+++ b/bzrlib/tests/test_versionedfile.py	2009-02-17 06:21:52 +0000
@@ -1559,7 +1559,8 @@
             ['mpdiff', 'knit-annotated-ft', 'knit-annotated-delta',
              'knit-ft', 'knit-delta', 'chunked', 'fulltext',
              'knit-annotated-ft-gz', 'knit-annotated-delta-gz', 'knit-ft-gz',
-             'knit-delta-gz'])
+             'knit-delta-gz',
+             'knit-delta-closure', 'knit-delta-closure-ref'])
 
     def capture_stream(self, f, entries, on_seen, parents):
         """Capture a stream for testing."""

=== modified file 'bzrlib/versionedfile.py'
--- a/bzrlib/versionedfile.py	2009-02-15 22:46:41 +0000
+++ b/bzrlib/versionedfile.py	2009-02-17 06:21:52 +0000
@@ -1502,6 +1502,7 @@
             'knit-delta-gz':knit.knit_network_to_record,
             'knit-annotated-ft-gz':knit.knit_network_to_record,
             'knit-annotated-delta-gz':knit.knit_network_to_record,
+            'knit-delta-closure':knit.knit_delta_closure_to_records,
             }
 
     def read(self):
@@ -1511,5 +1512,6 @@
         """
         for bytes in self._bytes_iterator:
             storage_kind, line_end = network_bytes_to_kind_and_offset(bytes)
-            yield self._kind_factory[storage_kind](
-                storage_kind, bytes, line_end)
+            for record in self._kind_factory[storage_kind](
+                storage_kind, bytes, line_end):
+                yield record

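With the dispatch-table change above, a single incoming wire block can expand
into several records, which is why read() now iterates over whatever the kind
factory returns instead of yielding exactly one record per block. A hedged
consumer sketch, assuming NetworkRecordStream is constructed from an iterator
of wire byte blocks (as elsewhere in versionedfile.py) and that the stream was
serialised with include_delta_closure=True so full texts are extractable:

    from bzrlib.versionedfile import NetworkRecordStream

    def texts_from_wire(byte_blocks):
        # Empty blocks are the '-ref' placeholders; skip them before dispatch.
        stream = NetworkRecordStream(block for block in byte_blocks if block)
        texts = {}
        for record in stream.read():
            texts[record.key] = record.get_bytes_as('fulltext')
        return texts
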


