Rev 2953: Partial refactoring of pack_repo to create a Packer object for packing. in http://people.ubuntu.com/~robertc/baz2.0/reconcile

Robert Collins robertc at robertcollins.net
Wed Oct 31 20:53:28 GMT 2007


At http://people.ubuntu.com/~robertc/baz2.0/reconcile

------------------------------------------------------------
revno: 2953
revision-id:robertc at robertcollins.net-20071031205323-pbtan7ltoci9slpq
parent: robertc at robertcollins.net-20071031165438-lc0o11v5vaf3p1ta
committer: Robert Collins <robertc at robertcollins.net>
branch nick: reconcile
timestamp: Thu 2007-11-01 07:53:23 +1100
message:
  Partial refactoring of pack_repo to create a Packer object for packing.
modified:
  bzrlib/reconcile.py            reweave_inventory.py-20051108164726-1e5e0934febac06e
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
  bzrlib/repository.py           rev_storage.py-20051111201905-119e9401e46257e3
  bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
=== modified file 'bzrlib/reconcile.py'
--- a/bzrlib/reconcile.py	2007-10-26 06:58:43 +0000
+++ b/bzrlib/reconcile.py	2007-10-31 20:53:23 +0000
@@ -467,3 +467,30 @@
 
     def _reconcile_steps(self):
         """Perform the steps to reconcile this repository."""
+        if not self.thorough:
+            return
+        self.repo.lock_write()
+        try:
+            self.repo._pack_collection.ensure_loaded()
+            self.repo._pack_collection.lock_names()
+            try:
+                self.repo.start_write_group()
+                try:
+                    self._new_pack = self.repo._pack_collection._new_pack
+                    self._copy_revisions()
+                except:
+                    self.repo.abort_write_group()
+                    raise
+                else:
+                    self.repo.commit_write_group()
+            finally:
+                self.repo._pack_collection._unlock_names()
+        finally:
+            self.repo.unlock()
+
+    def _copy_revisions(self):
+        """Copy revisions, regenerating the index as we go."""
+
+    def _pack_correcting_data(self):
+        """Perform a total pack, regenerating as much data as possible."""
+        revisions = self.repo.all_revision_ids()

=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2007-10-31 16:54:38 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2007-10-31 20:53:23 +0000
@@ -475,126 +475,16 @@
             self.knit_access.set_writer(None, None, (None, None))
 
 
