Rev 2814: Allow pack repositories to have multiple writers active at one time, for greater concurrency. in http://people.ubuntu.com/~robertc/baz2.0/repository

Robert Collins robertc at robertcollins.net
Fri Oct 12 06:37:42 BST 2007


At http://people.ubuntu.com/~robertc/baz2.0/repository

------------------------------------------------------------
revno: 2814
revision-id: robertc at robertcollins.net-20071012053736-y2su3gx25bhxzeos
parent: robertc at robertcollins.net-20071011065113-88v6jetl13d4fpyv
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Fri 2007-10-12 15:37:36 +1000
message:
  Allow pack repositories to have multiple writers active at one time, for greater concurrency.
modified:
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
  bzrlib/repository.py           rev_storage.py-20051111201905-119e9401e46257e3
  bzrlib/tests/repository_implementations/test_break_lock.py test_break_lock.py-20060504111704-ee09a107f9f42e43
  bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2007-10-11 06:51:13 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2007-10-12 05:37:36 +0000
@@ -34,6 +34,7 @@
     GraphIndexPrefixAdapter,
     )
 from bzrlib.knit import KnitGraphIndex, _PackAccess, _KnitData
+from bzrlib.osutils import rand_chars
 from bzrlib.pack import ContainerWriter
 from bzrlib.store import revision
 """)
@@ -145,6 +146,8 @@
         self.packs = []
         # name:Pack mapping
         self._packs = {}
+        # the previous pack-names content
+        self._packs_at_load = None
 
     def add_pack_to_memory(self, pack):
         """Make a Pack object available to the repository to satisfy queries.
@@ -292,7 +295,7 @@
         if revision_ids is not None and len(revision_ids) == 0:
             # silly fetch request.
             return None
-        random_name = self.repo.control_files._lock.nonce + suffix
+        random_name = self._random_name() + suffix
         if 'fetch' in debug.debug_flags:
             plain_pack_list = ['%s%s' % (a_pack.transport.base, a_pack.name)
                 for a_pack in packs]
@@ -506,6 +509,14 @@
         for revision_count, packs in pack_operations:
             self._obsolete_packs(packs)
 
+    def lock_names(self):
+        """Acquire the mutex around the pack-names index.
+        
+        This cannot be used in the middle of a read-only transaction on the
+        repository.
+        """
+        self.repo.control_files.lock_write()
+
     def pack(self):
         """Pack the pack collection totally."""
         self.ensure_loaded()
@@ -667,12 +678,12 @@
     def ensure_loaded(self):
         if self._names is None:
             self._names = {}
-            for index, key, value in \
-                GraphIndex(self.transport, 'pack-names', None
-                    ).iter_all_entries():
+            self._packs_at_load = set()
+            for index, key, value in self._iter_disk_pack_index():
                 name = key[0]
                 sizes = [int(digits) for digits in value.split(' ')]
                 self._names[name] = sizes
+                self._packs_at_load.add((key, value))
 
     def get_pack_by_name(self, name):
         """Get a Pack object by name.
@@ -711,6 +722,16 @@
         self._names[name] = (revision_index_length, inventory_index_length,
             text_index_length, signature_index_length)
 
+    def _iter_disk_pack_index(self):
+        """Iterate over the contents of the pack-names index.
+        
+        This is used when loading the list from disk, and before writing to
+        detect updates from others during our write operation.
+        :return: An iterator of the index contents.
+        """
+        return GraphIndex(self.transport, 'pack-names', None
+                ).iter_all_entries()
+
     def _make_index_map(self, suffix):
         """Return information on existing indexes.
 
@@ -805,6 +826,7 @@
         self._names = None
         self.packs = []
         self._packs = {}
+        self._packs_at_load = None
 
     def _make_index_to_pack_map(self, pack_details, index_suffix):
         """Given a list (transport,name), return a map of (index)->(transport, name)."""
@@ -854,19 +876,51 @@
         else:
             return all_index.iter_entries(key_filter)
 
+    def _random_name(self):
+        """Return a random name."""
+        return rand_chars(20)
+
+    def release_names(self):
+        """Release the mutex around the pack-names index."""
+        self.repo.control_files.unlock()
+
     def _save_pack_names(self):
