Rev 2820: Output the revision index from NewPack.finish in http://people.ubuntu.com/~robertc/baz2.0/repository

Robert Collins robertc at robertcollins.net
Mon Oct 15 04:09:54 BST 2007


At http://people.ubuntu.com/~robertc/baz2.0/repository

------------------------------------------------------------
revno: 2820
revision-id: robertc at robertcollins.net-20071015030942-koi2eoiaewe9kdod
parent: robertc at robertcollins.net-20071015014503-0nxu50fz1xcfwy6f
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Mon 2007-10-15 13:09:42 +1000
message:
  Output the revision index from NewPack.finish
modified:
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
  bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2007-10-15 01:45:03 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2007-10-15 03:09:42 +0000
@@ -106,6 +106,13 @@
         """
         self.revision_index = revision_index
 
+    def get_revision_count(self):
+        return self.revision_index.key_count()
+
+    def inventory_index_name(self, name):
+        """The inv index is the name + .iix."""
+        return name + '.iix'
+
     def revision_index_name(self, name):
         """The revision index is the name + .rix."""
         return name + '.rix'
@@ -118,10 +125,6 @@
         """The text index is the name + .tix."""
         return name + '.tix'
 
-    def inventory_index_name(self, name):
-        """The inv index is the name + .iix."""
-        return name + '.iix'
-
 
 class ExistingPack(Pack):
     """An in memory proxy for an exisiting .pack and its disk indices."""
@@ -151,17 +154,61 @@
         """Get the file name for the pack on disk."""
         return self.name + '.pack'
 
-    def get_revision_count(self):
-        return self.revision_index.key_count()
-
 
 class NewPack(Pack):
     """An in memory proxy for a pack which is being created."""
 
-    def __init__(self):
-        Pack.__init__(self, InMemoryGraphIndex(1))
+    def __init__(self, upload_transport, index_transport, pack_transport,
+        upload_suffix=''):
+        """Create a NewPack instance.
+
+        :param upload_transport: A writable transport for the pack to be
+            incrementally uploaded to.
+        :param index_transport: A writable transport for the pack's indices to
+            be written to when the pack is finished.
+        :param pack_transport: A writable transport for the pack to be renamed
+            to when the upload is complete.
+        :param upload_suffix: An optional suffix to be given to any temporary
+            files created during the pack creation. e.g. '.autopack'
+        """
+        Pack.__init__(self, InMemoryGraphIndex(reference_lists=1))
+        # where should the new pack be opened
+        self.upload_transport = upload_transport
+        # where are indices written out to
+        self.index_transport = index_transport
+        # where is the pack renamed to when it is finished?
+        self.pack_transport = pack_transport
         # tracks the content written to the .pack file.
         self._hash = md5.new()
+        # a four-tuple with the length in bytes of the indices, once the pack
+        # is finalised.
+        self.index_sizes = None
+        # the temporary pack file name.
+        self.random_name = rand_chars(20) + upload_suffix
+        # when was this pack started ?
+        self.start_time = time.time()
+
+    def finish(self):
+        """Finish the new pack.
+
+        This:
+         - finalises the content
+         - assigns a name (the md5 of the content, currently)
+         - writes out the associated indices
+         - renames the pack into place.
+         - stores the index size tuple for the pack in the index_sizes
+           attribute.
+        """
+        new_name = self._hash.hexdigest()
+        self.index_sizes = [None, None, None, None]
+        revision_index_name = self.revision_index_name(new_name)
+        self.index_sizes[0] = self.index_transport.put_file(
+            revision_index_name, self.revision_index.finish())
+        if 'fetch' in debug.debug_flags:
+            # XXX: size might be interesting?
+            mutter('%s: create_pack: wrote revision index: %s%s t+%6.3fs',
+                time.ctime(), self.upload_transport.base, self.random_name,
+                time.time() - self.start_time)
 
 
 class RepositoryPackCollection(object):
@@ -297,12 +344,10 @@
         self._execute_pack_operations(pack_operations)
         return True
 
-    def flush_revision_signature_indices(self, new_name):
+    def flush_revision_signature_indices(self, new_name, revision_index_length):
         """Write out pending indices."""
         # write a revision index (might be empty)
-        new_index_name = self.repo._packs._new_pack.revision_index_name(new_name)
-        revision_index_length = self._index_transport.put_file(
-            new_index_name, self.repo._packs._new_pack.revision_index.finish())
+        new_index_name = self._new_pack.revision_index_name(new_name)
         rev_index = GraphIndex(self._index_transport, new_index_name,
                 revision_index_length)
         if self.repo._revision_all_indices is None:
@@ -311,7 +356,7 @@
             # that in these mapping classes
             self.repo._revision_pack_map = self._make_index_map('.rix')[0]
         else:
-            del self.repo._revision_pack_map[self.repo._packs._new_pack.revision_index]
+            del self.repo._revision_pack_map[self._new_pack.revision_index]
             self.repo._revision_pack_map[rev_index] = (self._pack_tuple(new_name))
             # revisions 'knit' accessed : update it.
             self.repo._revision_all_indices.insert_index(0, rev_index)
@@ -322,7 +367,7 @@
             self.repo._revision_knit_access.set_writer(None, None, (None, None))
 
         # write a signatures index (might be empty)
-        new_index_name = NewPack().signature_index_name(new_name)
+        new_index_name = self._new_pack.signature_index_name(new_name)
         signature_index_length = self._index_transport.put_file(
             new_index_name, self.repo._signature_write_index.finish())
         self.repo._signature_write_index = None
@@ -342,7 +387,7 @@
     def flush_inventory_index(self, new_name):
         """Write the index out to new_name."""
         # write an index (might be empty)
-        new_index_name = NewPack().inventory_index_name(new_name)
+        new_index_name = self._new_pack.inventory_index_name(new_name)
         inventory_index_length = self._index_transport.put_file(
             new_index_name, self.repo._inv_write_index.finish())
         self.repo._inv_write_index = None
@@ -364,7 +409,7 @@
     def flush_text_index(self, new_name):
         """Write the index out to new_name."""
         # write a revision index (might be empty)
-        new_index_name = NewPack().text_index_name(new_name)
+        new_index_name = self._new_pack.text_index_name(new_name)
         text_index_length = self._index_transport.put_file(
             new_index_name, self.repo._text_write_index.finish())
         txt_index = GraphIndex(self._index_transport, new_index_name,
@@ -404,10 +449,15 @@
         if getattr(self.repo, '_open_pack_tuple', None) is not None:
             raise errors.BzrError('call to create_pack_from_packs while '
                 'another pack is being written.')
+        if self._new_pack is not None:
+            raise errors.BzrError('call to create_pack_from_packs while '
+                'another pack is being written.')
         if revision_ids is not None and len(revision_ids) == 0:
             # silly fetch request.
             return None
-        random_name = self._random_name() + suffix
+        new_pack = NewPack(self._upload_transport, self._index_transport,
+            self._pack_transport, upload_suffix=suffix)
+        random_name = new_pack.random_name
         if 'fetch' in debug.debug_flags:
             plain_pack_list = ['%s%s' % (a_pack.transport.base, a_pack.name)
                 for a_pack in packs]
@@ -419,13 +469,11 @@
                 '%s%s %s revisions wanted %s t=0',
                 time.ctime(), self._upload_transport.base, random_name,
                 plain_pack_list, rev_count)
-            start_time = time.time()
         write_stream = self._upload_transport.open_write_stream(random_name)
         if 'fetch' in debug.debug_flags:
             mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                 time.ctime(), self._upload_transport.base, random_name,
-                time.time() - start_time)
-        new_pack = NewPack()
+                time.time() - new_pack.start_time)
         buffer = []
         def write_data(bytes, update=new_pack._hash.update, write=write_stream.write):
             buffer.append(bytes)
@@ -437,7 +485,6 @@
         writer = pack.ContainerWriter(write_data)
         writer.begin()
         # open new indices
-        revision_index = InMemoryGraphIndex(reference_lists=1)
         inv_index = InMemoryGraphIndex(reference_lists=2)
         text_index = InMemoryGraphIndex(reference_lists=2, key_elements=2)
         signature_index = InMemoryGraphIndex(reference_lists=0)
@@ -453,12 +500,12 @@
         revision_nodes = self._index_contents(revision_index_map, revision_keys)
         # copy revision keys and adjust values
         list(self._copy_nodes_graph(revision_nodes, revision_index_map, writer,
-            revision_index))
+            new_pack.revision_index))
         if 'fetch' in debug.debug_flags:
             mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                 time.ctime(), self._upload_transport.base, random_name,
-                revision_index.key_count(),
-                time.time() - start_time)
+                new_pack.revision_index.key_count(),
+                time.time() - new_pack.start_time)
         # select inventory keys
         inv_keys = revision_keys # currently the same keyspace, and note that
         # querying for keys here could introduce a bug where an inventory item
@@ -487,7 +534,7 @@
             mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
                 time.ctime(), self._upload_transport.base, random_name,
                 inv_index.key_count(),
-                time.time() - start_time)
+                time.time() - new_pack.start_time)
         # select text keys
         text_index_map = self._packs_list_to_pack_map_and_index_list(
             packs, 'text_index')[0]
@@ -515,7 +562,7 @@
             mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
                 time.ctime(), self._upload_transport.base, random_name,
                 text_index.key_count(),
-                time.time() - start_time)
+                time.time() - new_pack.start_time)
         # select signature keys
         signature_filter = revision_keys # same keyspace
         signature_index_map = self._packs_list_to_pack_map_and_index_list(
@@ -528,7 +575,7 @@
             mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
                 time.ctime(), self._upload_transport.base, random_name,
                 signature_index.key_count(),
-                time.time() - start_time)
+                time.time() - new_pack.start_time)
         # finish the pack
         writer.end()
         if len(buffer):
@@ -537,7 +584,7 @@
             new_pack._hash.update(bytes)
         new_name = new_pack._hash.hexdigest()
         # if nothing has been written, discard the new pack.
-        if 0 == sum((revision_index.key_count(),
+        if 0 == sum((new_pack.get_revision_count(),
             inv_index.key_count(),
             text_index.key_count(),
             signature_index.key_count(),
@@ -545,16 +592,9 @@
             self._upload_transport.delete(random_name)
             return None
         # write indices
-        new_pack = NewPack()
         index_transport = self._index_transport
-        rev_index_name = new_pack.revision_index_name(new_name)
-        revision_index_length = index_transport.put_file(rev_index_name,
-            revision_index.finish())
-        if 'fetch' in debug.debug_flags:
-            # XXX: size might be interesting?
-            mutter('%s: create_pack: wrote revision index: %s%s t+%6.3fs',
-                time.ctime(), self._upload_transport.base, random_name,
-                time.time() - start_time)
+        new_pack.finish()
+        revision_index_length = new_pack.index_sizes[0]
         inv_index_name = new_pack.inventory_index_name(new_name)
         inventory_index_length = index_transport.put_file(inv_index_name,
             inv_index.finish())
@@ -562,7 +602,7 @@
             # XXX: size might be interesting?
             mutter('%s: create_pack: wrote inventory index: %s%s t+%6.3fs',
                 time.ctime(), self._upload_transport.base, random_name,
-                time.time() - start_time)
+                time.time() - new_pack.start_time)
         text_index_name = new_pack.text_index_name(new_name)
         text_index_length = index_transport.put_file(text_index_name,
             text_index.finish())
@@ -570,7 +610,7 @@
             # XXX: size might be interesting?
             mutter('%s: create_pack: wrote file texts index: %s%s t+%6.3fs',
                 time.ctime(), self._upload_transport.base, random_name,
-                time.time() - start_time)
+                time.time() - new_pack.start_time)
         signature_index_name = new_pack.signature_index_name(new_name)
         signature_index_length = index_transport.put_file(signature_index_name,
             signature_index.finish())
@@ -578,7 +618,7 @@
             # XXX: size might be interesting?
             mutter('%s: create_pack: wrote revision signatures index: %s%s t+%6.3fs',
                 time.ctime(), self._upload_transport.base, random_name,
-                time.time() - start_time)
+                time.time() - new_pack.start_time)
         # add to name
         self.allocate(new_name, revision_index_length, inventory_index_length,
             text_index_length, signature_index_length)
@@ -587,18 +627,18 @@
         write_stream.close()
         self._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
         result = ExistingPack(self._upload_transport.clone('../packs/'), new_name,
-            revision_index, inv_index, text_index, signature_index)
+            new_pack.revision_index, inv_index, text_index, signature_index)
         if 'fetch' in debug.debug_flags:
             # XXX: size might be interesting?
             mutter('%s: create_pack: pack renamed into place: %s%s->%s%s t+%6.3fs',
                 time.ctime(), self._upload_transport.base, random_name,
                 result.transport, result.name,
-                time.time() - start_time)
+                time.time() - new_pack.start_time)
         if 'fetch' in debug.debug_flags:
             # XXX: size might be interesting?
             mutter('%s: create_pack: finished: %s%s t+%6.3fs',
                 time.ctime(), self._upload_transport.base, random_name,
-                time.time() - start_time)
+                time.time() - new_pack.start_time)
         return result
 
     def _execute_pack_operations(self, pack_operations):
@@ -1019,10 +1059,6 @@
         else:
             return all_index.iter_entries(key_filter)
 
-    def _random_name(self):
-        """Return a random name."""
-        return rand_chars(20)
-
     def release_names(self):
         """Release the mutex around the pack-names index."""
         self.repo.control_files.unlock()
@@ -1061,14 +1097,15 @@
         # Do not permit preparation for writing if we're not in a 'write lock'.
         if not self.repo.is_write_locked():
             raise errors.NotWriteLocked(self)
-        self._new_pack = NewPack()
+        self._new_pack = NewPack(self._upload_transport, self._index_transport,
+            self._pack_transport, upload_suffix='.pack')
 
     def _start_write_group(self):
-        random_name = self._random_name()
-        self.repo._open_pack_tuple = (self._upload_transport, random_name + '.pack')
-        write_stream = self._upload_transport.open_write_stream(random_name + '.pack')
+        self.setup()
+        random_name = self._new_pack.random_name
+        self.repo._open_pack_tuple = (self._upload_transport, random_name)
+        write_stream = self._upload_transport.open_write_stream(random_name)
         self._write_stream = write_stream
-        self.setup()
         def write_data(bytes, write=write_stream.write,
                        update=self._new_pack._hash.update):
             write(bytes)
@@ -1091,12 +1128,14 @@
         if data_inserted:
             self._open_pack_writer.end()
             new_name = self._new_pack._hash.hexdigest()
+            self._new_pack.finish()
             txt_index, text_index_length = self.flush_text_index(new_name)
             inv_index, inventory_index_length = \
                 self.flush_inventory_index(new_name)
             rev_index, revision_index_length, \
                 sig_index, signature_index_length = \
-                self.flush_revision_signature_indices(new_name)
+                self.flush_revision_signature_indices(new_name,
+                self._new_pack.index_sizes[0])
             new_pack = ExistingPack(self._upload_transport.clone('../packs/'),
                 new_name, rev_index, inv_index, txt_index, sig_index)
             self._write_stream.close()
@@ -1112,6 +1151,7 @@
                 inventory_index_length, text_index_length,
                 signature_index_length)
             self.repo._open_pack_tuple = None
+            self._new_pack = None
             if not self.autopack():
                 self._save_pack_names()
         else:
@@ -1154,7 +1194,7 @@
         if getattr(self.repo, '_revision_knit', None) is not None:
             return self.repo._revision_knit
         pack_map, indices = self.repo._packs._make_index_map('.rix')
-        if self.repo.is_in_write_group():
+        if self.repo._packs._new_pack is not None:
             # allow writing: queue writes to a new index
             indices.insert(0, self.repo._packs._new_pack.revision_index)
             pack_map[self.repo._packs._new_pack.revision_index] = self.repo._open_pack_tuple

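To make the shape of the pack_repo.py change above easier to follow: NewPack now owns its three transports, its temporary upload name and its start time, and finish() writes the revision index out and records the index length in index_sizes[0]. The stand-alone Python 2 sketch below approximates that lifecycle; StubTransport and StubIndex are hypothetical stand-ins rather than bzrlib classes, and only the attribute and method names mirror the diff.

# Stand-alone sketch of the NewPack lifecycle introduced above.  StubTransport
# and StubIndex are hypothetical stand-ins, not bzrlib APIs; only the method
# and attribute names (index_sizes, random_name, start_time, finish) mirror
# the real class.  Python 2, as bzrlib of this era.
import md5
import random
import string
import time
from StringIO import StringIO


class StubTransport(object):
    """Minimal stand-in for a writable bzrlib transport."""

    def __init__(self):
        self.files = {}

    def put_file(self, name, a_file):
        bytes = a_file.read()
        self.files[name] = bytes
        return len(bytes)  # length written, as a real transport reports


class StubIndex(object):
    """Minimal stand-in for InMemoryGraphIndex."""

    def finish(self):
        return StringIO('serialised-index-bytes')


def rand_chars(n):
    # local helper for the sketch; bzrlib has its own implementation
    return ''.join(random.choice(string.lowercase) for _ in range(n))


class SketchNewPack(object):
    """Approximates NewPack as refactored in this revision."""

    def __init__(self, upload_transport, index_transport, pack_transport,
                 upload_suffix=''):
        self.revision_index = StubIndex()
        self.upload_transport = upload_transport
        self.index_transport = index_transport
        self.pack_transport = pack_transport
        self._hash = md5.new()
        self.index_sizes = None
        self.random_name = rand_chars(20) + upload_suffix
        self.start_time = time.time()

    def revision_index_name(self, name):
        return name + '.rix'

    def finish(self):
        # Name the pack after its content hash, write the revision index,
        # and record the index length in index_sizes[0] for later callers.
        new_name = self._hash.hexdigest()
        self.index_sizes = [None, None, None, None]
        self.index_sizes[0] = self.index_transport.put_file(
            self.revision_index_name(new_name), self.revision_index.finish())
        return new_name


pack = SketchNewPack(StubTransport(), StubTransport(), StubTransport(),
                     upload_suffix='.autopack')
name = pack.finish()
print name, pack.index_sizes  # index_sizes[0] now holds the .rix length
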
=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py	2007-10-15 01:45:03 +0000
+++ b/bzrlib/tests/test_repository.py	2007-10-15 03:09:42 +0000
@@ -1162,6 +1162,18 @@
     """Tests for pack_repo.NewPack."""
 
     def test_new_instance_attributes(self):
-        pack = pack_repo.NewPack()
+        upload_transport = self.get_transport('upload')
+        pack_transport = self.get_transport('pack')
+        index_transport = self.get_transport('index')
+        upload_transport.mkdir('.')
+        pack = pack_repo.NewPack(upload_transport, index_transport,
+            pack_transport)
         self.assertIsInstance(pack.revision_index, InMemoryGraphIndex)
         self.assertIsInstance(pack._hash, type(md5.new()))
+        self.assertTrue(pack.upload_transport is upload_transport)
+        self.assertTrue(pack.index_transport is index_transport)
+        self.assertTrue(pack.pack_transport is pack_transport)
+        self.assertEqual(None, pack.index_sizes)
+        self.assertEqual(20, len(pack.random_name))
+        self.assertIsInstance(pack.random_name, str)
+        self.assertIsInstance(pack.start_time, float)

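On the collection side, the ordering that matters after this change is: setup() creates the single in-progress pack, finish() runs before the flush_* methods so that index_sizes[0] can be passed to flush_revision_signature_indices, and _new_pack is cleared to None once the write group commits (which is also what _revision_all_indices now checks instead of is_in_write_group). A compact, hedged sketch of that ordering follows; SketchPack and SketchCollection are illustrative stubs, not bzrlib API.

# Hedged sketch of the write-group ordering; SketchPack and SketchCollection
# are illustrative stubs, not bzrlib classes.

class SketchPack(object):

    def __init__(self):
        self.index_sizes = None

    def finish(self):
        # Pretend the serialised revision index was 42 bytes long.
        self.index_sizes = [42, None, None, None]


class SketchCollection(object):

    def __init__(self):
        self._new_pack = None

    def setup(self):
        # Mirrors the new guard in create_pack_from_packs: refuse to start
        # a second pack while one is still being written.
        if self._new_pack is not None:
            raise AssertionError('another pack is being written')
        self._new_pack = SketchPack()

    def commit_write_group(self):
        # finish() first, so the revision index length is available to pass
        # down to the revision/signature flush step.
        self._new_pack.finish()
        revision_index_length = self._new_pack.index_sizes[0]
        self._new_pack = None  # cleared, so setup() may run again
        return revision_index_length


collection = SketchCollection()
collection.setup()
print collection.commit_write_group()  # -> 42
collection.setup()  # permitted again once _new_pack has been cleared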