-class RepositoryPackCollection(object):
-    """Management of packs within a repository."""
-
-    def __init__(self, repo, transport, index_transport, upload_transport,
-                 pack_transport):
-        """Create a new RepositoryPackCollection.
-
-        :param transport: Addresses the repository base directory 
-            (typically .bzr/repository/).
-        :param index_transport: Addresses the directory containing indices.
-        :param upload_transport: Addresses the directory into which packs are written
-            while they're being created.
-        :param pack_transport: Addresses the directory of existing complete packs.
-        """
-        self.repo = repo
-        self.transport = transport
-        self._index_transport = index_transport
-        self._upload_transport = upload_transport
-        self._pack_transport = pack_transport
-        self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3}
-        self.packs = []
-        # name:Pack mapping
-        self._packs_by_name = {}
-        # the previous pack-names content
-        self._packs_at_load = None
-        # when a pack is being created by this object, the state of that pack.
-        self._new_pack = None
-        # aggregated revision index data
-        self.revision_index = AggregateIndex()
-        self.inventory_index = AggregateIndex()
-        self.text_index = AggregateIndex()
-        self.signature_index = AggregateIndex()
-
-    def add_pack_to_memory(self, pack):
-        """Make a Pack object available to the repository to satisfy queries.
-        
-        :param pack: A Pack object.
-        """
-        assert pack.name not in self._packs_by_name
-        self.packs.append(pack)
-        self._packs_by_name[pack.name] = pack
-        self.revision_index.add_index(pack.revision_index, pack)
-        self.inventory_index.add_index(pack.inventory_index, pack)
-        self.text_index.add_index(pack.text_index, pack)
-        self.signature_index.add_index(pack.signature_index, pack)
-        
-    def _add_text_to_weave(self, file_id, revision_id, new_lines, parents,
-        nostore_sha, random_revid):
-        file_id_index = GraphIndexPrefixAdapter(
-            self.text_index.combined_index,
-            (file_id, ), 1,
-            add_nodes_callback=self.text_index.add_callback)
-        self.repo._text_knit._index._graph_index = file_id_index
-        self.repo._text_knit._index._add_callback = file_id_index.add_nodes
-        return self.repo._text_knit.add_lines_with_ghosts(
-            revision_id, parents, new_lines, nostore_sha=nostore_sha,
-            random_id=random_revid, check_content=False)[0:2]
-
-    def all_packs(self):
-        """Return a list of all the Pack objects this repository has.
-
-        Note that an in-progress pack being created is not returned.
-
-        :return: A list of Pack objects for all the packs in the repository.
-        """
-        result = []
-        for name in self.names():
-            result.append(self.get_pack_by_name(name))
-        return result
-
-    def autopack(self):
-        """Pack the pack collection incrementally.
-        
-        This will not attempt global reorganisation or recompression,
-        rather it will just ensure that the total number of packs does
-        not grow without bound. It uses the _max_pack_count method to
-        determine if autopacking is needed, and the pack_distribution
-        method to determine the number of revisions in each pack.
-
-        If autopacking takes place then the packs name collection will have
-        been flushed to disk - packing requires updating the name collection
-        in synchronisation with certain steps. Otherwise the names collection
-        is not flushed.
-
-        :return: True if packing took place.
-        """
-        # XXX: Should not be needed when the management of indices is sane.
-        total_revisions = self.revision_index.combined_index.key_count()
-        total_packs = len(self._names)
-        if self._max_pack_count(total_revisions) >= total_packs:
-            return False
-        # XXX: the following may want to be a class, to pack with a given
-        # policy.
-        mutter('Auto-packing repository %s, which has %d pack files, '
-            'containing %d revisions into %d packs.', self, total_packs,
-            total_revisions, self._max_pack_count(total_revisions))
-        # determine which packs need changing
-        pack_distribution = self.pack_distribution(total_revisions)
-        existing_packs = []
-        for pack in self.all_packs():
-            revision_count = pack.get_revision_count()
-            if revision_count == 0:
-                # revision less packs are not generated by normal operation,
-                # only by operations like sign-my-commits, and thus will not
-                # tend to grow rapdily or without bound like commit containing
-                # packs do - leave them alone as packing them really should
-                # group their data with the relevant commit, and that may
-                # involve rewriting ancient history - which autopack tries to
-                # avoid. Alternatively we could not group the data but treat
-                # each of these as having a single revision, and thus add 
-                # one revision for each to the total revision count, to get
-                # a matching distribution.
-                continue
-            existing_packs.append((revision_count, pack))
-        pack_operations = self.plan_autopack_combinations(
-            existing_packs, pack_distribution)
-        self._execute_pack_operations(pack_operations)
-        return True
-
-    def create_pack_from_packs(self, packs, suffix, revision_ids=None):
+class Packer(object):
+    """Create a pack from packs."""
+
+    def __init__(self, pack_collection, packs, suffix, revision_ids=None):
+        self.packs = packs
+        self.suffix = suffix
+        self.revision_ids = revision_ids
+        self._pack_collection = pack_collection
+
+    def pack(self):
         """Create a new pack by reading data from other packs.
 
         This does little more than a bulk copy of data. One key difference
@@ -614,57 +504,61 @@
         # - which has already been flushed, so its safe.
         # XXX: - duplicate code warning with start_write_group; fix before
         #      considering 'done'.
-        if self._new_pack is not None:
+        if self._pack_collection._new_pack is not None:
             raise errors.BzrError('call to create_pack_from_packs while '
                 'another pack is being written.')
-        if revision_ids is not None:
-            if len(revision_ids) == 0:
+        if self.revision_ids is not None:
+            if len(self.revision_ids) == 0:
                 # silly fetch request.
                 return None
             else:
-                revision_ids = frozenset(revision_ids)
-        pb = ui.ui_factory.nested_progress_bar()
+                self.revision_ids = frozenset(self.revision_ids)
+        self.pb = ui.ui_factory.nested_progress_bar()
         try:
-            return self._create_pack_from_packs(packs, suffix, revision_ids,
-                pb)
+            return self._create_pack_from_packs()
         finally:
-            pb.finished()
-
-    def _create_pack_from_packs(self, packs, suffix, revision_ids, pb):
-        pb.update("Opening pack", 0, 5)
-        new_pack = NewPack(self._upload_transport, self._index_transport,
-            self._pack_transport, upload_suffix=suffix)
+            self.pb.finished()
+
+    def open_pack(self):
+        """Open a pack for the pack we are creating."""
+        return NewPack(self._pack_collection._upload_transport,
+            self._pack_collection._index_transport,
+            self._pack_collection._pack_transport, upload_suffix=self.suffix)
+
+    def _create_pack_from_packs(self):
+        self.pb.update("Opening pack", 0, 5)
+        new_pack = self.open_pack()
         # buffer data - we won't be reading-back during the pack creation and
         # this makes a significant difference on sftp pushes.
         new_pack.set_write_cache_size(1024*1024)
         if 'pack' in debug.debug_flags:
             plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
-                for a_pack in packs]
-            if revision_ids is not None:
-                rev_count = len(revision_ids)
+                for a_pack in self.packs]
+            if self.revision_ids is not None:
+                rev_count = len(self.revision_ids)
             else:
                 rev_count = 'all'
             mutter('%s: create_pack: creating pack from source packs: '
                 '%s%s %s revisions wanted %s t=0',
-                time.ctime(), self._upload_transport.base, new_pack.random_name,
+                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                 plain_pack_list, rev_count)
         # select revisions
-        if revision_ids:
-            revision_keys = [(revision_id,) for revision_id in revision_ids]
+        if self.revision_ids:
+            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
         else:
             revision_keys = None
 
         # select revision keys
-        revision_index_map = self._packs_list_to_pack_map_and_index_list(
-            packs, 'revision_index')[0]
-        revision_nodes = self._index_contents(revision_index_map, revision_keys)
+        revision_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
+            self.packs, 'revision_index')[0]
+        revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
         # copy revision keys and adjust values
-        pb.update("Copying revision texts", 1)
+        self.pb.update("Copying revision texts", 1)
         list(self._copy_nodes_graph(revision_nodes, revision_index_map,
             new_pack._writer, new_pack.revision_index))
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._upload_transport.base, new_pack.random_name,
+                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                 new_pack.revision_index.key_count(),
                 time.time() - new_pack.start_time)
         # select inventory keys
@@ -672,18 +566,18 @@
         # querying for keys here could introduce a bug where an inventory item
         # is missed, so do not change it to query separately without cross
         # checking like the text key check below.
-        inventory_index_map = self._packs_list_to_pack_map_and_index_list(
-            packs, 'inventory_index')[0]
-        inv_nodes = self._index_contents(inventory_index_map, inv_keys)
+        inventory_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
+            self.packs, 'inventory_index')[0]
+        inv_nodes = self._pack_collection._index_contents(inventory_index_map, inv_keys)
         # copy inventory keys and adjust values
         # XXX: Should be a helper function to allow different inv representation
         # at this point.
-        pb.update("Copying inventory texts", 2)
+        self.pb.update("Copying inventory texts", 2)
         inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
             new_pack._writer, new_pack.inventory_index, output_lines=True)
-        if revision_ids:
-            fileid_revisions = self.repo._find_file_ids_from_xml_inventory_lines(
-                inv_lines, revision_ids)
+        if self.revision_ids:
+            fileid_revisions = self._pack_collection.repo._find_file_ids_from_xml_inventory_lines(
+                inv_lines, self.revision_ids)
             text_filter = []
             for fileid, file_revids in fileid_revisions.iteritems():
                 text_filter.extend(
@@ -694,13 +588,13 @@
             text_filter = None
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._upload_transport.base, new_pack.random_name,
+                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                 new_pack.inventory_index.key_count(),
                 time.time() - new_pack.start_time)
         # select text keys
-        text_index_map = self._packs_list_to_pack_map_and_index_list(
-            packs, 'text_index')[0]
-        text_nodes = self._index_contents(text_index_map, text_filter)
+        text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
+            self.packs, 'text_index')[0]
+        text_nodes = self._pack_collection._index_contents(text_index_map, text_filter)
         if text_filter is not None:
             # We could return the keys copied as part of the return value from
             # _copy_nodes_graph but this doesn't work all that well with the
@@ -718,37 +612,277 @@
                 raise errors.RevisionNotPresent(a_missing_key[1],
                     a_missing_key[0])
         # copy text keys and adjust values
-        pb.update("Copying content texts", 3)
+        self.pb.update("Copying content texts", 3)
         list(self._copy_nodes_graph(text_nodes, text_index_map,
             new_pack._writer, new_pack.text_index))
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._upload_transport.base, new_pack.random_name,
+                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                 new_pack.text_index.key_count(),
                 time.time() - new_pack.start_time)
         # select signature keys
         signature_filter = revision_keys # same keyspace
-        signature_index_map = self._packs_list_to_pack_map_and_index_list(
-            packs, 'signature_index')[0]
-        signature_nodes = self._index_contents(signature_index_map,
+        signature_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
+            self.packs, 'signature_index')[0]
+        signature_nodes = self._pack_collection._index_contents(signature_index_map,
             signature_filter)
         # copy signature keys and adjust values
-        pb.update("Copying signature texts", 4)
+        self.pb.update("Copying signature texts", 4)
         self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
             new_pack.signature_index)
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._upload_transport.base, new_pack.random_name,
+                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                 new_pack.signature_index.key_count(),
                 time.time() - new_pack.start_time)
         if not new_pack.data_inserted():
             new_pack.abort()
             return None
-        pb.update("Finishing pack", 5)
+        self.pb.update("Finishing pack", 5)
         new_pack.finish()
-        self.allocate(new_pack)
+        self._pack_collection.allocate(new_pack)
         return new_pack
 
+    def _copy_nodes(self, nodes, index_map, writer, write_index):
+        """Copy knit nodes between packs with no graph references."""
+        pb = ui.ui_factory.nested_progress_bar()
+        try:
+            return self._do_copy_nodes(nodes, index_map, writer,
+                write_index, pb)
+        finally:
+            pb.finished()
+
+    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb):
+        # for record verification
+        knit_data = _KnitData(None)
+        # plan a readv on each source pack:
+        # group by pack
+        nodes = sorted(nodes)
+        # how to map this into knit.py - or knit.py into this?
+        # we don't want the typical knit logic, we want grouping by pack
+        # at this point - perhaps a helper library for the following code 
+        # duplication points?
+        request_groups = {}
+        for index, key, value in nodes:
+            if index not in request_groups:
+                request_groups[index] = []
+            request_groups[index].append((key, value))
+        record_index = 0
+        pb.update("Copied record", record_index, len(nodes))
+        for index, items in request_groups.iteritems():
+            pack_readv_requests = []
+            for key, value in items:
+                # ---- KnitGraphIndex.get_position
+                bits = value[1:].split(' ')
+                offset, length = int(bits[0]), int(bits[1])
+                pack_readv_requests.append((offset, length, (key, value[0])))
+            # linear scan up the pack
+            pack_readv_requests.sort()
+            # copy the data
+            transport, path = index_map[index]
+            reader = pack.make_readv_reader(transport, path,
+                [offset[0:2] for offset in pack_readv_requests])
+            for (names, read_func), (_1, _2, (key, eol_flag)) in \
+                izip(reader.iter_records(), pack_readv_requests):
+                raw_data = read_func(None)
+                # check the header only
+                df, _ = knit_data._parse_record_header(key[-1], raw_data)
+                df.close()
+                pos, size = writer.add_bytes_record(raw_data, names)
+                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
+                pb.update("Copied record", record_index)
+                record_index += 1
+
+    def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
+        output_lines=False):
+        """Copy knit nodes between packs.
+
+        :param output_lines: Return lines present in the copied data as
+            an iterator.
+        """
+        pb = ui.ui_factory.nested_progress_bar()
+        try:
+            return self._do_copy_nodes_graph(nodes, index_map, writer,
+                write_index, output_lines, pb)
+        finally:
+            pb.finished()
+
+    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
+        output_lines, pb):
+        # for record verification
+        knit_data = _KnitData(None)
+        # for line extraction when requested (inventories only)
+        if output_lines:
+            factory = knit.KnitPlainFactory()
+        # plan a readv on each source pack:
+        # group by pack
+        nodes = sorted(nodes)
+        # how to map this into knit.py - or knit.py into this?
+        # we don't want the typical knit logic, we want grouping by pack
+        # at this point - perhaps a helper library for the following code 
+        # duplication points?
+        request_groups = {}
+        record_index = 0
+        pb.update("Copied record", record_index, len(nodes))
+        for index, key, value, references in nodes:
+            if index not in request_groups:
+                request_groups[index] = []
+            request_groups[index].append((key, value, references))
+        for index, items in request_groups.iteritems():
+            pack_readv_requests = []
+            for key, value, references in items:
+                # ---- KnitGraphIndex.get_position
+                bits = value[1:].split(' ')
+                offset, length = int(bits[0]), int(bits[1])
+                pack_readv_requests.append((offset, length, (key, value[0], references)))
+            # linear scan up the pack
+            pack_readv_requests.sort()
+            # copy the data
+            transport, path = index_map[index]
+            reader = pack.make_readv_reader(transport, path,
+                [offset[0:2] for offset in pack_readv_requests])
+            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
+                izip(reader.iter_records(), pack_readv_requests):
+                raw_data = read_func(None)
+                if output_lines:
+                    # read the entire thing
+                    content, _ = knit_data._parse_record(key[-1], raw_data)
+                    if len(references[-1]) == 0:
+                        line_iterator = factory.get_fulltext_content(content)
+                    else:
+                        line_iterator = factory.get_linedelta_content(content)
+                    for line in line_iterator:
+                        yield line
+                else:
+                    # check the header only
+                    df, _ = knit_data._parse_record_header(key[-1], raw_data)
+                    df.close()
+                pos, size = writer.add_bytes_record(raw_data, names)
+                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
+                pb.update("Copied record", record_index)
+                record_index += 1
+
+
+
+class RepositoryPackCollection(object):
+    """Management of packs within a repository."""
+
+    def __init__(self, repo, transport, index_transport, upload_transport,
+                 pack_transport):
+        """Create a new RepositoryPackCollection.
+
+        :param transport: Addresses the repository base directory 
+            (typically .bzr/repository/).
+        :param index_transport: Addresses the directory containing indices.
+        :param upload_transport: Addresses the directory into which packs are written
+            while they're being created.
+        :param pack_transport: Addresses the directory of existing complete packs.
+        """
+        self.repo = repo
+        self.transport = transport
+        self._index_transport = index_transport
+        self._upload_transport = upload_transport
+        self._pack_transport = pack_transport
+        self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3}
+        self.packs = []
+        # name:Pack mapping
+        self._packs_by_name = {}
+        # the previous pack-names content
+        self._packs_at_load = None
+        # when a pack is being created by this object, the state of that pack.
+        self._new_pack = None
+        # aggregated revision index data
+        self.revision_index = AggregateIndex()
+        self.inventory_index = AggregateIndex()
+        self.text_index = AggregateIndex()
+        self.signature_index = AggregateIndex()
+
+    def add_pack_to_memory(self, pack):
+        """Make a Pack object available to the repository to satisfy queries.
+        
+        :param pack: A Pack object.
+        """
+        assert pack.name not in self._packs_by_name
+        self.packs.append(pack)
+        self._packs_by_name[pack.name] = pack
+        self.revision_index.add_index(pack.revision_index, pack)
+        self.inventory_index.add_index(pack.inventory_index, pack)
+        self.text_index.add_index(pack.text_index, pack)
+        self.signature_index.add_index(pack.signature_index, pack)
+        
+    def _add_text_to_weave(self, file_id, revision_id, new_lines, parents,
+        nostore_sha, random_revid):
+        file_id_index = GraphIndexPrefixAdapter(
+            self.text_index.combined_index,
+            (file_id, ), 1,
+            add_nodes_callback=self.text_index.add_callback)
+        self.repo._text_knit._index._graph_index = file_id_index
+        self.repo._text_knit._index._add_callback = file_id_index.add_nodes
+        return self.repo._text_knit.add_lines_with_ghosts(
+            revision_id, parents, new_lines, nostore_sha=nostore_sha,
+            random_id=random_revid, check_content=False)[0:2]
+
+    def all_packs(self):
+        """Return a list of all the Pack objects this repository has.
+
+        Note that an in-progress pack being created is not returned.
+
+        :return: A list of Pack objects for all the packs in the repository.
+        """
+        result = []
+        for name in self.names():
+            result.append(self.get_pack_by_name(name))
+        return result
+
+    def autopack(self):
+        """Pack the pack collection incrementally.
+        
+        This will not attempt global reorganisation or recompression,
+        rather it will just ensure that the total number of packs does
+        not grow without bound. It uses the _max_pack_count method to
+        determine if autopacking is needed, and the pack_distribution
+        method to determine the number of revisions in each pack.
+
+        If autopacking takes place then the packs name collection will have
+        been flushed to disk - packing requires updating the name collection
+        in synchronisation with certain steps. Otherwise the names collection
+        is not flushed.
+
+        :return: True if packing took place.
+        """
+        # XXX: Should not be needed when the management of indices is sane.
+        total_revisions = self.revision_index.combined_index.key_count()
+        total_packs = len(self._names)
+        if self._max_pack_count(total_revisions) >= total_packs:
+            return False
+        # XXX: the following may want to be a class, to pack with a given
+        # policy.
+        mutter('Auto-packing repository %s, which has %d pack files, '
+            'containing %d revisions into %d packs.', self, total_packs,
+            total_revisions, self._max_pack_count(total_revisions))
+        # determine which packs need changing
+        pack_distribution = self.pack_distribution(total_revisions)
+        existing_packs = []
+        for pack in self.all_packs():
+            revision_count = pack.get_revision_count()
+            if revision_count == 0:
+                # revision less packs are not generated by normal operation,
+                # only by operations like sign-my-commits, and thus will not
+                # tend to grow rapdily or without bound like commit containing
+                # packs do - leave them alone as packing them really should
+                # group their data with the relevant commit, and that may
+                # involve rewriting ancient history - which autopack tries to
+                # avoid. Alternatively we could not group the data but treat
+                # each of these as having a single revision, and thus add 
+                # one revision for each to the total revision count, to get
+                # a matching distribution.
+                continue
+            existing_packs.append((revision_count, pack))
+        pack_operations = self.plan_autopack_combinations(
+            existing_packs, pack_distribution)
+        self._execute_pack_operations(pack_operations)
+        return True
+
     def _execute_pack_operations(self, pack_operations):
         """Execute a series of pack operations.
 