-        builder = GraphIndexBuilder()
-        for name, sizes in self._names.iteritems():
-            builder.add_node((name, ), ' '.join(str(size) for size in sizes))
-        self.transport.put_file('pack-names', builder.finish())
+        """Save the list of packs.
+
+        This will take out the mutex around the pack names list for the
+        duration of the method call. If concurrent updates have been made, a
+        three-way merge between the current list and the current in memory list
+        is performed.
+        """
+        self.lock_names()
+        try:
+            builder = GraphIndexBuilder()
+            # load the disk nodes across
+            disk_nodes = set()
+            for index, key, value in self._iter_disk_pack_index():
+                disk_nodes.add((key, value))
+            # do a two-way diff against our original content
+            current_nodes = set()
+            for name, sizes in self._names.iteritems():
+                current_nodes.add(
+                    ((name, ), ' '.join(str(size) for size in sizes)))
+            deleted_nodes = self._packs_at_load - current_nodes
+            new_nodes = current_nodes - self._packs_at_load
+            disk_nodes.difference_update(deleted_nodes)
+            disk_nodes.update(new_nodes)
+            for key, value in disk_nodes:
+                builder.add_node(key, value)
+            self.transport.put_file('pack-names', builder.finish())
+        finally:
+            self.release_names()
 
     def setup(self):
         # cannot add names if we're not in a 'write lock'.
-        if self.repo.control_files._lock_mode != 'w':
+        if not self.repo.is_write_locked():
             raise errors.NotWriteLocked(self)
 
     def _start_write_group(self):
-        random_name = self.repo.control_files._lock.nonce
+        random_name = self._random_name()
         self.repo._open_pack_tuple = (self._upload_transport, random_name + '.pack')
         write_stream = self._upload_transport.open_write_stream(random_name + '.pack')
         self._write_stream = write_stream
@@ -984,7 +1038,7 @@
         self.repo._revision_knit = knit.KnitVersionedFile(
             'revisions', self.transport.clone('..'),
             self.repo.control_files._file_mode,
-            create=False, access_mode=self.repo.control_files._lock_mode,
+            create=False, access_mode=self.repo._access_mode(),
             index=knit_index, delta=False, factory=knit.KnitPlainFactory(),
             access_method=knit_access)
         return self.repo._revision_knit
@@ -1011,7 +1065,7 @@
         self.repo._signature_knit = knit.KnitVersionedFile(
             'signatures', self.transport.clone('..'),
             self.repo.control_files._file_mode,
-            create=False, access_mode=self.repo.control_files._lock_mode,
+            create=False, access_mode=self.repo._access_mode(),
             index=knit_index, delta=False, factory=knit.KnitPlainFactory(),
             access_method=knit_access)
         return self.repo._signature_knit
@@ -1376,14 +1430,30 @@
         self._revision_store = GraphKnitRevisionStore(self, index_transport, self._revision_store)
         self.weave_store = GraphKnitTextStore(self, index_transport, self.weave_store)
         self._inv_thunk = InventoryKnitThunk(self, index_transport)
+        # True when the repository object is 'write locked' (as opposed to the
+        # physical lock only taken out around changes to the pack-names list.) 
+        # Another way to represent this would be a decorator around the control
+        # files object that presents logical locks as physical ones - if this
+        # gets ugly consider that alternative design. RBC 20071011
+        self._write_lock_count = 0
+        self._transaction = None
         # for tests
         self._reconcile_does_inventory_gc = False
 
     def _abort_write_group(self):
         self._packs._abort_write_group()
 
+    def _access_mode(self):
+        """Return 'w' or 'r' for depending on whether a write lock is active.
+        
+        This method is a helper for the Knit-thunking support objects.
+        """
+        if self.is_write_locked():
+            return 'w'
+        return 'r'
+
     def _refresh_data(self):
-        if self.control_files._lock_count==1:
+        if self._write_lock_count == 1 or self.control_files._lock_count==1:
             self._revision_store.reset()
             self.weave_store.reset()
             self._inv_thunk.reset()
@@ -1399,6 +1469,42 @@
     def get_inventory_weave(self):
         return self._inv_thunk.get_weave()
 
