Rev 2716: Slightly broken, but branch and fetch performance is now roughly on par (for bzr.dev) with knits - should be much faster for large repos. in http://people.ubuntu.com/~robertc/baz2.0/repository

Robert Collins robertc at robertcollins.net
Tue Aug 14 08:18:07 BST 2007


At http://people.ubuntu.com/~robertc/baz2.0/repository

------------------------------------------------------------
revno: 2716
revision-id: robertc at robertcollins.net-20070814071804-71d340lob7qv3wep
parent: robertc at robertcollins.net-20070813045212-hzoaikwhwp4ojy29
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Tue 2007-08-14 17:18:04 +1000
message:
  Slightly broken, but branch and fetch performance is now roughly on par (for bzr.dev) with knits - should be much faster for large repos.
modified:
  bzrlib/branch.py               branch.py-20050309040759-e4baf4e0d046576e
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
  bzrlib/repository.py           rev_storage.py-20051111201905-119e9401e46257e3
=== modified file 'bzrlib/branch.py'
--- a/bzrlib/branch.py	2007-07-27 13:02:00 +0000
+++ b/bzrlib/branch.py	2007-08-14 07:18:04 +0000
@@ -2093,14 +2093,18 @@
         if revision_id is None:
             revno, revision_id = self.last_revision_info()
         else:
-            # To figure out the revno for a random revision, we need to build
-            # the revision history, and count its length.
-            # We don't care about the order, just how long it is.
-            # Alternatively, we could start at the current location, and count
-            # backwards. But there is no guarantee that we will find it since
-            # it may be a merged revision.
-            revno = len(list(self.repository.iter_reverse_revision_history(
-                                                                revision_id)))
+            # usually when revision_id is supplied it is the tip - try
+            # for the fast path
+            revno, tip_revision_id = self.last_revision_info()
+            if tip_revision_id != revision_id:
+                # To figure out the revno for a random revision, we need to
+                # build the revision history, and count its length.  We don't
+                # care about the order, just how long it is.  Alternatively, we
+                # could start at the current location, and count backwards. But
+                # there is no guarantee that we will find it since it may be a
+                # merged revision.
+                revno = len(list(self.repository.iter_reverse_revision_history(
+                                                                    revision_id)))
         destination.set_last_revision_info(revno, revision_id)
 
     def _make_tags(self):

=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2007-08-13 04:45:41 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2007-08-14 07:18:04 +0000
@@ -22,6 +22,7 @@
 
 from bzrlib import (
         pack,
+        ui,
         )
 from bzrlib.index import (
     GraphIndex,
@@ -30,7 +31,7 @@
     CombinedGraphIndex,
     GraphIndexPrefixAdapter,
     )
-from bzrlib.knit import KnitGraphIndex, _PackAccess
+from bzrlib.knit import KnitGraphIndex, _PackAccess, _KnitData
 from bzrlib.pack import ContainerWriter
 from bzrlib.store import revision
 """)
@@ -59,11 +60,69 @@
 from bzrlib.trace import mutter, note, warning
 
 
+class Pack(object):
+    """An in memory proxy for a .pack and its indices."""
+
+    def __init__(self):
+        self.revision_index = None
+        self.inventory_index = None
+        self.text_index = None
+        self.signature_index = None
+        self.name = None
+        self.transport = None
+
+    def get_revision_count(self):
+        return len(list(self.revision_index.iter_all_entries()))
+
+
 class RepositoryPackCollection(object):
 
     def __init__(self, repo, transport):
         self.repo = repo
         self.transport = transport
+        self.packs = []
+
+    def add_pack_to_memory(self, pack):
+        """Make a Pack object available to the repository to satisfy queries.
+        
+        :param pack: A Pack object.
+        """
+        self.packs.append(pack)
+        if self.repo._revision_all_indices is None:
+            # to make this function more useful, perhaps we should make an
+            # all_indices object in future?
+            pass
+        else:
+            self.repo._revision_pack_map[pack.revision_index] = (
+                pack.transport, pack.name)
+            self.repo._revision_all_indices.insert_index(0, pack.revision_index)
+        if self.repo._inv_all_indices is not None:
+            # inv 'knit' has been used : update it.
+            self.repo._inv_all_indices.insert_index(0,
+                pack.inventory_index)
+        if self.repo._text_all_indices is not None:
+            # text 'knits' have been used : update it.
+            self.repo._text_all_indices.insert_index(0,
+                pack.text_index)
+        if self.repo._signature_all_indices is not None:
+            # signatures 'knit' accessed : update it.
+            self.repo._signature_all_indices.insert_index(0,
+                pack.signature_index)
+
+    def all_pack_details(self):
+        """Return a list of all the packs as transport,name tuples.
+
+        :return: A list of (transport, name) tuples for all the packs in the
+            repository.
+        """
+        # XXX: fix me, should be direct rather than indirect
+        if self.repo._revision_all_indices is None:
+            # trigger creation of the all revision index.
+            self.repo._revision_store.get_revision_file(self.repo.get_transaction())
+        result = []
+        for index, transport_and_name in self.repo._revision_pack_map.iteritems():
+            result.append(transport_and_name)
+        return result
 
     def autopack(self):
         """Pack the pack collection incrementally.