@@ -759,8 +893,7 @@
             # we may have no-ops from the setup logic
             if len(packs) == 0:
                 continue
-            # have a progress bar?
-            self.create_pack_from_packs(packs, '.autopack')
+            Packer(self, packs, '.autopack').pack()
             for pack in packs:
                 self._remove_pack_from_memory(pack)
         # record the newly available packs and stop advertising the old
@@ -841,125 +974,6 @@
         
         return pack_operations
 
-    def _copy_nodes(self, nodes, index_map, writer, write_index):
-        """Copy knit nodes between packs with no graph references."""
-        pb = ui.ui_factory.nested_progress_bar()
-        try:
-            return self._do_copy_nodes(nodes, index_map, writer,
-                write_index, pb)
-        finally:
-            pb.finished()
-
-    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb):
-        # for record verification
-        knit_data = _KnitData(None)
-        # plan a readv on each source pack:
-        # group by pack
-        nodes = sorted(nodes)
-        # how to map this into knit.py - or knit.py into this?
-        # we don't want the typical knit logic, we want grouping by pack
-        # at this point - perhaps a helper library for the following code 
-        # duplication points?
-        request_groups = {}
-        for index, key, value in nodes:
-            if index not in request_groups:
-                request_groups[index] = []
-            request_groups[index].append((key, value))
-        record_index = 0
-        pb.update("Copied record", record_index, len(nodes))
-        for index, items in request_groups.iteritems():
-            pack_readv_requests = []
-            for key, value in items:
-                # ---- KnitGraphIndex.get_position
-                bits = value[1:].split(' ')
-                offset, length = int(bits[0]), int(bits[1])
-                pack_readv_requests.append((offset, length, (key, value[0])))
-            # linear scan up the pack
-            pack_readv_requests.sort()
-            # copy the data
-            transport, path = index_map[index]
-            reader = pack.make_readv_reader(transport, path,
-                [offset[0:2] for offset in pack_readv_requests])
-            for (names, read_func), (_1, _2, (key, eol_flag)) in \
-                izip(reader.iter_records(), pack_readv_requests):
-                raw_data = read_func(None)
-                # check the header only
-                df, _ = knit_data._parse_record_header(key[-1], raw_data)
-                df.close()
-                pos, size = writer.add_bytes_record(raw_data, names)
-                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
-                pb.update("Copied record", record_index)
-                record_index += 1
-
-    def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
-        output_lines=False):
-        """Copy knit nodes between packs.
-
-        :param output_lines: Return lines present in the copied data as
-            an iterator.
-        """
-        pb = ui.ui_factory.nested_progress_bar()
-        try:
-            return self._do_copy_nodes_graph(nodes, index_map, writer,
-                write_index, output_lines, pb)
-        finally:
-            pb.finished()
-
-    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
-        output_lines, pb):
-        # for record verification
-        knit_data = _KnitData(None)
-        # for line extraction when requested (inventories only)
-        if output_lines:
-            factory = knit.KnitPlainFactory()
-        # plan a readv on each source pack:
-        # group by pack
-        nodes = sorted(nodes)
-        # how to map this into knit.py - or knit.py into this?
-        # we don't want the typical knit logic, we want grouping by pack
-        # at this point - perhaps a helper library for the following code 
-        # duplication points?
-        request_groups = {}
-        record_index = 0
-        pb.update("Copied record", record_index, len(nodes))
-        for index, key, value, references in nodes:
-            if index not in request_groups:
-                request_groups[index] = []
-            request_groups[index].append((key, value, references))
-        for index, items in request_groups.iteritems():
-            pack_readv_requests = []
-            for key, value, references in items:
-                # ---- KnitGraphIndex.get_position
-                bits = value[1:].split(' ')
-                offset, length = int(bits[0]), int(bits[1])
-                pack_readv_requests.append((offset, length, (key, value[0], references)))
-            # linear scan up the pack
-            pack_readv_requests.sort()
-            # copy the data
-            transport, path = index_map[index]
-            reader = pack.make_readv_reader(transport, path,
-                [offset[0:2] for offset in pack_readv_requests])
-            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
-                izip(reader.iter_records(), pack_readv_requests):
-                raw_data = read_func(None)
-                if output_lines:
-                    # read the entire thing
-                    content, _ = knit_data._parse_record(key[-1], raw_data)
-                    if len(references[-1]) == 0:
-                        line_iterator = factory.get_fulltext_content(content)
-                    else:
-                        line_iterator = factory.get_linedelta_content(content)
-                    for line in line_iterator:
-                        yield line
-                else:
-                    # check the header only
-                    df, _ = knit_data._parse_record_header(key[-1], raw_data)
-                    df.close()
-                pos, size = writer.add_bytes_record(raw_data, names)
-                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
-                pb.update("Copied record", record_index)
-                record_index += 1
-
     def ensure_loaded(self):
         # NB: if you see an assertion error here, its probably access against
         # an unlocked repo. Naughty.