+    def get_transaction(self):
+        if self._write_lock_count:
+            return self._transaction
+        else:
+            return self.control_files.get_transaction()
+
+    def is_locked(self):
+        return self._write_lock_count or self.control_files.is_locked()
+
+    def is_write_locked(self):
+        return self._write_lock_count
+
+    def lock_write(self, token=None):
+        if not self._write_lock_count and self.is_locked():
+            raise errors.ReadOnlyError(self)
+        self._write_lock_count += 1
+        if self._write_lock_count == 1:
+            from bzrlib import transactions
+            self._transaction = transactions.WriteTransaction()
+        self._refresh_data()
+
+    def lock_read(self):
+        if self._write_lock_count:
+            self._write_lock_count += 1
+        else:
+            self.control_files.lock_read()
+        self._refresh_data()
+
+    def leave_lock_in_place(self):
+        # not supported - raise an error
+        raise NotImplementedError(self.leave_lock_in_place)
+
+    def dont_leave_lock_in_place(self):
+        # not supported - raise an error
+        raise NotImplementedError(self.dont_leave_lock_in_place)
+
     @needs_write_lock
     def pack(self):
         """Compress the data within the repository.
@@ -1416,6 +1522,19 @@
         reconciler.reconcile()
         return reconciler
 
+    def unlock(self):
+        if self._write_lock_count == 1 and self._write_group is not None:
+            raise errors.BzrError(
+                'Must end write groups before releasing write locks.')
+        if self._write_lock_count:
+            self._write_lock_count -= 1
+            if not self._write_lock_count:
+                transaction = self._transaction
+                self._transaction = None
+                transaction.finish()
+        else:
+            self.control_files.unlock()
+
 
 class RepositoryFormatPack(MetaDirRepositoryFormat):
     """Format logic for pack structured repositories.

=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py	2007-10-11 04:54:04 +0000
+++ b/bzrlib/repository.py	2007-10-12 05:37:36 +0000
@@ -657,6 +657,10 @@
     def is_locked(self):
         return self.control_files.is_locked()
 
+    def is_write_locked(self):
+        """Return True if this object is write locked."""
+        return self.is_locked() and self.control_files._lock_mode == 'w'
+
     def lock_write(self, token=None):
         """Lock this repository for writing.
 
@@ -884,7 +888,7 @@
 
         :return: None.
         """
-        if not self.is_locked() or self.control_files._lock_mode != 'w':
+        if not self.is_write_locked():
             raise errors.NotWriteLocked(self)
         if self._write_group:
             raise errors.BzrError('already in a write group')

=== modified file 'bzrlib/tests/repository_implementations/test_break_lock.py'
--- a/bzrlib/tests/repository_implementations/test_break_lock.py	2006-10-11 23:08:27 +0000
+++ b/bzrlib/tests/repository_implementations/test_break_lock.py	2007-10-12 05:37:36 +0000
@@ -52,10 +52,11 @@
     def test_locked(self):
         # break_lock when locked should
         self.repo.lock_write()
-        try:
-            self.unused_repo.break_lock()
-        except NotImplementedError:
-            # repository does not support break_lock
+        self.assertEqual(self.repo.get_physical_lock_status(),
+            self.unused_repo.get_physical_lock_status())
+        if not self.unused_repo.get_physical_lock_status():
+            # 'lock_write' has not taken a physical mutex out.
             self.repo.unlock()
             return
+        self.unused_repo.break_lock()
         self.assertRaises(errors.LockBroken, self.repo.unlock)

=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py	2007-10-11 05:00:13 +0000
+++ b/bzrlib/tests/test_repository.py	2007-10-12 05:37:36 +0000
@@ -694,7 +694,7 @@
         self.assertFalse(t.has('no-working-trees'))
         self.check_databases(t)
 
-    def test_add_revision_creates_pack_indices(self):
+    def test_adding_revision_creates_pack_indices(self):
         format = self.get_format()
         tree = self.make_branch_and_tree('.', format=format)
         trans = tree.branch.repository.bzrdir.get_repository_transport(None)
@@ -765,6 +765,167 @@
         self.assertEqual(1, len(list(index.iter_all_entries())))
         self.assertEqual(2, len(tree.branch.repository.all_revision_ids()))
 