@@ -119,6 +178,88 @@
         self._execute_pack_operations(pack_operations)
         return True
 
+    def create_pack_from_packs(self, revision_index_map, inventory_index_map,
+        text_index_map, signature_index_map, revision_ids=None):
+        """Create a new pack by reading data from other packs.
+
+        This does little more than a bulk copy of data. One key difference
+        is that data with the same item key across multiple packs is elided
+        from the output. The new pack is written into the current pack store
+        along with its indices, and the name added to the pack names. The 
+        source packs are not altered.
+
+        :param revision_index_map: A revision index map.
+        :param inventory_index_map: A inventory index map.
+        :param text_index_map: A text index map.
+        :param signature_index_map: A signature index map.
+        :param revision_ids: Either None, to copy all data, or a list
+            of revision_ids to limit the copied data to the data they
+            introduced.
+        :return: The number of revisions copied.
+        """
+        # open a pack - using the same name as the last temporary file
+        # - which has already been flushed, so it's safe.
+        # XXX: - duplicate code warning with start_write_group; fix before
+        #      considering 'done'.
+        if getattr(self.repo, '_open_pack_tuple', None) is not None:
+            raise errors.BzrError('call to create_pack_from_packs while '
+                'another pack is being written.')
+        random_name = self.repo.control_files._lock.nonce + '.autopack'
+        write_stream = self.repo._upload_transport.open_file_stream(random_name)
+        pack_hash = md5.new()
+        def write_data(bytes, update=pack_hash.update):
+            write_stream(bytes)
+            update(bytes)
+        writer = pack.ContainerWriter(write_data)
+        writer.begin()
+        # open new indices
+        revision_index = InMemoryGraphIndex(reference_lists=1)
+        inv_index = InMemoryGraphIndex(reference_lists=2)
+        text_index = InMemoryGraphIndex(reference_lists=2, key_elements=2)
+        signature_index = InMemoryGraphIndex(reference_lists=0)
+        # select revision keys
+        revision_nodes = self._index_contents(revision_index_map)
+        # copy revision keys and adjust values
+        self._copy_nodes_graph(revision_nodes, revision_index_map, writer, revision_index)
+        # select inventory keys
+        inv_nodes = self._index_contents(inventory_index_map)
+        # copy inventory keys and adjust values
+        self._copy_nodes_graph(inv_nodes, inventory_index_map, writer, inv_index)
+        # select text keys
+        text_nodes = self._index_contents(text_index_map)
+        # copy text keys and adjust values
+        self._copy_nodes_graph(text_nodes, text_index_map, writer, text_index)
+        # select signature keys
+        signature_nodes = self._index_contents(signature_index_map)
+        # copy signature keys and adjust values
+        self._copy_nodes(signature_nodes, signature_index_map, writer, signature_index)
+        # finish the pack
+        writer.end()
+        new_name = pack_hash.hexdigest()
+        # add to names
+        self.allocate(new_name)
+        # rename into place
+        self.repo._upload_transport.close_file_stream(random_name)
+        self.repo._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
+        # write indices
+        index_transport = self.repo._upload_transport.clone('../indices')
+        rev_index_name = self.repo._revision_store.name_to_revision_index_name(new_name)
+        index_transport.put_file(rev_index_name, revision_index.finish())
+        inv_index_name = self.repo._inv_thunk.name_to_inv_index_name(new_name)
+        index_transport.put_file(inv_index_name, inv_index.finish())
+        text_index_name = self.repo.weave_store.name_to_text_index_name(new_name)
+        index_transport.put_file(text_index_name, text_index.finish())
+        signature_index_name = self.repo._revision_store.name_to_signature_index_name(new_name)
+        index_transport.put_file(signature_index_name, signature_index.finish())
+        result = Pack()
+        result.revision_index = revision_index
+        result.inventory_index = inv_index
+        result.text_index = text_index
+        result.signature_index = signature_index
+        result.name = new_name
+        result.transport = self.repo._upload_transport.clone('../packs/')
+        return result
+
     def _execute_pack_operations(self, pack_operations):
         """Execute a series of pack operations.
 
@@ -218,61 +359,16 @@
             in use.
         :return: None
         """
