Rev 2826: Make NewPack reopen the index files, separating out the task of refreshing the index maps in the repository and managing the completion of writing a single pack to disk. in http://people.ubuntu.com/~robertc/baz2.0/repository

Robert Collins robertc at robertcollins.net
Tue Oct 16 03:48:30 BST 2007


At http://people.ubuntu.com/~robertc/baz2.0/repository

------------------------------------------------------------
revno: 2826
revision-id: robertc at robertcollins.net-20071016024822-vsaucu2p3gnz3b6m
parent: robertc at robertcollins.net-20071016002033-cszqh7o0e9wym4vd
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Tue 2007-10-16 12:48:22 +1000
message:
  Make NewPack reopen the index files, separating out the task of refreshing the index maps in the repository and managing the completion of writing a single pack to disk.
modified:
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2007-10-16 00:20:33 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2007-10-16 02:48:22 +0000
@@ -118,37 +118,46 @@
         self.text_index = text_index
         self.signature_index = signature_index
 
+    def file_name(self):
+        """Get the file name for the pack on disk."""
+        return self.name + '.pack'
+
     def get_revision_count(self):
         return self.revision_index.key_count()
 
     def inventory_index_name(self, name):
         """The inv index is the name + .iix."""
-        return name + '.iix'
+        return self.index_name('inventory', name)
 
     def revision_index_name(self, name):
         """The revision index is the name + .rix."""
-        return name + '.rix'
+        return self.index_name('revision', name)
 
     def signature_index_name(self, name):
         """The signature index is the name + .six."""
-        return name + '.six'
+        return self.index_name('signature', name)
 
     def text_index_name(self, name):
         """The text index is the name + .tix."""
-        return name + '.tix'
+        return self.index_name('text', name)
 
 
 class ExistingPack(Pack):
     """An in memory proxy for an exisiting .pack and its disk indices."""
 
-    def __init__(self, transport, name, revision_index, inventory_index,
+    def __init__(self, pack_transport, name, revision_index, inventory_index,
         text_index, signature_index):
+        """Create an ExistingPack object.
+
+        :param pack_transport: The transport where the pack file resides.
+        :param name: The name of the pack on disk in the pack_transport.
+        """
         Pack.__init__(self, revision_index, inventory_index, text_index,
             signature_index)
         self.name = name
-        self.transport = transport
+        self.pack_transport = pack_transport
         assert None not in (revision_index, inventory_index, text_index,
-            signature_index, name, transport)
+            signature_index, name, pack_transport)
 
     def __eq__(self, other):
         return self.__dict__ == other.__dict__
@@ -160,14 +169,19 @@
         return "<bzrlib.repofmt.pack_repo.Pack object at 0x%x, %s, %s" % (
             id(self), self.transport, self.name)
 
-    def file_name(self):
-        """Get the file name for the pack on disk."""
-        return self.name + '.pack'
-
 
 class NewPack(Pack):
     """An in memory proxy for a pack which is being created."""
 
+    # A map of index 'type' to the file extension and position in the
+    # index_sizes array.
+    indices = {
+        'revision':('.rix', 0),
+        'inventory':('.iix', 1),
+        'text':('.tix', 2),
+        'signature':('.six', 3),
+        }
+
     def __init__(self, upload_transport, index_transport, pack_transport,
         upload_suffix=''):
         """Create a NewPack instance.
@@ -233,16 +247,29 @@
         """
         self.name = self._hash.hexdigest()
         self.index_sizes = [None, None, None, None]
-        self._write_index(self.revision_index, 0,
-            self.revision_index_name, 'revision')
-        self._write_index(self.inventory_index, 1,
-            self.inventory_index_name, 'inventory')
-        self._write_index(self.text_index, 2,
-            self.text_index_name, 'file texts')
-        self._write_index(self.signature_index, 3,
-            self.signature_index_name, 'revision signatures')
-
-    def _write_index(self, index, index_offset, name_getter, label):
+        self._write_index('revision', self.revision_index, 'revision')
+        self._write_index('inventory', self.inventory_index, 'inventory')
+        self._write_index('text', self.text_index, 'file texts')
+        self._write_index('signature', self.signature_index,
+            'revision signatures')
+
+
+    def make_index(self, index_type):
+        """Construct a GraphIndex object for this packs index 'index_type'."""
+        setattr(self, index_type + '_index',
+            GraphIndex(self.index_transport,
+                self.index_name(index_type, self.name),
+                self.index_sizes[self.index_offset(index_type)]))
+
+    def index_name(self, index_type, name):
+        """Get the disk name of an index type for pack name 'name'."""
+        return name + NewPack.indices[index_type][0]
+
+    def index_offset(self, index_type):
+        """Get the position in a index_size array for a given index type."""
+        return NewPack.indices[index_type][1]
+
+    def _write_index(self, index_type, index, label):
         """Write out an index.
 
         :param index: The index object to serialise.
@@ -250,14 +277,19 @@
         :param name_getter: What to use to get the name of the index on disk.
         :param label: What label to give the index e.g. 'revision'.
         """