+    def test_pack_repositories_support_multiple_write_locks(self):
+        format = self.get_format()
+        self.make_repository('.', shared=True, format=format)
+        r1 = repository.Repository.open('.')
+        r2 = repository.Repository.open('.')
+        r1.lock_write()
+        self.addCleanup(r1.unlock)
+        r2.lock_write()
+        r2.unlock()
+
+    def _add_text(self, repo, fileid):
+        """Add a text to the repository within a write group."""
+        vf =repo.weave_store.get_weave(fileid, repo.get_transaction())
+        vf.add_lines('samplerev+' + fileid, [], [])
+
+    def test_concurrent_writers_merge_new_packs(self):
+        format = self.get_format()
+        self.make_repository('.', shared=True, format=format)
+        r1 = repository.Repository.open('.')
+        r2 = repository.Repository.open('.')
+        r1.lock_write()
+        try:
+            # access enough data to load the names list
+            list(r1.all_revision_ids())
+            r2.lock_write()
+            try:
+                # access enough data to load the names list
+                list(r2.all_revision_ids())
+                r1.start_write_group()
+                try:
+                    r2.start_write_group()
+                    try:
+                        self._add_text(r1, 'fileidr1')
+                        self._add_text(r2, 'fileidr2')
+                    except:
+                        r2.abort_write_group()
+                        raise
+                except:
+                    r1.abort_write_group()
+                    raise
+                # both r1 and r2 have open write groups with data in them
+                # created while the other's write group was open.
+                # Commit both which requires a merge to the pack-names.
+                try:
+                    r1.commit_write_group()
+                except:
+                    r2.abort_write_group()
+                    raise
+                r2.commit_write_group()
+                # Now both repositories should now about both names
+                r1._packs.ensure_loaded()
+                r2._packs.ensure_loaded()
+                self.assertEqual(r1._packs.names(), r2._packs.names())
+                self.assertEqual(2, len(r1._packs.names()))
+            finally:
+                r1.unlock()
+        finally:
+            r2.unlock()
+
+    def test_concurrent_writer_second_preserves_dropping_a_pack(self):
+        format = self.get_format()
+        self.make_repository('.', shared=True, format=format)
+        r1 = repository.Repository.open('.')
+        r2 = repository.Repository.open('.')
+        # add a pack to drop
+        r1.lock_write()
+        try:
+            r1.start_write_group()
+            try:
+                self._add_text(r1, 'fileidr1')
+            except:
+                r1.abort_write_group()
+                raise
+            else:
+                r1.commit_write_group()
+            r1._packs.ensure_loaded()
+            name_to_drop = r1._packs.names()[0]
+        finally:
+            r1.unlock()
+        r1.lock_write()
+        try:
+            # access enough data to load the names list
+            list(r1.all_revision_ids())
+            r2.lock_write()
+            try:
+                # access enough data to load the names list
+                list(r2.all_revision_ids())
+                r1._packs.ensure_loaded()
+                try:
+                    r2.start_write_group()
+                    try:
+                        # in r1, drop the pack
+                        r1._packs._remove_pack_by_name(name_to_drop)
+                        # in r2, add a pack
+                        self._add_text(r2, 'fileidr2')
+                    except:
+                        r2.abort_write_group()
+                        raise
+                except:
+                    r1.reset()
+                    raise
+                # r1 has a changed names list, and r2 an open write groups with
+                # changes.
+                # save r1, and then commit the r2 write coupe, which requires a
+                # merge to the pack-names, which should not reinstate
+                # name_to_drop
+                try:
+                    r1._packs._save_pack_names()
+                    r1._packs.reset()
+                except:
+                    r2.abort_write_group()
+                    raise
+                try:
+                    r2.commit_write_group()
+                except:
+                    r2.abort_write_group()
+                    raise
+                # Now both repositories should now about just one name.
+                r1._packs.ensure_loaded()
+                r2._packs.ensure_loaded()
+                self.assertEqual(r1._packs.names(), r2._packs.names())
+                self.assertEqual(1, len(r1._packs.names()))
+                self.assertFalse(name_to_drop in r1._packs.names())
+            finally:
+                r1.unlock()
+        finally:
+            r2.unlock()
+
+    def test_lock_write_does_not_physically_lock(self):
+        repo = self.make_repository('.', format=self.get_format())
+        repo.lock_write()
+        self.addCleanup(repo.unlock)
+        self.assertFalse(repo.get_physical_lock_status())
+
+    def prepare_for_break_lock(self):
+        old_factory = bzrlib.ui.ui_factory
+        def restoreFactory():
+            bzrlib.ui.ui_factory = old_factory
+        self.addCleanup(restoreFactory)
+        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
+        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
+
+    def test_break_lock_breaks_physical_lock(self):
+        repo = self.make_repository('.', format=self.get_format())
+        repo._packs.lock_names()
+        repo2 = repository.Repository.open('.')
+        self.assertTrue(repo.get_physical_lock_status())
+        self.prepare_for_break_lock()
+        repo2.break_lock()
+        self.assertFalse(repo.get_physical_lock_status())
+
+    def test_broken_physical_locks_error_on_release_names_lock(self):
+        repo = self.make_repository('.', format=self.get_format())
+        repo._packs.lock_names()
+        self.assertTrue(repo.get_physical_lock_status())
+        repo2 = repository.Repository.open('.')
+        self.prepare_for_break_lock()
+        repo2.break_lock()
+        self.assertRaises(errors.LockBroken, repo._packs.release_names)
+
+
 class TestExperimentalSubtrees(TestExperimentalNoSubtrees):
 
     def get_format(self):



More information about the bazaar-commits mailing list