@@ -1483,6 +1497,42 @@
             return 'w'
         return 'r'
 
+    def _find_inconsistent_revision_parents(self):
+        """Find revisions with incorrectly cached parents.
+
+        :returns: an iterator yielding tuples of (revison-id, parents-in-index,
+            parents-in-revision).
+        """
+        assert self.is_locked()
+        pb = ui.ui_factory.nested_progress_bar()
+        try:
+            revision_nodes = self._pack_collection.revision_index \
+                .combined_index.iter_all_entries()
+            index_positions = []
+            # Get the cached index values for all revisions, and also the location
+            # in each index of the revision text so we can perform linear IO.
+            for index, key, value, refs in revision_nodes:
+                pos, length = value[1:].split(' ')
+                index_positions.append((index, int(pos), key[0],
+                    tuple(parent[0] for parent in refs[0])))
+                pb.update("Reading revision index.", 0, 0)
+            index_positions.sort()
+            total = len(index_positions) / 1000 + 1
+            for offset in xrange(total):
+                pb.update("Checking cached revision graph.", offset)
+                to_query = index_positions[offset * 1000:(offset + 1) * 1000]
+                if not to_query:
+                    break
+                rev_ids = [item[2] for item in to_query]
+                revs = self.get_revisions(rev_ids)
+                for revision, item in zip(revs, to_query):
+                    index_parents = item[3]
+                    rev_parents = tuple(revision.parent_ids)
+                    if index_parents != rev_parents:
+                        yield (revision.revision_id, index_parents, rev_parents)
+        finally:
+            pb.finished()
+
     def get_parents(self, revision_ids):
         """See StackedParentsProvider.get_parents.
         

=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py	2007-10-30 19:42:31 +0000
+++ b/bzrlib/repository.py	2007-10-31 20:53:23 +0000
@@ -2331,6 +2331,7 @@
     @needs_write_lock
     def fetch(self, revision_id=None, pb=None, find_ghosts=False):
         """See InterRepository.fetch()."""
+        from bzrlib.repofmt.pack_repo import Packer
         mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
                self.source, self.source._format, self.target, self.target._format)
         self.count_copied = 0
@@ -2358,9 +2359,8 @@
             except errors.NoSuchRevision:
                 raise errors.InstallFailed([revision_id])
         packs = self.source._pack_collection.all_packs()
-        pack = self.target._pack_collection.create_pack_from_packs(
-            packs, '.fetch', revision_ids,
-            )
+        pack = Packer(self.target._pack_collection, packs, '.fetch',
+            revision_ids).pack()
         if pack is not None:
             self.target._pack_collection._save_pack_names()
             # Trigger an autopack. This may duplicate effort as we've just done

=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py	2007-10-30 17:39:11 +0000
+++ b/bzrlib/tests/test_repository.py	2007-10-31 20:53:23 +0000
@@ -1387,3 +1387,7 @@
         self.assertEqual(20, len(pack.random_name))
         self.assertIsInstance(pack.random_name, str)
         self.assertIsInstance(pack.start_time, float)
+
+
+class TestPacker(TestCaseWithTransport):
+    """Tests for the packs repository Packer class."""



More information about the bazaar-commits mailing list