Rev 2835: Revision index management looking sane for packs. in http://people.ubuntu.com/~robertc/baz2.0/repository

Robert Collins robertc at robertcollins.net
Wed Oct 17 04:30:25 BST 2007


At http://people.ubuntu.com/~robertc/baz2.0/repository

------------------------------------------------------------
revno: 2835
revision-id: robertc at robertcollins.net-20071017033004-ahzli9m9pmpoqwji
parent: robertc at robertcollins.net-20071017014801-c6u8dl03zk1gko3r
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Wed 2007-10-17 13:30:04 +1000
message:
  Revision index management looking sane for packs.
modified:
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
  bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2007-10-17 01:48:01 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2007-10-17 03:30:04 +0000
@@ -395,11 +395,14 @@
         self.index_to_pack.update(index_to_pack)
         # XXX: API break - clearly a 'replace' method would be good?
         self.combined_index._indices[:] = indices
+        # the current add nodes callback for the current writable index if
+        # there is one.
+        self.add_callback = None
 
     def add_index(self, index, pack):
         """Add index to the aggregate, which is an index for Pack pack.
         
-        :param index: An index from the pack object.
+        :param index: An index from the pack parameter.
         :param pack: A Pack instance.
         """
         # expose it to the index map
@@ -407,6 +410,39 @@
         # put it at the front of the linear index list
         self.combined_index.insert_index(0, index)
 
+    def add_writable_index(self, index, pack):
+        """Add an index which is able to have data added to it.
+        
+        :param index: An index from the pack parameter.
+        :param pack: A Pack instance.
+        """
+        assert self.add_callback is None
+        # allow writing: queue writes to a new index
+        self.add_index(index, pack)
+        # Updates the index to packs mapping as a side effect.
+        self.knit_access.set_writer(pack._writer, index, pack.access_tuple())
+        self.add_callback = index.add_nodes
+
+    def clear(self):
+        """Reset all the aggregate data to nothing."""
+        self.knit_access.set_writer(None, None, (None, None))
+        self.index_to_pack.clear()
+        del self.combined_index._indices[:]
+        self.add_callback = None
+
+    def remove_index(self, index, pack):
+        """Remove index from the indices used to answer queries.
+        
+        :param index: An index from the pack parameter.
+        :param pack: A Pack instance.
+        """
+        del self.index_to_pack[index]
+        self.combined_index._indices.remove(index)
+        if (self.add_callback is not None and
+            getattr(index, 'add_nodes', None) == self.add_callback):
+            self.add_callback = None
+            self.knit_access.set_writer(None, None, (None, None))
+
 
 class RepositoryPackCollection(object):
     """Management of packs within a repository."""
@@ -436,7 +472,7 @@
         # when a pack is being created by this object, the state of that pack.
         self._new_pack = None
         # aggregated revision index data
-        self.revision_index = None
+        self.revision_index = AggregateIndex()
 
     def add_pack_to_memory(self, pack):
         """Make a Pack object available to the repository to satisfy queries.
@@ -446,8 +482,7 @@
         self.packs.append(pack)
         assert pack.name not in self._packs
         self._packs[pack.name] = pack
-        if self.revision_index is not None:
-            self.revision_index.add_index(pack.revision_index, pack)
+        self.revision_index.add_index(pack.revision_index, pack)
         if self.repo._inv_all_indices is not None:
             # inv 'knit' has been used : update it.
             self.repo._inv_all_indices.insert_index(0,
@@ -502,8 +537,7 @@
 
         :return: True if packing took place.
         """
-        if self.revision_index is None:
-            self.refresh_revision_indices()
+        # XXX: Should not be needed when the management of indices is sane.
         total_revisions = self.revision_index.combined_index.key_count()
         total_packs = len(self._names)
         if self._max_pack_count(total_revisions) >= total_packs:
@@ -536,23 +570,6 @@
         self._execute_pack_operations(pack_operations)
         return True
 
-    def refresh_revision_indices(self):
-        """Refresh the mappings for revisions.
-        
-        This sets up mappings for only the existing packs, never in-progress
-        packs.
-        """
-        index_map, index_list = self._packs_list_to_pack_map_and_index_list(
-            self.all_packs(), 'revision_index')
-        if self.revision_index is None:
-            # setup indices.
-            self.revision_index = AggregateIndex()
-        else:
-            # reset the knit access writer
-            self.revision_index.knit_access.set_writer(None, None, (None, None))
-        # replace all the index data with the current canonical list.
-        self.revision_index.replace_indices(index_map, index_list)
-
     def refresh_signature_indices(self):
         index_map, index_list = self._packs_list_to_pack_map_and_index_list(
             self.all_packs(), 'signature_index')