-        # open new pack - using the same name as the last temporary file
-        # - which has already been flushed, so its safe.
-        # XXX: - duplicate code warning with start_write_group; fix before
-        #      considering 'done'.
-        random_name = self.repo.control_files._lock.nonce + '.autopack'
-        write_stream = self.repo._upload_transport.open_file_stream(random_name)
-        pack_hash = md5.new()
-        def write_data(bytes, update=pack_hash.update):
-            write_stream(bytes)
-            update(bytes)
-        writer = pack.ContainerWriter(write_data)
-        writer.begin()
-        # open new indices
-        revision_index = InMemoryGraphIndex(reference_lists=1)
-        inv_index = InMemoryGraphIndex(reference_lists=2)
-        text_index = InMemoryGraphIndex(reference_lists=2, key_elements=2)
-        signature_index = InMemoryGraphIndex(reference_lists=0)
         # select revision keys
         revision_index_map = self._revision_index_map(pack_details)
-        revision_nodes = self._index_contents(revision_index_map)
-        # copy revision keys and adjust values
-        self._copy_nodes_graph(revision_nodes, revision_index_map, writer, revision_index)
         # select inventory keys
         inv_index_map = self._inv_index_map(pack_details)
-        inv_nodes = self._index_contents(inv_index_map)
-        # copy inventory keys and adjust values
-        self._copy_nodes_graph(inv_nodes, inv_index_map, writer, inv_index)
         # select text keys
         text_index_map = self._text_index_map(pack_details)
-        text_nodes = self._index_contents(text_index_map)
-        # copy text keys and adjust values
-        self._copy_nodes_graph(text_nodes, text_index_map, writer, text_index)
         # select signature keys
         signature_index_map = self._signature_index_map(pack_details)
-        signature_nodes = self._index_contents(signature_index_map)
-        # copy signature keys and adjust values
-        self._copy_nodes(signature_nodes, signature_index_map, writer, signature_index)
-        # finish the pack
-        writer.end()
-        new_name = pack_hash.hexdigest()
-        # add to names
-        self.allocate(new_name)
-        # rename into place
-        self.repo._upload_transport.close_file_stream(random_name)
-        self.repo._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
-        # write indices
-        index_transport = self.repo._upload_transport.clone('../indices')
-        rev_index_name = self.repo._revision_store.name_to_revision_index_name(new_name)
-        index_transport.put_file(rev_index_name, revision_index.finish())
-        inv_index_name = self.repo._inv_thunk.name_to_inv_index_name(new_name)
-        index_transport.put_file(inv_index_name, inv_index.finish())
-        text_index_name = self.repo.weave_store.name_to_text_index_name(new_name)
-        index_transport.put_file(text_index_name, text_index.finish())
-        signature_index_name = self.repo._revision_store.name_to_signature_index_name(new_name)
-        index_transport.put_file(signature_index_name, signature_index.finish())
+        self.create_pack_from_packs(revision_index_map, inv_index_map,
+            text_index_map, signature_index_map)
 
     def _copy_nodes(self, nodes, index_map, writer, write_index):
         # plan a readv on each source pack:
