Rev 4738: (jam) More robust overlapping autopacks handling in file:///home/pqm/archives/thelove/bzr/2.0/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Mon Mar 1 16:42:12 GMT 2010


At file:///home/pqm/archives/thelove/bzr/2.0/

------------------------------------------------------------
revno: 4738 [merge]
revision-id: pqm at pqm.ubuntu.com-20100301164210-xxeli4qqjkkli083
parent: pqm at pqm.ubuntu.com-20100301111518-5z90yv8zjgjcmbam
parent: v.ladeuil+lp at free.fr-20100301153054-nccxlxvw1q7wskum
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: 2.0
timestamp: Mon 2010-03-01 16:42:10 +0000
message:
  (jam) More robust overlapping autopacks handling
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
  bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
=== modified file 'NEWS'
--- a/NEWS	2010-03-01 11:15:18 +0000
+++ b/NEWS	2010-03-01 15:30:54 +0000
@@ -17,6 +17,13 @@
 * Avoid ``malloc(0)`` in ``patiencediff``, which is non-portable.
   (Martin Pool, #331095)
 
+* Concurrent autopacking is more resilient to already-renamed pack files.
+  If we find that a file we are about to obsolete is already obsoleted, we
+  do not try to rename it, and we leave the file in ``obsolete_packs``.
+  The code is also fault tolerant if a file goes missing, assuming that
+  another process already removed the file.
+  (John Arbash Meinel, Gareth White, #507557)
+
 * Cope with the lockdir ``held/info`` file being empty, which seems to
   happen fairly often if the process is suddenly interrupted while taking
   a lock.

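The race behind this NEWS entry, restated outside bzrlib: two processes
each run an autopack, both pick the same pack file to move into
obsolete_packs, and whichever renames second fails because the file is
already gone. A minimal standalone sketch of the tolerant rename, using
plain os calls rather than the bzrlib transport API (the helper name is
made up for illustration):

    import logging
    import os

    def obsolete_pack_tolerantly(packs_dir, obsolete_dir, file_name):
        """Move file_name into obsolete_dir, tolerating a concurrent mover.

        Standalone sketch, not bzrlib code. Returns True if the file ended
        up obsoleted (by us or by another process), False if it is
        unexpectedly still present in packs_dir.
        """
        src = os.path.join(packs_dir, file_name)
        dst = os.path.join(obsolete_dir, file_name)
        try:
            os.rename(src, dst)
            return True
        except OSError as e:
            # Most likely another autopack renamed or deleted it first.
            logging.info("couldn't rename obsolete pack, skipping it: %s", e)
            return not os.path.exists(src)

The pack_repo.py hunks below apply the same pattern through the transport
layer, catching PathError/TransportError instead of OSError.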
=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py	2009-12-16 20:20:04 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py	2010-01-21 16:59:21 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2008, 2009 Canonical Ltd
+# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -704,10 +704,11 @@
                 self._remove_pack_from_memory(pack)
         # record the newly available packs and stop advertising the old
         # packs
-        result = self._save_pack_names(clear_obsolete_packs=True)
-        # Move the old packs out of the way now they are no longer referenced.
-        for revision_count, packs in pack_operations:
-            self._obsolete_packs(packs)
+        to_be_obsoleted = []
+        for _, packs in pack_operations:
+            to_be_obsoleted.extend(packs)
+        result = self._save_pack_names(clear_obsolete_packs=True,
+                                       obsolete_packs=to_be_obsoleted)
         return result
 
 

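Both repository formats share this method body, so the change is identical
in groupcompress_repo.py and pack_repo.py: rather than writing the new
pack-names file and then unconditionally renaming the old packs, the packs
scheduled for removal are flattened into one list and handed to
_save_pack_names(), which performs the renames afterwards and skips any
pack a concurrent autopack already obsoleted. A toy version of the
flattening step, runnable on its own:

    # pack_operations pairs a revision count with the packs to combine;
    # only the pack lists matter for obsoletion (illustrative values).
    pack_operations = [(2, ['pack-a', 'pack-b']), (1, ['pack-c'])]
    to_be_obsoleted = []
    for _, packs in pack_operations:
        to_be_obsoleted.extend(packs)
    assert to_be_obsoleted == ['pack-a', 'pack-b', 'pack-c']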
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2010-01-16 20:46:03 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2010-01-21 19:26:40 +0000
@@ -1541,10 +1541,11 @@
                 self._remove_pack_from_memory(pack)
         # record the newly available packs and stop advertising the old
         # packs
-        result = self._save_pack_names(clear_obsolete_packs=True)
-        # Move the old packs out of the way now they are no longer referenced.
-        for revision_count, packs in pack_operations:
-            self._obsolete_packs(packs)
+        to_be_obsoleted = []
+        for _, packs in pack_operations:
+            to_be_obsoleted.extend(packs)
+        result = self._save_pack_names(clear_obsolete_packs=True,
+                                       obsolete_packs=to_be_obsoleted)
         return result
 
     def _flush_new_pack(self):
@@ -1784,8 +1785,13 @@
         :return: None.
         """
         for pack in packs:
-            pack.pack_transport.rename(pack.file_name(),
-                '../obsolete_packs/' + pack.file_name())
+            try:
+                pack.pack_transport.rename(pack.file_name(),
+                    '../obsolete_packs/' + pack.file_name())
+            except (errors.PathError, errors.TransportError), e:
+                # TODO: Should these be warnings or mutters?
+                mutter("couldn't rename obsolete pack, skipping it:\n%s"
+                       % (e,))
             # TODO: Probably needs to know all possible indices for this pack
             # - or maybe list the directory and move all indices matching this
             # name whether we recognize it or not?
@@ -1793,8 +1799,12 @@
             if self.chk_index is not None:
                 suffixes.append('.cix')
             for suffix in suffixes:
-                self._index_transport.rename(pack.name + suffix,
-                    '../obsolete_packs/' + pack.name + suffix)
+                try:
+                    self._index_transport.rename(pack.name + suffix,
+                        '../obsolete_packs/' + pack.name + suffix)
+                except (errors.PathError, errors.TransportError), e:
+                    mutter("couldn't rename obsolete index, skipping it:\n%s"
+                           % (e,))
 
     def pack_distribution(self, total_revisions):
         """Generate a list of the number of revisions to put in each pack.
@@ -1872,6 +1882,7 @@
         disk_nodes = set()
         for index, key, value in self._iter_disk_pack_index():
             disk_nodes.add((key, value))
+        orig_disk_nodes = set(disk_nodes)
 
         # do a two-way diff against our original content
         current_nodes = set()
@@ -1890,7 +1901,7 @@
         disk_nodes.difference_update(deleted_nodes)
         disk_nodes.update(new_nodes)
 
-        return disk_nodes, deleted_nodes, new_nodes
+        return disk_nodes, deleted_nodes, new_nodes, orig_disk_nodes
 
     def _syncronize_pack_names_from_disk_nodes(self, disk_nodes):
         """Given the correct set of pack files, update our saved info.
@@ -1936,7 +1947,7 @@
                 added.append(name)
         return removed, added, modified
 
-    def _save_pack_names(self, clear_obsolete_packs=False):
+    def _save_pack_names(self, clear_obsolete_packs=False, obsolete_packs=None):
         """Save the list of packs.
 
         This will take out the mutex around the pack names list for the
@@ -1946,12 +1957,16 @@
 
         :param clear_obsolete_packs: If True, clear out the contents of the
             obsolete_packs directory.
+        :param obsolete_packs: Packs that are obsolete once the new pack-names
+            file has been written.
         :return: A list of the names saved that were not previously on disk.
         """
+        already_obsolete = []
         self.lock_names()
         try:
             builder = self._index_builder_class()
-            disk_nodes, deleted_nodes, new_nodes = self._diff_pack_names()
+            (disk_nodes, deleted_nodes, new_nodes,
+             orig_disk_nodes) = self._diff_pack_names()
             # TODO: handle same-name, index-size-changes here -
             # e.g. use the value from disk, not ours, *unless* we're the one
             # changing it.
@@ -1959,14 +1974,25 @@
                 builder.add_node(key, value)
             self.transport.put_file('pack-names', builder.finish(),
                 mode=self.repo.bzrdir._get_file_mode())
-            # move the baseline forward
             self._packs_at_load = disk_nodes
             if clear_obsolete_packs:
-                self._clear_obsolete_packs()
+                to_preserve = None
+                if obsolete_packs:
+                    to_preserve = set([o.name for o in obsolete_packs])
+                already_obsolete = self._clear_obsolete_packs(to_preserve)
         finally:
             self._unlock_names()
         # synchronise the memory packs list with what we just wrote:
         self._syncronize_pack_names_from_disk_nodes(disk_nodes)
+        if obsolete_packs:
+            # TODO: We could add one more condition here. "if o.name not in
+            #       orig_disk_nodes and o != the new_pack we haven't written to
+            #       disk yet. However, the new pack object is not easily
+            #       accessible here (it would have to be passed through the
+            #       autopacking code, etc.)
+            obsolete_packs = [o for o in obsolete_packs
+                              if o.name not in already_obsolete]
+            self._obsolete_packs(obsolete_packs)
         return [new_node[0][0] for new_node in new_nodes]
 
     def reload_pack_names(self):
@@ -1987,13 +2013,12 @@
         if first_read:
             return True
         # out the new value.
-        disk_nodes, deleted_nodes, new_nodes = self._diff_pack_names()
+        (disk_nodes, deleted_nodes, new_nodes,
+         orig_disk_nodes) = self._diff_pack_names()
         # _packs_at_load is meant to be the explicit list of names in
         # 'pack-names' at the start. As such, it should not contain any
         # pending names that haven't been written out yet.
-        pack_names_nodes = disk_nodes.difference(new_nodes)
-        pack_names_nodes.update(deleted_nodes)
-        self._packs_at_load = pack_names_nodes
+        self._packs_at_load = orig_disk_nodes
         (removed, added,
          modified) = self._syncronize_pack_names_from_disk_nodes(disk_nodes)
         if removed or added or modified:
@@ -2008,15 +2033,28 @@
             raise
         raise errors.RetryAutopack(self.repo, False, sys.exc_info())
 
-    def _clear_obsolete_packs(self):
+    def _clear_obsolete_packs(self, preserve=None):
         """Delete everything from the obsolete-packs directory.
+
+        :return: A list of pack identifiers (the filename without '.pack') that
+            were found in obsolete_packs.
         """
+        found = []
         obsolete_pack_transport = self.transport.clone('obsolete_packs')
+        if preserve is None:
+            preserve = set()
         for filename in obsolete_pack_transport.list_dir('.'):
+            name, ext = osutils.splitext(filename)
+            if ext == '.pack':
+                found.append(name)
+            if name in preserve:
+                continue
             try:
                 obsolete_pack_transport.delete(filename)
             except (errors.PathError, errors.TransportError), e:
-                warning("couldn't delete obsolete pack, skipping it:\n%s" % (e,))
+                warning("couldn't delete obsolete pack, skipping it:\n%s"
+                        % (e,))
+        return found
 
     def _start_write_group(self):
         # Do not permit preparation for writing if we're not in a 'write lock'.

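Taken together, the pack_repo.py hunks make three coordinated changes:
_diff_pack_names() additionally returns the untouched on-disk node set, so
reload_pack_names() can assign _packs_at_load directly instead of
reconstructing it; _clear_obsolete_packs() now reports which packs were
already sitting in obsolete_packs and can preserve names that are about to
land there; and _save_pack_names() renames only the packs no other process
beat it to. A filesystem-only model of the last two pieces, with assumed
function names and plain os calls standing in for the bzrlib transport API:

    import os

    def clear_obsolete_packs(obsolete_dir, preserve=None):
        """Delete everything in obsolete_dir except preserved pack names.

        Returns the basenames (minus '.pack') of pack files that were
        already present, i.e. packs another process obsoleted first.
        """
        preserve = preserve or set()
        found = []
        for filename in os.listdir(obsolete_dir):
            name, ext = os.path.splitext(filename)
            if ext == '.pack':
                found.append(name)
            if name in preserve:
                continue  # we are about to obsolete this pack ourselves
            try:
                os.unlink(os.path.join(obsolete_dir, filename))
            except OSError as e:
                print("couldn't delete obsolete pack, skipping it: %s" % (e,))
        return found

    def save_pack_names_then_obsolete(obsolete_dir, to_be_obsoleted):
        # Step 1, held under the pack-names lock in the real code: write
        # the new pack-names file, then clear obsolete_packs while
        # preserving the packs we are about to move there.
        already_obsolete = clear_obsolete_packs(
            obsolete_dir, preserve=set(to_be_obsoleted))
        # Step 2, after releasing the lock: rename only the packs that no
        # concurrent autopack already obsoleted.
        return [name for name in to_be_obsoleted
                if name not in already_obsolete]

Because indices share the pack's basename (a-pack.pack, a-pack.rix,
a-pack.iix), preserving by basename keeps a pre-obsoleted pack's indices
alongside its data file, which is exactly what the tests below check.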
=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py	2010-01-16 20:46:03 +0000
+++ b/bzrlib/tests/test_repository.py	2010-01-21 19:24:26 +0000
@@ -1082,6 +1082,31 @@
         packs.ensure_loaded()
         return tree, r, packs, [rev1, rev2, rev3]
 
+    def test__clear_obsolete_packs(self):
+        packs = self.get_packs()
+        obsolete_pack_trans = packs.transport.clone('obsolete_packs')
+        obsolete_pack_trans.put_bytes('a-pack.pack', 'content\n')
+        obsolete_pack_trans.put_bytes('a-pack.rix', 'content\n')
+        obsolete_pack_trans.put_bytes('a-pack.iix', 'content\n')
+        obsolete_pack_trans.put_bytes('another-pack.pack', 'foo\n')
+        obsolete_pack_trans.put_bytes('not-a-pack.rix', 'foo\n')
+        res = packs._clear_obsolete_packs()
+        self.assertEqual(['a-pack', 'another-pack'], sorted(res))
+        self.assertEqual([], obsolete_pack_trans.list_dir('.'))
+
+    def test__clear_obsolete_packs_preserve(self):
+        packs = self.get_packs()
+        obsolete_pack_trans = packs.transport.clone('obsolete_packs')
+        obsolete_pack_trans.put_bytes('a-pack.pack', 'content\n')
+        obsolete_pack_trans.put_bytes('a-pack.rix', 'content\n')
+        obsolete_pack_trans.put_bytes('a-pack.iix', 'content\n')
+        obsolete_pack_trans.put_bytes('another-pack.pack', 'foo\n')
+        obsolete_pack_trans.put_bytes('not-a-pack.rix', 'foo\n')
+        res = packs._clear_obsolete_packs(preserve=set(['a-pack']))
+        self.assertEqual(['a-pack', 'another-pack'], sorted(res))
+        self.assertEqual(['a-pack.iix', 'a-pack.pack', 'a-pack.rix'],
+                         sorted(obsolete_pack_trans.list_dir('.')))
+
     def test__max_pack_count(self):
         """The maximum pack count is a function of the number of revisions."""
         # no revisions - one pack, so that we can have a revision free repo
@@ -1107,6 +1132,28 @@
         # check some arbitrary big numbers
         self.assertEqual(25, packs._max_pack_count(112894))
 
+    def test__obsolete_packs(self):
+        tree, r, packs, revs = self.make_packs_and_alt_repo(write_lock=True)
+        names = packs.names()
+        pack = packs.get_pack_by_name(names[0])
+        # Schedule this one for removal
+        packs._remove_pack_from_memory(pack)
+        # Simulate a concurrent update by renaming the .pack file and one of
+        # the indices
+        packs.transport.rename('packs/%s.pack' % (names[0],),
+                               'obsolete_packs/%s.pack' % (names[0],))
+        packs.transport.rename('indices/%s.iix' % (names[0],),
+                               'obsolete_packs/%s.iix' % (names[0],))
+        # Now trigger the obsoletion, and ensure that all the remaining files
+        # are still renamed
+        packs._obsolete_packs([pack])
+        self.assertEqual([n + '.pack' for n in names[1:]],
+                         sorted(packs._pack_transport.list_dir('.')))
+        # names[0] should not be present in the index anymore
+        self.assertEqual(names[1:],
+            sorted(set([osutils.splitext(n)[0] for n in
+                        packs._index_transport.list_dir('.')])))
+
     def test_pack_distribution_zero(self):
         packs = self.get_packs()
         self.assertEqual([0], packs.pack_distribution(0))
@@ -1300,7 +1347,7 @@
         removed_pack = packs.get_pack_by_name(to_remove_name)
         packs._remove_pack_from_memory(removed_pack)
         names = packs.names()
-        all_nodes, deleted_nodes, new_nodes = packs._diff_pack_names()
+        all_nodes, deleted_nodes, new_nodes, _ = packs._diff_pack_names()
         new_names = set([x[0][0] for x in new_nodes])
         self.assertEqual(names, sorted([x[0][0] for x in all_nodes]))
         self.assertEqual(set(names) - set(orig_names), new_names)
@@ -1311,7 +1358,7 @@
         reloaded_names = packs.names()
         self.assertEqual(orig_at_load, packs._packs_at_load)
         self.assertEqual(names, reloaded_names)
-        all_nodes, deleted_nodes, new_nodes = packs._diff_pack_names()
+        all_nodes, deleted_nodes, new_nodes, _ = packs._diff_pack_names()
         new_names = set([x[0][0] for x in new_nodes])
         self.assertEqual(names, sorted([x[0][0] for x in all_nodes]))
         self.assertEqual(set(names) - set(orig_names), new_names)
@@ -1319,6 +1366,21 @@
         self.assertEqual([to_remove_name],
                          sorted([x[0][0] for x in deleted_nodes]))
 
+    def test_autopack_obsoletes_new_pack(self):
+        tree, r, packs, revs = self.make_packs_and_alt_repo(write_lock=True)
+        packs._max_pack_count = lambda x: 1
+        packs.pack_distribution = lambda x: [10]
+        r.start_write_group()
+        r.revisions.insert_record_stream([versionedfile.FulltextContentFactory(
+            ('bogus-rev',), (), None, 'bogus-content\n')])
+        # This should trigger an autopack, which will combine everything into a
+        # single pack file.
+        new_names = r.commit_write_group()
+        names = packs.names()
+        self.assertEqual(1, len(names))
+        self.assertEqual([names[0] + '.pack'],
+                         packs._pack_transport.list_dir('.'))
+
     def test_autopack_reloads_and_stops(self):
         tree, r, packs, revs = self.make_packs_and_alt_repo(write_lock=True)
         # After we have determined what needs to be autopacked, trigger a
@@ -1336,6 +1398,38 @@
         self.assertEqual(tree.branch.repository._pack_collection.names(),
                          packs.names())
 
+    def test__save_pack_names(self):
+        tree, r, packs, revs = self.make_packs_and_alt_repo(write_lock=True)
+        names = packs.names()
+        pack = packs.get_pack_by_name(names[0])
+        packs._remove_pack_from_memory(pack)
+        packs._save_pack_names(obsolete_packs=[pack])
+        cur_packs = packs._pack_transport.list_dir('.')
+        self.assertEqual([n + '.pack' for n in names[1:]], sorted(cur_packs))
+        # obsolete_packs will also have stuff like .rix and .iix present.
+        obsolete_packs = packs.transport.list_dir('obsolete_packs')
+        obsolete_names = set([osutils.splitext(n)[0] for n in obsolete_packs])
+        self.assertEqual([pack.name], sorted(obsolete_names))
+
+    def test__save_pack_names_already_obsoleted(self):
+        tree, r, packs, revs = self.make_packs_and_alt_repo(write_lock=True)
+        names = packs.names()
+        pack = packs.get_pack_by_name(names[0])
+        packs._remove_pack_from_memory(pack)
+        # We are going to simulate a concurrent autopack by manually obsoleting
+        # the pack directly.
+        packs._obsolete_packs([pack])
+        packs._save_pack_names(clear_obsolete_packs=True,
+                               obsolete_packs=[pack])
+        cur_packs = packs._pack_transport.list_dir('.')
+        self.assertEqual([n + '.pack' for n in names[1:]], sorted(cur_packs))
+        # Note that while we set clear_obsolete_packs=True, it should not
+        # delete a pack file that we have also scheduled for obsoletion.
+        obsolete_packs = packs.transport.list_dir('obsolete_packs')
+        obsolete_names = set([osutils.splitext(n)[0] for n in obsolete_packs])
+        self.assertEqual([pack.name], sorted(obsolete_names))
+
+
 
 class TestPack(TestCaseWithTransport):
     """Tests for the Pack object."""

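The last two tests exercise the concurrent case end to end: obsolete the
pack by hand to simulate the other process, then call
_save_pack_names(clear_obsolete_packs=True, obsolete_packs=[pack]) and
check that the pre-obsoleted files survive the clearing pass. Replayed
against the standalone model sketched above (illustrative only, with a
temporary directory standing in for obsolete_packs):

    import os
    import tempfile

    d = tempfile.mkdtemp()

    def touch(name, data='content\n'):
        with open(os.path.join(d, name), 'w') as f:
            f.write(data)

    # Simulate a concurrent autopack: 'a-pack' and one of its indices are
    # already in obsolete_packs; 'stale' is leftover junk from an old run.
    touch('a-pack.pack')
    touch('a-pack.rix')
    touch('stale.pack')

    to_rename = save_pack_names_then_obsolete(d, ['a-pack'])
    assert to_rename == []  # 'a-pack' was already obsoleted; skip renaming
    assert sorted(os.listdir(d)) == ['a-pack.pack', 'a-pack.rix']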