@@ -759,7 +776,7 @@
             # have a progress bar?
             self.create_pack_from_packs(packs, '.autopack')
             for pack in packs:
-                self._remove_pack_by_name(pack.name)
+                self.remove_pack_from_memory(pack)
         # record the newly available packs and stop advertising the old
         # packs
         self._save_pack_names()
@@ -782,8 +799,6 @@
             total_packs = len(self._names)
             if total_packs < 2:
                 return
-            if self.revision_index is None:
-                self.refresh_revision_indices()
             total_revisions = self.revision_index.combined_index.key_count()
             # XXX: the following may want to be a class, to pack with a given
             # policy.
@@ -938,9 +953,14 @@
             self._packs_at_load = set()
             for index, key, value in self._iter_disk_pack_index():
                 name = key[0]
-                sizes = [int(digits) for digits in value.split(' ')]
-                self._names[name] = sizes
+                self._names[name] = self._parse_index_sizes(value)
                 self._packs_at_load.add((key, value))
+        # populate all the metadata.
+        self.all_packs()
+
+    def _parse_index_sizes(self, value):
+        """Parse a string of index sizes."""
+        return tuple([int(digits) for digits in value.split(' ')])
 
     def get_pack_by_name(self, name):
         """Get a Pack object by name.
@@ -971,7 +991,7 @@
             # a collision with the packs we know about (not the only possible
             # collision, see NewPack.finish() for some discussion). Remove our
             # prior reference to it.
-            self._remove_pack_by_name(a_new_pack.name)
+            self.remove_pack_from_memory(a_new_pack)
         self._names[a_new_pack.name] = tuple(a_new_pack.index_sizes)
         self.add_pack_to_memory(a_new_pack)
 
@@ -1053,15 +1073,20 @@
         """Return a tuple with the transport and file name for a pack name."""
         return self._pack_transport, name + '.pack'
 
-    def _remove_pack_by_name(self, name):
-        # strip .pack
-        self._names.pop(name)
+    def remove_pack_from_memory(self, pack):
+        """Remove pack from the packs accessed by this repository.
+        
+        Only affects memory state, until self._save_pack_names() is invoked.
+        """
+        self._names.pop(pack.name)
+        self._packs.pop(pack.name)
+        self.revision_index.remove_index(pack.revision_index, pack)
 
     def reset(self):
         """Clear all cached data."""
         # cached revision data
         self.repo._revision_knit = None
-        self.revision_index = None
+        self.revision_index.clear()
         # cached signature data
         self.repo._signature_knit = None
         self.repo._signature_all_indices = None
@@ -1166,11 +1191,44 @@
             new_nodes = current_nodes - self._packs_at_load
             disk_nodes.difference_update(deleted_nodes)
             disk_nodes.update(new_nodes)
+            # TODO: handle same-name, index-size-changes here - 
+            # e.g. use the value from disk, not ours, *unless* we're the one
+            # changing it.
             for key, value in disk_nodes:
                 builder.add_node(key, value)
             self.transport.put_file('pack-names', builder.finish())
+            # move the baseline forward
+            self._packs_at_load = disk_nodes
         finally:
             self.release_names()
+        # synchronise the memory packs list with what we just wrote:
+        new_names = dict(disk_nodes)
+        # drop no longer present nodes
+        for pack in self.all_packs():
+            if (pack.name,) not in new_names:
+                self.remove_pack_from_memory(pack)
+        # add new nodes/refresh existing ones
+        for key, value in disk_nodes:
+            name = key[0]
+            sizes = self._parse_index_sizes(value)
+            if name in self._names:
+                # existing
+                if sizes != self._names[name]:
+                    # the pack for name has had its indices replaced - rare but
+                    # important to handle. XXX: probably can never happen today
+                    # because the three-way merge code above does not handle it
+                    # - you may end up adding the same key twice to the new
+                    # disk index because the set values are the same, unless
+                    # the only index shows up as deleted by the set difference
+                    # - which it may. Until there is a specific test for this,
+                    # assume it's broken. RBC 20071017.
+                    self.remove_pack_from_memory(self.get_pack_by_name(name))
+                    self._names[name] = sizes
+                    self.get_pack_by_name(name)
+            else:
+                # new
+                self._names[name] = sizes
+                self.get_pack_by_name(name)
 
     def _start_write_group(self):
         # Do not permit preparation for writing if we're not in a 'write lock'.
@@ -1178,6 +1236,9 @@
             raise errors.NotWriteLocked(self)
         self._new_pack = NewPack(self._upload_transport, self._index_transport,
             self._pack_transport, upload_suffix='.pack')
+        # allow writing: queue writes to a new index
+        self.revision_index.add_writable_index(self._new_pack.revision_index,
+            self._new_pack)
 
         self.repo._open_pack_tuple = (self._upload_transport, self._new_pack.random_name)
 
@@ -1188,17 +1249,22 @@
     def _abort_write_group(self):
         # FIXME: just drop the transient index.
         # forget what names there are
+        self._new_pack.abort()
+        pack = self._new_pack
+        self.revision_index.remove_index(pack.revision_index, pack)
+        self._new_pack = None
         self.reset()
 
     def _commit_write_group(self):
         if self._new_pack.data_inserted():
+            pack = self._new_pack
+            # remove the pack's write indices from the aggregate indices.
+            self.revision_index.remove_index(pack.revision_index, pack)
+            # get all the data to disk and ready to use
             self._new_pack.finish()
             self.allocate(self._new_pack)
             self.repo._open_pack_tuple = None
             self._new_pack = None
-            # this refreshes the revision index map which is used to get the
-            # total revision count which triggers autopack.
-            self.refresh_revision_indices()
             if not self.autopack():
                 # when autopack takes no steps, the names list is still
                 # unsaved.
@@ -1207,7 +1273,6 @@
             self.refresh_text_index()
             self.refresh_inventory_index()
             self.refresh_signature_indices()
-            self.refresh_revision_indices()
         else:
             self._new_pack.abort()
         # forget what names there are - XXX should just refresh them and apply
@@ -1246,20 +1311,7 @@
         if getattr(self.repo, '_revision_knit', None) is not None:
             return self.repo._revision_knit
         self.repo._packs.ensure_loaded()
-        self.repo._packs.refresh_revision_indices()
-        # add write support if needed
-        if self.repo._packs._new_pack is not None:
-            # allow writing: queue writes to a new index
-            self.repo._packs.revision_index.add_index(
-                self.repo._packs._new_pack.revision_index,
-                self.repo._packs._new_pack)
-            # Updates the index to packs mapping as a side effect,
-            self.repo._packs.revision_index.knit_access.set_writer(
-                self.repo._packs._new_pack._writer,
-                self.repo._packs._new_pack.revision_index, self.repo._open_pack_tuple)
-            add_callback = self.repo._packs._new_pack.revision_index.add_nodes
-        else:
-            add_callback = None # no data-adding permitted.
+        add_callback = self.repo._packs.revision_index.add_callback
         # setup knit specific objects
         knit_index = KnitGraphIndex(
             self.repo._packs.revision_index.combined_index,
@@ -1303,14 +1355,8 @@
         # if knit indices have been handed out, add a mutable
         # index to them
         if self.repo._revision_knit is not None:
-            self.repo._packs.revision_index.add_index(
-                self.repo._packs._new_pack.revision_index,
-                self.repo._packs._new_pack)
             self.repo._revision_knit._index._add_callback = \
-                self.repo._packs._new_pack.revision_index.add_nodes
-            self.repo._packs.revision_index.knit_access.set_writer(
-                self.repo._packs._new_pack._writer,
-                self.repo._packs._new_pack.revision_index, self.repo._open_pack_tuple)
+                self.repo._packs.revision_index.add_callback
         if self.repo._signature_knit is not None:
             self.repo._signature_all_indices.insert_index(0, self.repo._packs._new_pack.signature_index)
             self.repo._signature_knit._index._add_callback = self.repo._packs._new_pack.signature_index.add_nodes

=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py	2007-10-17 01:48:01 +0000
+++ b/bzrlib/tests/test_repository.py	2007-10-17 03:30:04 +0000
@@ -823,7 +823,7 @@
             finally:
                 r1.unlock()
         finally:
-            r2.unlock()
+            pass # r2.unlock()
 
     def test_concurrent_writer_second_preserves_dropping_a_pack(self):
         format = self.get_format()
@@ -842,7 +842,7 @@
             else:
                 r1.commit_write_group()
             r1._packs.ensure_loaded()
-            name_to_drop = r1._packs.names()[0]
+            name_to_drop = r1._packs.all_packs()[0].name
         finally:
             r1.unlock()
         r1.lock_write()
@@ -858,14 +858,15 @@
                     r2.start_write_group()
                     try:
                         # in r1, drop the pack
-                        r1._packs._remove_pack_by_name(name_to_drop)
+                        r1._packs.remove_pack_from_memory(
+                            r1._packs.get_pack_by_name(name_to_drop))
                         # in r2, add a pack
                         self._add_text(r2, 'fileidr2')
                     except:
                         r2.abort_write_group()
                         raise
                 except:
-                    r1.reset()
+                    r1._packs.reset()
                     raise
                 # r1 has a changed names list, and r2 an open write groups with
                 # changes.



More information about the bazaar-commits mailing list