-        index_name = name_getter(self.name)
-        self.index_sizes[index_offset] = self.index_transport.put_file(
-            index_name, index.finish())
+        index_name = self.index_name(index_type, self.name)
+        self.index_sizes[self.index_offset(index_type)] = \
+            self.index_transport.put_file(index_name, index.finish())
         if 'fetch' in debug.debug_flags:
             # XXX: size might be interesting?
             mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
                 time.ctime(), label, self.upload_transport.base,
                 self.random_name, time.time() - self.start_time)
+        # As we have no current protection against erroneous additional
+        # insertions, load the index from disk on further use. We should alter
+        # the index layer to make it's finish() error if add_node is
+        # subsequently used. RBC
+        self.make_index(index_type)
 
 
 class RepositoryPackCollection(object):
@@ -302,13 +334,13 @@
             pass
         else:
             self.repo._revision_pack_map[pack.revision_index] = (
-                pack.transport, pack.name + '.pack')
+                pack.pack_transport, pack.name + '.pack')
             self.repo._revision_all_indices.insert_index(0, pack.revision_index)
         if self.repo._inv_all_indices is not None:
             # inv 'knit' has been used : update it.
             self.repo._inv_all_indices.insert_index(0,
                 pack.inventory_index)
-            self.repo._inv_pack_map[pack.inventory_index] = pack.transport, pack.name + '.pack'
+            self.repo._inv_pack_map[pack.inventory_index] = pack.pack_transport, pack.name + '.pack'
         if self.repo._text_all_indices is not None:
             # text 'knits' have been used : update it.
             self.repo._text_all_indices.insert_index(0,
@@ -393,78 +425,60 @@
         self._execute_pack_operations(pack_operations)
         return True
 
-    def flush_revision_signature_indices(self, new_name, revision_index_length,
-        signature_index_length):
-        """Write out pending indices."""
-        # write a revision index (might be empty)
-        new_index_name = self._new_pack.revision_index_name(new_name)
-        rev_index = GraphIndex(self._index_transport, new_index_name,
-                revision_index_length)
+    def refresh_revision_signature_indices(self):
+        """Refresh the mappings for revisions."""
+        index_map, index_list = self._make_index_map('.rix')
         if self.repo._revision_all_indices is None:
             # create a pack map for the autopack code - XXX finish
             # making a clear managed list of packs, indices and use
             # that in these mapping classes
-            self.repo._revision_pack_map = self._make_index_map('.rix')[0]
+            self.repo._revision_pack_map = index_map
         else:
-            del self.repo._revision_pack_map[self._new_pack.revision_index]
-            self.repo._revision_pack_map[rev_index] = (self._pack_tuple(new_name))
+            # refresh the revision pack map dict without replacing the instance.
+            self.repo._revision_pack_map.clear()
+            self.repo._revision_pack_map.update(index_map)
             # revisions 'knit' accessed : update it.
-            self.repo._revision_all_indices.insert_index(0, rev_index)
-            # remove the write buffering index. XXX: API break
-            # - clearly we need a remove_index call too.
-            del self.repo._revision_all_indices._indices[1]
+            # XXX: API break - clearly a 'replace' method would be good?
+            self.repo._revision_all_indices._indices[:] = index_list
             # reset the knit access writer
             self.repo._revision_knit_access.set_writer(None, None, (None, None))
 
-        # write a signatures index (might be empty)
-        new_index_name = self._new_pack.signature_index_name(new_name)
-        sig_index = GraphIndex(self._index_transport, new_index_name,
-            signature_index_length)
+        index_map, index_list = self._make_index_map('.six')
         if self.repo._signature_all_indices is not None:
-            # sigatures 'knit' accessed : update it.
-            self.repo._signature_all_indices.insert_index(0, sig_index)
-            # remove the write buffering index. XXX: API break
-            # - clearly we need a remove_index call too.
-            del self.repo._signature_all_indices._indices[1]
+            # signature 'knit' accessed : update it.
+            # XXX: API break - clearly a 'replace' method would be good?
+            self.repo._signature_all_indices._indices[:] = index_list
             # reset the knit access writer
             self.repo._signature_knit_access.set_writer(None, None, (None, None))
-        return rev_index, sig_index,
 
-    def flush_inventory_index(self, new_name, inventory_index_length):
-        """Write the index out to new_name."""
-        # write an index (might be empty)
-        new_index_name = self._new_pack.inventory_index_name(new_name)
-        inv_index = GraphIndex(self._index_transport, new_index_name,
-            inventory_index_length)
+    def refresh_inventory_index(self):
+        """Refresh the inventory access index mappings."""
+        index_map, index_list = self._make_index_map('.iix')
         if self.repo._inv_all_indices is not None:
-            # inv 'knit' has been used, replace the mutated memory index
-            # with the new on-disk one. XXX: is this really a good idea?
-            # perhaps just keep using the memory one ?
-            self.repo._inv_all_indices.insert_index(0, inv_index)
-            # remove the write buffering index. XXX: API break
-            # - clearly we need a remove_index call too.
-            del self.repo._inv_all_indices._indices[1]
+            # refresh the pack map dict without replacing the instance.
+            self.repo._inv_pack_map.clear()
+            self.repo._inv_pack_map.update(index_map)
+            # invs 'knit' accessed : update it.
+            # XXX: API break - clearly a 'replace' method would be good?
+            self.repo._inv_all_indices._indices[:] = index_list
+            # reset the knit access writer
             self.repo._inv_knit_access.set_writer(None, None, (None, None))
         else:
+            # inventory knit not used, ensure the pack map is regenerated at
+            # next use.
             self.repo._inv_pack_map = None
-        return inv_index
 
-    def flush_text_index(self, new_name, text_index_length):
-        """Write the index out to new_name."""
-        # write a revision index (might be empty)
-        new_index_name = self._new_pack.text_index_name(new_name)
-        txt_index = GraphIndex(self._index_transport, new_index_name,
-            text_index_length)
+    def refresh_text_index(self):
+        """Refresh the text index mappings."""
+        index_map, index_list = self._make_index_map('.tix')
         self.repo.weave_store._setup_knit(False)
         if self.repo._text_all_indices is not None:
-            # text 'knits' have been used, replace the mutated memory index
-            # with the new on-disk one. XXX: is this really a good idea?
-            # perhaps just keep using the memory one ?
-            self.repo._text_all_indices.insert_index(0, txt_index)
-            # remove the write buffering index. XXX: API break
-            # - clearly we need a remove_index call too.
-            del self.repo._text_all_indices._indices[1]
-        return txt_index
+            # refresh the pack map dict without replacing the instance.
+            self.repo._text_pack_map.clear()
+            self.repo._text_pack_map.update(index_map)
+            # invs 'knit' accessed : update it.
+            # XXX: API break - clearly a 'replace' method would be good?
+            self.repo._text_all_indices._indices[:] = index_list
 
     def create_pack_from_packs(self, packs, suffix, revision_ids=None):
         """Create a new pack by reading data from other packs.
@@ -499,7 +513,7 @@
             self._pack_transport, upload_suffix=suffix)
         random_name = new_pack.random_name
         if 'fetch' in debug.debug_flags:
-            plain_pack_list = ['%s%s' % (a_pack.transport.base, a_pack.name)
+            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
                 for a_pack in packs]
             if revision_ids is not None:
                 rev_count = len(revision_ids)
@@ -618,7 +632,6 @@
             bytes = ''.join(buffer)
             write_stream.write(bytes)
             new_pack._hash.update(bytes)
-        new_name = new_pack._hash.hexdigest()
         if not new_pack.data_inserted():
             # nothing was copied, discard the new pack.
             self._upload_transport.delete(random_name)
@@ -626,21 +639,19 @@
         # write indices
         new_pack.finish()
         # add to name
-        self.allocate(new_name, new_pack.index_sizes[0],
+        self.allocate(new_pack, new_pack.name, new_pack.index_sizes[0],
             new_pack.index_sizes[1], new_pack.index_sizes[2],
             new_pack.index_sizes[3])
         # rename into place. XXX: should rename each index too rather than just
         # uploading blind under the chosen name.
         write_stream.close()
-        self._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
-        result = ExistingPack(self._upload_transport.clone('../packs/'), new_name,
-            new_pack.revision_index, new_pack.inventory_index, new_pack.text_index,
-            new_pack.signature_index)
+        self._upload_transport.rename(random_name, '../packs/' + new_pack.name + '.pack')
+        result = new_pack
         if 'fetch' in debug.debug_flags:
             # XXX: size might be interesting?
             mutter('%s: create_pack: pack renamed into place: %s%s->%s%s t+%6.3fs',
                 time.ctime(), self._upload_transport.base, random_name,
-                result.transport, result.name,
+                result.pack_transport, result.name,
                 time.time() - new_pack.start_time)
         if 'fetch' in debug.debug_flags:
             # XXX: size might be interesting?
@@ -864,7 +875,7 @@
             self.add_pack_to_memory(result)
             return result
 
-    def allocate(self, name, revision_index_length, inventory_index_length,
+    def allocate(self, a_new_pack, name, revision_index_length, inventory_index_length,
         text_index_length, signature_index_length):
         """Allocate name in the list of packs.
 
@@ -950,7 +961,7 @@
         :param return: None.
         """
         for pack in packs:
-            pack.transport.rename(pack.file_name(),
+            pack.pack_transport.rename(pack.file_name(),
                 '../obsolete_packs/' + pack.file_name())
             # TODO: Probably needs to know all possible indexes for this pack
             # - or maybe list the directory and move all indexes matching this
@@ -1045,7 +1056,7 @@
         for pack in packs:
             index = getattr(pack, index_attribute)
             indices.append(index)
-            pack_map[index] = (pack.transport, pack.file_name())
+            pack_map[index] = (pack.pack_transport, pack.file_name())
         return pack_map, indices
 
     def _index_contents(self, pack_map, key_filter=None):
@@ -1127,33 +1138,27 @@
     def _commit_write_group(self):
         if self._new_pack.data_inserted():
             self._open_pack_writer.end()
-            new_name = self._new_pack._hash.hexdigest()
             self._new_pack.finish()
-            txt_index = \
-                self.flush_text_index(new_name, self._new_pack.index_sizes[2])
-            inv_index = \
-                self.flush_inventory_index(new_name, self._new_pack.index_sizes[1])
-            rev_index, sig_index = \
-                self.flush_revision_signature_indices(new_name,
-                self._new_pack.index_sizes[0], self._new_pack.index_sizes[3])
-            new_pack = ExistingPack(self._upload_transport.clone('../packs/'),
-                new_name, rev_index, inv_index, txt_index, sig_index)
             self._write_stream.close()
             self._upload_transport.rename(self.repo._open_pack_tuple[1],
-                '../packs/' + new_name + '.pack')
+                '../packs/' + self._new_pack.name + '.pack')
             # If this fails, its a hash collision. We should:
             # - determine if its a collision or
             # - the same content or
             # - the existing name is not the actual hash - e.g.
             #   its a deliberate attack or data corruption has
             #   occuring during the write of that file.
-            self.allocate(new_name, self._new_pack.index_sizes[0],
+            self.allocate(self._new_pack, self._new_pack.name, self._new_pack.index_sizes[0],
                 self._new_pack.index_sizes[1], self._new_pack.index_sizes[2],
                 self._new_pack.index_sizes[3])
             self.repo._open_pack_tuple = None
             self._new_pack = None
             if not self.autopack():
                 self._save_pack_names()
+            # now setup the maps we need to access data again.
+            self.refresh_text_index()
+            self.refresh_inventory_index()
+            self.refresh_revision_signature_indices()
         else:
             # remove the pending upload
             self._upload_transport.delete(self.repo._open_pack_tuple[1])
@@ -1358,7 +1363,7 @@
         if for_write:
             # a reused knit object for commit specifically.
             self.repo._text_knit = self.get_weave_or_empty(
-                'all-texts', self.repo.get_transaction(), for_write)
+                'all-texts', None, for_write)
         else:
             self.repo._text_knit = None
 



More information about the bazaar-commits mailing list