@@ -307,6 +403,8 @@
                 write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
 
     def _copy_nodes_graph(self, nodes, index_map, writer, write_index):
+        # for record verification
+        knit_data = _KnitData(None)
         # plan a readv on each source pack:
         # group by pack
         nodes = sorted(nodes)
@@ -335,6 +433,8 @@
             for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
                 izip(reader.iter_records(), pack_readv_requests):
                 raw_data = read_func(None)
+                df, _ = knit_data._parse_record_header(key[-1], raw_data)
+                df.close()
                 pos, size = writer.add_bytes_record(raw_data, names)
                 write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
 
@@ -410,6 +510,7 @@
 
     def reset(self):
         self._names = None
+        self.packs = []
 
     def _inv_index_map(self, pack_details):
         """Get a map of inv index -> packs for pack_details."""
@@ -986,6 +1087,7 @@
             self._upload_transport.close_file_stream(self._open_pack_tuple[1])
             self._upload_transport.rename(self._open_pack_tuple[1],
                 '../packs/' + new_name + '.pack')
+            self._open_pack_tuple = None
             if not self._packs.autopack():
                 self._packs.save()
         else:
@@ -1100,6 +1202,7 @@
             self._upload_transport.close_file_stream(self._open_pack_tuple[1])
             self._upload_transport.rename(self._open_pack_tuple[1],
                 '../packs/' + new_name + '.pack')
+            self._open_pack_tuple = None
             if not self._packs.autopack():
                 self._packs.save()
         else:

=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py	2007-08-13 04:45:41 +0000
+++ b/bzrlib/repository.py	2007-08-14 07:18:04 +0000
@@ -1854,16 +1854,41 @@
     @needs_write_lock
     def fetch(self, revision_id=None, pb=None):
         """See InterRepository.fetch()."""
-        from bzrlib.fetch import KnitRepoFetcher
         mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
                self.source, self.source._format, self.target, self.target._format)
         # TODO: jam 20070210 This should be an assert, not a translate
         revision_id = osutils.safe_revision_id(revision_id)
-        f = KnitRepoFetcher(to_repository=self.target,
-                            from_repository=self.source,
-                            last_revision=revision_id,
-                            pb=pb)
-        return f.count_copied, f.failed_revisions
+        self.count_copied = 0
+        if revision_id is None:
+            # nothing to do.
+            return
+        if _mod_revision.is_null(revision_id):
+            # TODO:
+            # everything to do - use pack logic
+            # to fetch from all packs to one without
+            # inventory parsing etc.
+            # till then:
+            revision_ids = self.source.all_revision_ids()
+        else:
+            try:
+                revision_ids = self.missing_revision_ids(revision_id)
+            except errors.NoSuchRevision:
+                raise errors.InstallFailed([revision_id])
+        packs = self.source._packs.all_pack_details()
+        revision_index_map = self.source._packs._revision_index_map(packs)
+        inventory_index_map = self.source._packs._inv_index_map(packs)
+        text_index_map = self.source._packs._text_index_map(packs)
+        signature_index_map = self.source._packs._signature_index_map(packs)
+        pack = self.target._packs.create_pack_from_packs(
+            revision_index_map,
+            inventory_index_map,
+            text_index_map,
+            signature_index_map,
+            revision_ids
+            )
+        self.target._packs.save()
+        self.target._packs.add_pack_to_memory(pack)
+        return pack.get_revision_count()
 
     @needs_read_lock
     def missing_revision_ids(self, revision_id=None):
@@ -2017,6 +2042,7 @@
 InterRepository.register_optimiser(InterKnitRepo)
 InterRepository.register_optimiser(InterModel1and2)
 InterRepository.register_optimiser(InterKnit1and2)
+InterRepository.register_optimiser(InterPackRepo)
 InterRepository.register_optimiser(InterRemoteRepository)
 
 



More information about the bazaar-commits mailing list