Rev 2814: Allow pack repositories to have multiple writers active at one time, for greater concurrency. in http://people.ubuntu.com/~robertc/baz2.0/repository
Robert Collins
robertc at robertcollins.net
Fri Oct 12 06:37:42 BST 2007
At http://people.ubuntu.com/~robertc/baz2.0/repository
------------------------------------------------------------
revno: 2814
revision-id: robertc at robertcollins.net-20071012053736-y2su3gx25bhxzeos
parent: robertc at robertcollins.net-20071011065113-88v6jetl13d4fpyv
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Fri 2007-10-12 15:37:36 +1000
message:
Allow pack repositories to have multiple writers active at one time, for greater concurrency.
modified:
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/repository.py rev_storage.py-20051111201905-119e9401e46257e3
bzrlib/tests/repository_implementations/test_break_lock.py test_break_lock.py-20060504111704-ee09a107f9f42e43
bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py 2007-10-11 06:51:13 +0000
+++ b/bzrlib/repofmt/pack_repo.py 2007-10-12 05:37:36 +0000
@@ -34,6 +34,7 @@
GraphIndexPrefixAdapter,
)
from bzrlib.knit import KnitGraphIndex, _PackAccess, _KnitData
+from bzrlib.osutils import rand_chars
from bzrlib.pack import ContainerWriter
from bzrlib.store import revision
""")
@@ -145,6 +146,8 @@
self.packs = []
# name:Pack mapping
self._packs = {}
+ # the previous pack-names content
+ self._packs_at_load = None
def add_pack_to_memory(self, pack):
"""Make a Pack object available to the repository to satisfy queries.
@@ -292,7 +295,7 @@
if revision_ids is not None and len(revision_ids) == 0:
# silly fetch request.
return None
- random_name = self.repo.control_files._lock.nonce + suffix
+ random_name = self._random_name() + suffix
if 'fetch' in debug.debug_flags:
plain_pack_list = ['%s%s' % (a_pack.transport.base, a_pack.name)
for a_pack in packs]
@@ -506,6 +509,14 @@
for revision_count, packs in pack_operations:
self._obsolete_packs(packs)
+ def lock_names(self):
+ """Acquire the mutex around the pack-names index.
+
+ This cannot be used in the middle of a read-only transaction on the
+ repository.
+ """
+ self.repo.control_files.lock_write()
+
def pack(self):
"""Pack the pack collection totally."""
self.ensure_loaded()
@@ -667,12 +678,12 @@
def ensure_loaded(self):
if self._names is None:
self._names = {}
- for index, key, value in \
- GraphIndex(self.transport, 'pack-names', None
- ).iter_all_entries():
+ self._packs_at_load = set()
+ for index, key, value in self._iter_disk_pack_index():
name = key[0]
sizes = [int(digits) for digits in value.split(' ')]
self._names[name] = sizes
+ self._packs_at_load.add((key, value))
def get_pack_by_name(self, name):
"""Get a Pack object by name.
@@ -711,6 +722,16 @@
self._names[name] = (revision_index_length, inventory_index_length,
text_index_length, signature_index_length)
+ def _iter_disk_pack_index(self):
+ """Iterate over the contents of the pack-names index.
+
+ This is used when loading the list from disk, and before writing to
+ detect updates from others during our write operation.
+ :return: An iterator of the index contents.
+ """
+ return GraphIndex(self.transport, 'pack-names', None
+ ).iter_all_entries()
+
def _make_index_map(self, suffix):
"""Return information on existing indexes.
@@ -805,6 +826,7 @@
self._names = None
self.packs = []
self._packs = {}
+ self._packs_at_load = None
def _make_index_to_pack_map(self, pack_details, index_suffix):
"""Given a list (transport,name), return a map of (index)->(transport, name)."""
@@ -854,19 +876,51 @@
else:
return all_index.iter_entries(key_filter)
+ def _random_name(self):
+ """Return a random name."""
+ return rand_chars(20)
+
+ def release_names(self):
+ """Release the mutex around the pack-names index."""
+ self.repo.control_files.unlock()
+
def _save_pack_names(self):
- builder = GraphIndexBuilder()
- for name, sizes in self._names.iteritems():
- builder.add_node((name, ), ' '.join(str(size) for size in sizes))
- self.transport.put_file('pack-names', builder.finish())
+ """Save the list of packs.
+
+ This will take out the mutex around the pack names list for the
+ duration of the method call. If concurrent updates have been made, a
+ three-way merge between the current list and the current in memory list
+ is performed.
+ """
+ self.lock_names()
+ try:
+ builder = GraphIndexBuilder()
+ # load the disk nodes across
+ disk_nodes = set()
+ for index, key, value in self._iter_disk_pack_index():
+ disk_nodes.add((key, value))
+ # do a two-way diff against our original content
+ current_nodes = set()
+ for name, sizes in self._names.iteritems():
+ current_nodes.add(
+ ((name, ), ' '.join(str(size) for size in sizes)))
+ deleted_nodes = self._packs_at_load - current_nodes
+ new_nodes = current_nodes - self._packs_at_load
+ disk_nodes.difference_update(deleted_nodes)
+ disk_nodes.update(new_nodes)
+ for key, value in disk_nodes:
+ builder.add_node(key, value)
+ self.transport.put_file('pack-names', builder.finish())
+ finally:
+ self.release_names()
def setup(self):
# cannot add names if we're not in a 'write lock'.
- if self.repo.control_files._lock_mode != 'w':
+ if not self.repo.is_write_locked():
raise errors.NotWriteLocked(self)
def _start_write_group(self):
- random_name = self.repo.control_files._lock.nonce
+ random_name = self._random_name()
self.repo._open_pack_tuple = (self._upload_transport, random_name + '.pack')
write_stream = self._upload_transport.open_write_stream(random_name + '.pack')
self._write_stream = write_stream
@@ -984,7 +1038,7 @@
self.repo._revision_knit = knit.KnitVersionedFile(
'revisions', self.transport.clone('..'),
self.repo.control_files._file_mode,
- create=False, access_mode=self.repo.control_files._lock_mode,
+ create=False, access_mode=self.repo._access_mode(),
index=knit_index, delta=False, factory=knit.KnitPlainFactory(),
access_method=knit_access)
return self.repo._revision_knit
@@ -1011,7 +1065,7 @@
self.repo._signature_knit = knit.KnitVersionedFile(
'signatures', self.transport.clone('..'),
self.repo.control_files._file_mode,
- create=False, access_mode=self.repo.control_files._lock_mode,
+ create=False, access_mode=self.repo._access_mode(),
index=knit_index, delta=False, factory=knit.KnitPlainFactory(),
access_method=knit_access)
return self.repo._signature_knit
@@ -1376,14 +1430,30 @@
self._revision_store = GraphKnitRevisionStore(self, index_transport, self._revision_store)
self.weave_store = GraphKnitTextStore(self, index_transport, self.weave_store)
self._inv_thunk = InventoryKnitThunk(self, index_transport)
+ # True when the repository object is 'write locked' (as opposed to the
+ # physical lock only taken out around changes to the pack-names list.)
+ # Another way to represent this would be a decorator around the control
+ # files object that presents logical locks as physical ones - if this
+ # gets ugly consider that alternative design. RBC 20071011
+ self._write_lock_count = 0
+ self._transaction = None
# for tests
self._reconcile_does_inventory_gc = False
def _abort_write_group(self):
self._packs._abort_write_group()
+ def _access_mode(self):
+ """Return 'w' or 'r' depending on whether a write lock is active.
+
+ This method is a helper for the Knit-thunking support objects.
+ """
+ if self.is_write_locked():
+ return 'w'
+ return 'r'
+
def _refresh_data(self):
- if self.control_files._lock_count==1:
+ if self._write_lock_count == 1 or self.control_files._lock_count==1:
self._revision_store.reset()
self.weave_store.reset()
self._inv_thunk.reset()
@@ -1399,6 +1469,42 @@
def get_inventory_weave(self):
return self._inv_thunk.get_weave()
+ def get_transaction(self):
+ if self._write_lock_count:
+ return self._transaction
+ else:
+ return self.control_files.get_transaction()
+
+ def is_locked(self):
+ return self._write_lock_count or self.control_files.is_locked()
+
+ def is_write_locked(self):
+ return self._write_lock_count
+
+ def lock_write(self, token=None):
+ if not self._write_lock_count and self.is_locked():
+ raise errors.ReadOnlyError(self)
+ self._write_lock_count += 1
+ if self._write_lock_count == 1:
+ from bzrlib import transactions
+ self._transaction = transactions.WriteTransaction()
+ self._refresh_data()
+
+ def lock_read(self):
+ if self._write_lock_count:
+ self._write_lock_count += 1
+ else:
+ self.control_files.lock_read()
+ self._refresh_data()
+
+ def leave_lock_in_place(self):
+ # not supported - raise an error
+ raise NotImplementedError(self.leave_lock_in_place)
+
+ def dont_leave_lock_in_place(self):
+ # not supported - raise an error
+ raise NotImplementedError(self.dont_leave_lock_in_place)
+
@needs_write_lock
def pack(self):
"""Compress the data within the repository.
@@ -1416,6 +1522,19 @@
reconciler.reconcile()
return reconciler
+ def unlock(self):
+ if self._write_lock_count == 1 and self._write_group is not None:
+ raise errors.BzrError(
+ 'Must end write groups before releasing write locks.')
+ if self._write_lock_count:
+ self._write_lock_count -= 1
+ if not self._write_lock_count:
+ transaction = self._transaction
+ self._transaction = None
+ transaction.finish()
+ else:
+ self.control_files.unlock()
+
class RepositoryFormatPack(MetaDirRepositoryFormat):
"""Format logic for pack structured repositories.
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py 2007-10-11 04:54:04 +0000
+++ b/bzrlib/repository.py 2007-10-12 05:37:36 +0000
@@ -657,6 +657,10 @@
def is_locked(self):
return self.control_files.is_locked()
+ def is_write_locked(self):
+ """Return True if this object is write locked."""
+ return self.is_locked() and self.control_files._lock_mode == 'w'
+
def lock_write(self, token=None):
"""Lock this repository for writing.
@@ -884,7 +888,7 @@
:return: None.
"""
- if not self.is_locked() or self.control_files._lock_mode != 'w':
+ if not self.is_write_locked():
raise errors.NotWriteLocked(self)
if self._write_group:
raise errors.BzrError('already in a write group')
=== modified file 'bzrlib/tests/repository_implementations/test_break_lock.py'
--- a/bzrlib/tests/repository_implementations/test_break_lock.py 2006-10-11 23:08:27 +0000
+++ b/bzrlib/tests/repository_implementations/test_break_lock.py 2007-10-12 05:37:36 +0000
@@ -52,10 +52,11 @@
def test_locked(self):
# break_lock when locked should
self.repo.lock_write()
- try:
- self.unused_repo.break_lock()
- except NotImplementedError:
- # repository does not support break_lock
+ self.assertEqual(self.repo.get_physical_lock_status(),
+ self.unused_repo.get_physical_lock_status())
+ if not self.unused_repo.get_physical_lock_status():
+ # 'lock_write' has not taken a physical mutex out.
self.repo.unlock()
return
+ self.unused_repo.break_lock()
self.assertRaises(errors.LockBroken, self.repo.unlock)
=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py 2007-10-11 05:00:13 +0000
+++ b/bzrlib/tests/test_repository.py 2007-10-12 05:37:36 +0000
@@ -694,7 +694,7 @@
self.assertFalse(t.has('no-working-trees'))
self.check_databases(t)
- def test_add_revision_creates_pack_indices(self):
+ def test_adding_revision_creates_pack_indices(self):
format = self.get_format()
tree = self.make_branch_and_tree('.', format=format)
trans = tree.branch.repository.bzrdir.get_repository_transport(None)
@@ -765,6 +765,167 @@
self.assertEqual(1, len(list(index.iter_all_entries())))
self.assertEqual(2, len(tree.branch.repository.all_revision_ids()))
+ def test_pack_repositories_support_multiple_write_locks(self):
+ format = self.get_format()
+ self.make_repository('.', shared=True, format=format)
+ r1 = repository.Repository.open('.')
+ r2 = repository.Repository.open('.')
+ r1.lock_write()
+ self.addCleanup(r1.unlock)
+ r2.lock_write()
+ r2.unlock()
+
+ def _add_text(self, repo, fileid):
+ """Add a text to the repository within a write group."""
+ vf =repo.weave_store.get_weave(fileid, repo.get_transaction())
+ vf.add_lines('samplerev+' + fileid, [], [])
+
+ def test_concurrent_writers_merge_new_packs(self):
+ format = self.get_format()
+ self.make_repository('.', shared=True, format=format)
+ r1 = repository.Repository.open('.')
+ r2 = repository.Repository.open('.')
+ r1.lock_write()
+ try:
+ # access enough data to load the names list
+ list(r1.all_revision_ids())
+ r2.lock_write()
+ try:
+ # access enough data to load the names list
+ list(r2.all_revision_ids())
+ r1.start_write_group()
+ try:
+ r2.start_write_group()
+ try:
+ self._add_text(r1, 'fileidr1')
+ self._add_text(r2, 'fileidr2')
+ except:
+ r2.abort_write_group()
+ raise
+ except:
+ r1.abort_write_group()
+ raise
+ # both r1 and r2 have open write groups with data in them
+ # created while the other's write group was open.
+ # Commit both which requires a merge to the pack-names.
+ try:
+ r1.commit_write_group()
+ except:
+ r2.abort_write_group()
+ raise
+ r2.commit_write_group()
+ # Now both repositories should know about both names
+ r1._packs.ensure_loaded()
+ r2._packs.ensure_loaded()
+ self.assertEqual(r1._packs.names(), r2._packs.names())
+ self.assertEqual(2, len(r1._packs.names()))
+ finally:
+ r1.unlock()
+ finally:
+ r2.unlock()
+
+ def test_concurrent_writer_second_preserves_dropping_a_pack(self):
+ format = self.get_format()
+ self.make_repository('.', shared=True, format=format)
+ r1 = repository.Repository.open('.')
+ r2 = repository.Repository.open('.')
+ # add a pack to drop
+ r1.lock_write()
+ try:
+ r1.start_write_group()
+ try:
+ self._add_text(r1, 'fileidr1')
+ except:
+ r1.abort_write_group()
+ raise
+ else:
+ r1.commit_write_group()
+ r1._packs.ensure_loaded()
+ name_to_drop = r1._packs.names()[0]
+ finally:
+ r1.unlock()
+ r1.lock_write()
+ try:
+ # access enough data to load the names list
+ list(r1.all_revision_ids())
+ r2.lock_write()
+ try:
+ # access enough data to load the names list
+ list(r2.all_revision_ids())
+ r1._packs.ensure_loaded()
+ try:
+ r2.start_write_group()
+ try:
+ # in r1, drop the pack
+ r1._packs._remove_pack_by_name(name_to_drop)
+ # in r2, add a pack
+ self._add_text(r2, 'fileidr2')
+ except:
+ r2.abort_write_group()
+ raise
+ except:
+ r1.reset()
+ raise
+ # r1 has a changed names list, and r2 an open write groups with
+ # changes.
+ # save r1, and then commit the r2 write group, which requires a
+ # merge to the pack-names, which should not reinstate
+ # name_to_drop
+ try:
+ r1._packs._save_pack_names()
+ r1._packs.reset()
+ except:
+ r2.abort_write_group()
+ raise
+ try:
+ r2.commit_write_group()
+ except:
+ r2.abort_write_group()
+ raise
+ # Now both repositories should know about just one name.
+ r1._packs.ensure_loaded()
+ r2._packs.ensure_loaded()
+ self.assertEqual(r1._packs.names(), r2._packs.names())
+ self.assertEqual(1, len(r1._packs.names()))
+ self.assertFalse(name_to_drop in r1._packs.names())
+ finally:
+ r1.unlock()
+ finally:
+ r2.unlock()
+
+ def test_lock_write_does_not_physically_lock(self):
+ repo = self.make_repository('.', format=self.get_format())
+ repo.lock_write()
+ self.addCleanup(repo.unlock)
+ self.assertFalse(repo.get_physical_lock_status())
+
+ def prepare_for_break_lock(self):
+ old_factory = bzrlib.ui.ui_factory
+ def restoreFactory():
+ bzrlib.ui.ui_factory = old_factory
+ self.addCleanup(restoreFactory)
+ bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
+ bzrlib.ui.ui_factory.stdin = StringIO("y\n")
+
+ def test_break_lock_breaks_physical_lock(self):
+ repo = self.make_repository('.', format=self.get_format())
+ repo._packs.lock_names()
+ repo2 = repository.Repository.open('.')
+ self.assertTrue(repo.get_physical_lock_status())
+ self.prepare_for_break_lock()
+ repo2.break_lock()
+ self.assertFalse(repo.get_physical_lock_status())
+
+ def test_broken_physical_locks_error_on_release_names_lock(self):
+ repo = self.make_repository('.', format=self.get_format())
+ repo._packs.lock_names()
+ self.assertTrue(repo.get_physical_lock_status())
+ repo2 = repository.Repository.open('.')
+ self.prepare_for_break_lock()
+ repo2.break_lock()
+ self.assertRaises(errors.LockBroken, repo._packs.release_names)
+
+
class TestExperimentalSubtrees(TestExperimentalNoSubtrees):
def get_format(self):
More information about the bazaar-commits
mailing list