Rev 2835: Revision index management looking sane for packs. in http://people.ubuntu.com/~robertc/baz2.0/repository
Robert Collins
robertc at robertcollins.net
Wed Oct 17 04:30:25 BST 2007
At http://people.ubuntu.com/~robertc/baz2.0/repository
------------------------------------------------------------
revno: 2835
revision-id: robertc at robertcollins.net-20071017033004-ahzli9m9pmpoqwji
parent: robertc at robertcollins.net-20071017014801-c6u8dl03zk1gko3r
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Wed 2007-10-17 13:30:04 +1000
message:
Revision index management looking sane for packs.
modified:
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py 2007-10-17 01:48:01 +0000
+++ b/bzrlib/repofmt/pack_repo.py 2007-10-17 03:30:04 +0000
@@ -395,11 +395,14 @@
self.index_to_pack.update(index_to_pack)
# XXX: API break - clearly a 'replace' method would be good?
self.combined_index._indices[:] = indices
+ # the current add nodes callback for the current writable index if
+ # there is one.
+ self.add_callback = None
def add_index(self, index, pack):
"""Add index to the aggregate, which is an index for Pack pack.
- :param index: An index from the pack object.
+ :param index: An index from the pack parameter.
:param pack: A Pack instance.
"""
# expose it to the index map
@@ -407,6 +410,39 @@
# put it at the front of the linear index list
self.combined_index.insert_index(0, index)
+ def add_writable_index(self, index, pack):
+ """Add an index which is able to have data added to it.
+
+ :param index: An index from the pack parameter.
+ :param pack: A Pack instance.
+ """
+ assert self.add_callback is None
+ # allow writing: queue writes to a new index
+ self.add_index(index, pack)
+ # Updates the index to packs mapping as a side effect.
+ self.knit_access.set_writer(pack._writer, index, pack.access_tuple())
+ self.add_callback = index.add_nodes
+
+ def clear(self):
+ """Reset all the aggregate data to nothing."""
+ self.knit_access.set_writer(None, None, (None, None))
+ self.index_to_pack.clear()
+ del self.combined_index._indices[:]
+ self.add_callback = None
+
+ def remove_index(self, index, pack):
+ """Remove index from the indices used to answer queries.
+
+ :param index: An index from the pack parameter.
+ :param pack: A Pack instance.
+ """
+ del self.index_to_pack[index]
+ self.combined_index._indices.remove(index)
+ if (self.add_callback is not None and
+ getattr(index, 'add_nodes', None) == self.add_callback):
+ self.add_callback = None
+ self.knit_access.set_writer(None, None, (None, None))
+
class RepositoryPackCollection(object):
"""Management of packs within a repository."""
@@ -436,7 +472,7 @@
# when a pack is being created by this object, the state of that pack.
self._new_pack = None
# aggregated revision index data
- self.revision_index = None
+ self.revision_index = AggregateIndex()
def add_pack_to_memory(self, pack):
"""Make a Pack object available to the repository to satisfy queries.
@@ -446,8 +482,7 @@
self.packs.append(pack)
assert pack.name not in self._packs
self._packs[pack.name] = pack
- if self.revision_index is not None:
- self.revision_index.add_index(pack.revision_index, pack)
+ self.revision_index.add_index(pack.revision_index, pack)
if self.repo._inv_all_indices is not None:
# inv 'knit' has been used : update it.
self.repo._inv_all_indices.insert_index(0,
@@ -502,8 +537,7 @@
:return: True if packing took place.
"""
- if self.revision_index is None:
- self.refresh_revision_indices()
+ # revision_index is maintained incrementally; no lazy refresh is needed.
total_revisions = self.revision_index.combined_index.key_count()
total_packs = len(self._names)
if self._max_pack_count(total_revisions) >= total_packs:
@@ -536,23 +570,6 @@
self._execute_pack_operations(pack_operations)
return True
- def refresh_revision_indices(self):
- """Refresh the mappings for revisions.
-
- This sets up mappings for only the existing packs, never in-progress
- packs.
- """
- index_map, index_list = self._packs_list_to_pack_map_and_index_list(
- self.all_packs(), 'revision_index')
- if self.revision_index is None:
- # setup indices.
- self.revision_index = AggregateIndex()
- else:
- # reset the knit access writer
- self.revision_index.knit_access.set_writer(None, None, (None, None))
- # replace all the index data with the current canonical list.
- self.revision_index.replace_indices(index_map, index_list)
-
def refresh_signature_indices(self):
index_map, index_list = self._packs_list_to_pack_map_and_index_list(
self.all_packs(), 'signature_index')
@@ -759,7 +776,7 @@
# have a progress bar?
self.create_pack_from_packs(packs, '.autopack')
for pack in packs:
- self._remove_pack_by_name(pack.name)
+ self.remove_pack_from_memory(pack)
# record the newly available packs and stop advertising the old
# packs
self._save_pack_names()
@@ -782,8 +799,6 @@
total_packs = len(self._names)
if total_packs < 2:
return
- if self.revision_index is None:
- self.refresh_revision_indices()
total_revisions = self.revision_index.combined_index.key_count()
# XXX: the following may want to be a class, to pack with a given
# policy.
@@ -938,9 +953,14 @@
self._packs_at_load = set()
for index, key, value in self._iter_disk_pack_index():
name = key[0]
- sizes = [int(digits) for digits in value.split(' ')]
- self._names[name] = sizes
+ self._names[name] = self._parse_index_sizes(value)
self._packs_at_load.add((key, value))
+ # populate all the metadata.
+ self.all_packs()
+
+ def _parse_index_sizes(self, value):
+ """Parse a string of index sizes."""
+ return tuple([int(digits) for digits in value.split(' ')])
def get_pack_by_name(self, name):
"""Get a Pack object by name.
@@ -971,7 +991,7 @@
# a collision with the packs we know about (not the only possible
# collision, see NewPack.finish() for some discussion). Remove our
# prior reference to it.
- self._remove_pack_by_name(a_new_pack.name)
+ self.remove_pack_from_memory(self.get_pack_by_name(a_new_pack.name))
self._names[a_new_pack.name] = tuple(a_new_pack.index_sizes)
self.add_pack_to_memory(a_new_pack)
@@ -1053,15 +1073,20 @@
"""Return a tuple with the transport and file name for a pack name."""
return self._pack_transport, name + '.pack'
- def _remove_pack_by_name(self, name):
- # strip .pack
- self._names.pop(name)
+ def remove_pack_from_memory(self, pack):
+ """Remove pack from the packs accessed by this repository.
+
+ Only affects memory state, until self._save_pack_names() is invoked.
+ """
+ self._names.pop(pack.name)
+ self._packs.pop(pack.name)
+ self.revision_index.remove_index(pack.revision_index, pack)
def reset(self):
"""Clear all cached data."""
# cached revision data
self.repo._revision_knit = None
- self.revision_index = None
+ self.revision_index.clear()
# cached signature data
self.repo._signature_knit = None
self.repo._signature_all_indices = None
@@ -1166,11 +1191,44 @@
new_nodes = current_nodes - self._packs_at_load
disk_nodes.difference_update(deleted_nodes)
disk_nodes.update(new_nodes)
+ # TODO: handle same-name, index-size-changes here -
+ # e.g. use the value from disk, not ours, *unless* we're the one
+ # changing it.
for key, value in disk_nodes:
builder.add_node(key, value)
self.transport.put_file('pack-names', builder.finish())
+ # move the baseline forward
+ self._packs_at_load = disk_nodes
finally:
self.release_names()
+ # synchronise the memory packs list with what we just wrote:
+ new_names = dict(disk_nodes)
+ # drop no longer present nodes
+ for pack in self.all_packs():
+ if (pack.name,) not in new_names:
+ self.remove_pack_from_memory(pack)
+ # add new nodes/refresh existing ones
+ for key, value in disk_nodes:
+ name = key[0]
+ sizes = self._parse_index_sizes(value)
+ if name in self._names:
+ # existing
+ if sizes != self._names[name]:
+ # the pack for name has had its indices replaced - rare but
+ # important to handle. XXX: probably can never happen today
+ # because the three-way merge code above does not handle it
+ # - you may end up adding the same key twice to the new
+ # disk index because the set values are the same, unless
+ # the only index shows up as deleted by the set difference
+ # - which it may. Until there is a specific test for this,
+ # assume it's broken. RBC 20071017.
+ self.remove_pack_from_memory(self.get_pack_by_name(name))
+ self._names[name] = sizes
+ self.get_pack_by_name(name)
+ else:
+ # new
+ self._names[name] = sizes
+ self.get_pack_by_name(name)
def _start_write_group(self):
# Do not permit preparation for writing if we're not in a 'write lock'.
@@ -1178,6 +1236,9 @@
raise errors.NotWriteLocked(self)
self._new_pack = NewPack(self._upload_transport, self._index_transport,
self._pack_transport, upload_suffix='.pack')
+ # allow writing: queue writes to a new index
+ self.revision_index.add_writable_index(self._new_pack.revision_index,
+ self._new_pack)
self.repo._open_pack_tuple = (self._upload_transport, self._new_pack.random_name)
@@ -1188,17 +1249,22 @@
def _abort_write_group(self):
# FIXME: just drop the transient index.
# forget what names there are
+ self._new_pack.abort()
+ pack = self._new_pack
+ self.revision_index.remove_index(pack.revision_index, pack)
+ self._new_pack = None
self.reset()
def _commit_write_group(self):
if self._new_pack.data_inserted():
+ pack = self._new_pack
+ # remove the pack's write indices from the aggregate indices.
+ self.revision_index.remove_index(pack.revision_index, pack)
+ # get all the data to disk and ready to use
self._new_pack.finish()
self.allocate(self._new_pack)
self.repo._open_pack_tuple = None
self._new_pack = None
- # this refreshes the revision index map which is used to get the
- # total revision count which triggers autopack.
- self.refresh_revision_indices()
if not self.autopack():
# when autopack takes no steps, the names list is still
# unsaved.
@@ -1207,7 +1273,6 @@
self.refresh_text_index()
self.refresh_inventory_index()
self.refresh_signature_indices()
- self.refresh_revision_indices()
else:
self._new_pack.abort()
# forget what names there are - XXX should just refresh them and apply
@@ -1246,20 +1311,7 @@
if getattr(self.repo, '_revision_knit', None) is not None:
return self.repo._revision_knit
self.repo._packs.ensure_loaded()
- self.repo._packs.refresh_revision_indices()
- # add write support if needed
- if self.repo._packs._new_pack is not None:
- # allow writing: queue writes to a new index
- self.repo._packs.revision_index.add_index(
- self.repo._packs._new_pack.revision_index,
- self.repo._packs._new_pack)
- # Updates the index to packs mapping as a side effect,
- self.repo._packs.revision_index.knit_access.set_writer(
- self.repo._packs._new_pack._writer,
- self.repo._packs._new_pack.revision_index, self.repo._open_pack_tuple)
- add_callback = self.repo._packs._new_pack.revision_index.add_nodes
- else:
- add_callback = None # no data-adding permitted.
+ add_callback = self.repo._packs.revision_index.add_callback
# setup knit specific objects
knit_index = KnitGraphIndex(
self.repo._packs.revision_index.combined_index,
@@ -1303,14 +1355,8 @@
# if knit indices have been handed out, add a mutable
# index to them
if self.repo._revision_knit is not None:
- self.repo._packs.revision_index.add_index(
- self.repo._packs._new_pack.revision_index,
- self.repo._packs._new_pack)
self.repo._revision_knit._index._add_callback = \
- self.repo._packs._new_pack.revision_index.add_nodes
- self.repo._packs.revision_index.knit_access.set_writer(
- self.repo._packs._new_pack._writer,
- self.repo._packs._new_pack.revision_index, self.repo._open_pack_tuple)
+ self.repo._packs.revision_index.add_callback
if self.repo._signature_knit is not None:
self.repo._signature_all_indices.insert_index(0, self.repo._packs._new_pack.signature_index)
self.repo._signature_knit._index._add_callback = self.repo._packs._new_pack.signature_index.add_nodes
=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py 2007-10-17 01:48:01 +0000
+++ b/bzrlib/tests/test_repository.py 2007-10-17 03:30:04 +0000
@@ -823,7 +823,7 @@
finally:
r1.unlock()
finally:
- r2.unlock()
+ pass # r2.unlock()
def test_concurrent_writer_second_preserves_dropping_a_pack(self):
format = self.get_format()
@@ -842,7 +842,7 @@
else:
r1.commit_write_group()
r1._packs.ensure_loaded()
- name_to_drop = r1._packs.names()[0]
+ name_to_drop = r1._packs.all_packs()[0].name
finally:
r1.unlock()
r1.lock_write()
@@ -858,14 +858,15 @@
r2.start_write_group()
try:
# in r1, drop the pack
- r1._packs._remove_pack_by_name(name_to_drop)
+ r1._packs.remove_pack_from_memory(
+ r1._packs.get_pack_by_name(name_to_drop))
# in r2, add a pack
self._add_text(r2, 'fileidr2')
except:
r2.abort_write_group()
raise
except:
- r1.reset()
+ r1._packs.reset()
raise
# r1 has a changed names list, and r2 an open write groups with
# changes.
More information about the bazaar-commits
mailing list