Rev 2826: Make NewPack reopen the index files, separating out the task of refreshing the index maps in the repository and managing the completion of writing a single pack to disk. in http://people.ubuntu.com/~robertc/baz2.0/repository
Robert Collins
robertc at robertcollins.net
Tue Oct 16 03:48:30 BST 2007
At http://people.ubuntu.com/~robertc/baz2.0/repository
------------------------------------------------------------
revno: 2826
revision-id: robertc at robertcollins.net-20071016024822-vsaucu2p3gnz3b6m
parent: robertc at robertcollins.net-20071016002033-cszqh7o0e9wym4vd
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Tue 2007-10-16 12:48:22 +1000
message:
Make NewPack reopen the index files, separating out the task of refreshing the index maps in the repository and managing the completion of writing a single pack to disk.
modified:
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py 2007-10-16 00:20:33 +0000
+++ b/bzrlib/repofmt/pack_repo.py 2007-10-16 02:48:22 +0000
@@ -118,37 +118,46 @@
self.text_index = text_index
self.signature_index = signature_index
+ def file_name(self):
+ """Get the file name for the pack on disk."""
+ return self.name + '.pack'
+
def get_revision_count(self):
return self.revision_index.key_count()
def inventory_index_name(self, name):
"""The inv index is the name + .iix."""
- return name + '.iix'
+ return self.index_name('inventory', name)
def revision_index_name(self, name):
"""The revision index is the name + .rix."""
- return name + '.rix'
+ return self.index_name('revision', name)
def signature_index_name(self, name):
"""The signature index is the name + .six."""
- return name + '.six'
+ return self.index_name('signature', name)
def text_index_name(self, name):
"""The text index is the name + .tix."""
- return name + '.tix'
+ return self.index_name('text', name)
class ExistingPack(Pack):
"""An in memory proxy for an exisiting .pack and its disk indices."""
- def __init__(self, transport, name, revision_index, inventory_index,
+ def __init__(self, pack_transport, name, revision_index, inventory_index,
text_index, signature_index):
+ """Create an ExistingPack object.
+
+ :param pack_transport: The transport where the pack file resides.
+ :param name: The name of the pack on disk in the pack_transport.
+ """
Pack.__init__(self, revision_index, inventory_index, text_index,
signature_index)
self.name = name
- self.transport = transport
+ self.pack_transport = pack_transport
assert None not in (revision_index, inventory_index, text_index,
- signature_index, name, transport)
+ signature_index, name, pack_transport)
def __eq__(self, other):
return self.__dict__ == other.__dict__
@@ -160,14 +169,19 @@
return "<bzrlib.repofmt.pack_repo.Pack object at 0x%x, %s, %s" % (
id(self), self.transport, self.name)
- def file_name(self):
- """Get the file name for the pack on disk."""
- return self.name + '.pack'
-
class NewPack(Pack):
"""An in memory proxy for a pack which is being created."""
+ # A map of index 'type' to the file extension and position in the
+ # index_sizes array.
+ indices = {
+ 'revision':('.rix', 0),
+ 'inventory':('.iix', 1),
+ 'text':('.tix', 2),
+ 'signature':('.six', 3),
+ }
+
def __init__(self, upload_transport, index_transport, pack_transport,
upload_suffix=''):
"""Create a NewPack instance.
@@ -233,16 +247,29 @@
"""
self.name = self._hash.hexdigest()
self.index_sizes = [None, None, None, None]
- self._write_index(self.revision_index, 0,
- self.revision_index_name, 'revision')
- self._write_index(self.inventory_index, 1,
- self.inventory_index_name, 'inventory')
- self._write_index(self.text_index, 2,
- self.text_index_name, 'file texts')
- self._write_index(self.signature_index, 3,
- self.signature_index_name, 'revision signatures')
-
- def _write_index(self, index, index_offset, name_getter, label):
+ self._write_index('revision', self.revision_index, 'revision')
+ self._write_index('inventory', self.inventory_index, 'inventory')
+ self._write_index('text', self.text_index, 'file texts')
+ self._write_index('signature', self.signature_index,
+ 'revision signatures')
+
+
+ def make_index(self, index_type):
+ """Construct a GraphIndex object for this packs index 'index_type'."""
+ setattr(self, index_type + '_index',
+ GraphIndex(self.index_transport,
+ self.index_name(index_type, self.name),
+ self.index_sizes[self.index_offset(index_type)]))
+
+ def index_name(self, index_type, name):
+ """Get the disk name of an index type for pack name 'name'."""
+ return name + NewPack.indices[index_type][0]
+
+ def index_offset(self, index_type):
+ """Get the position in a index_size array for a given index type."""
+ return NewPack.indices[index_type][1]
+
+ def _write_index(self, index_type, index, label):
"""Write out an index.
:param index: The index object to serialise.
@@ -250,14 +277,19 @@
:param name_getter: What to use to get the name of the index on disk.
:param label: What label to give the index e.g. 'revision'.
"""
- index_name = name_getter(self.name)
- self.index_sizes[index_offset] = self.index_transport.put_file(
- index_name, index.finish())
+ index_name = self.index_name(index_type, self.name)
+ self.index_sizes[self.index_offset(index_type)] = \
+ self.index_transport.put_file(index_name, index.finish())
if 'fetch' in debug.debug_flags:
# XXX: size might be interesting?
mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
time.ctime(), label, self.upload_transport.base,
self.random_name, time.time() - self.start_time)
+ # As we have no current protection against erroneous additional
+ # insertions, load the index from disk on further use. We should alter
+ # the index layer to make its finish() error if add_node is
+ # subsequently used. RBC
+ self.make_index(index_type)
class RepositoryPackCollection(object):
@@ -302,13 +334,13 @@
pass
else:
self.repo._revision_pack_map[pack.revision_index] = (
- pack.transport, pack.name + '.pack')
+ pack.pack_transport, pack.name + '.pack')
self.repo._revision_all_indices.insert_index(0, pack.revision_index)
if self.repo._inv_all_indices is not None:
# inv 'knit' has been used : update it.
self.repo._inv_all_indices.insert_index(0,
pack.inventory_index)
- self.repo._inv_pack_map[pack.inventory_index] = pack.transport, pack.name + '.pack'
+ self.repo._inv_pack_map[pack.inventory_index] = pack.pack_transport, pack.name + '.pack'
if self.repo._text_all_indices is not None:
# text 'knits' have been used : update it.
self.repo._text_all_indices.insert_index(0,
@@ -393,78 +425,60 @@
self._execute_pack_operations(pack_operations)
return True
- def flush_revision_signature_indices(self, new_name, revision_index_length,
- signature_index_length):
- """Write out pending indices."""
- # write a revision index (might be empty)
- new_index_name = self._new_pack.revision_index_name(new_name)
- rev_index = GraphIndex(self._index_transport, new_index_name,
- revision_index_length)
+ def refresh_revision_signature_indices(self):
+ """Refresh the mappings for revisions."""
+ index_map, index_list = self._make_index_map('.rix')
if self.repo._revision_all_indices is None:
# create a pack map for the autopack code - XXX finish
# making a clear managed list of packs, indices and use
# that in these mapping classes
- self.repo._revision_pack_map = self._make_index_map('.rix')[0]
+ self.repo._revision_pack_map = index_map
else:
- del self.repo._revision_pack_map[self._new_pack.revision_index]
- self.repo._revision_pack_map[rev_index] = (self._pack_tuple(new_name))
+ # refresh the revision pack map dict without replacing the instance.
+ self.repo._revision_pack_map.clear()
+ self.repo._revision_pack_map.update(index_map)
# revisions 'knit' accessed : update it.
- self.repo._revision_all_indices.insert_index(0, rev_index)
- # remove the write buffering index. XXX: API break
- # - clearly we need a remove_index call too.
- del self.repo._revision_all_indices._indices[1]
+ # XXX: API break - clearly a 'replace' method would be good?
+ self.repo._revision_all_indices._indices[:] = index_list
# reset the knit access writer
self.repo._revision_knit_access.set_writer(None, None, (None, None))
- # write a signatures index (might be empty)
- new_index_name = self._new_pack.signature_index_name(new_name)
- sig_index = GraphIndex(self._index_transport, new_index_name,
- signature_index_length)
+ index_map, index_list = self._make_index_map('.six')
if self.repo._signature_all_indices is not None:
- # sigatures 'knit' accessed : update it.
- self.repo._signature_all_indices.insert_index(0, sig_index)
- # remove the write buffering index. XXX: API break
- # - clearly we need a remove_index call too.
- del self.repo._signature_all_indices._indices[1]
+ # signature 'knit' accessed : update it.
+ # XXX: API break - clearly a 'replace' method would be good?
+ self.repo._signature_all_indices._indices[:] = index_list
# reset the knit access writer
self.repo._signature_knit_access.set_writer(None, None, (None, None))
- return rev_index, sig_index,
- def flush_inventory_index(self, new_name, inventory_index_length):
- """Write the index out to new_name."""
- # write an index (might be empty)
- new_index_name = self._new_pack.inventory_index_name(new_name)
- inv_index = GraphIndex(self._index_transport, new_index_name,
- inventory_index_length)
+ def refresh_inventory_index(self):
+ """Refresh the inventory access index mappings."""
+ index_map, index_list = self._make_index_map('.iix')
if self.repo._inv_all_indices is not None:
- # inv 'knit' has been used, replace the mutated memory index
- # with the new on-disk one. XXX: is this really a good idea?
- # perhaps just keep using the memory one ?
- self.repo._inv_all_indices.insert_index(0, inv_index)
- # remove the write buffering index. XXX: API break
- # - clearly we need a remove_index call too.
- del self.repo._inv_all_indices._indices[1]
+ # refresh the pack map dict without replacing the instance.
+ self.repo._inv_pack_map.clear()
+ self.repo._inv_pack_map.update(index_map)
+ # invs 'knit' accessed : update it.
+ # XXX: API break - clearly a 'replace' method would be good?
+ self.repo._inv_all_indices._indices[:] = index_list
+ # reset the knit access writer
self.repo._inv_knit_access.set_writer(None, None, (None, None))
else:
+ # inventory knit not used, ensure the pack map is regenerated at
+ # next use.
self.repo._inv_pack_map = None
- return inv_index
- def flush_text_index(self, new_name, text_index_length):
- """Write the index out to new_name."""
- # write a revision index (might be empty)
- new_index_name = self._new_pack.text_index_name(new_name)
- txt_index = GraphIndex(self._index_transport, new_index_name,
- text_index_length)
+ def refresh_text_index(self):
+ """Refresh the text index mappings."""
+ index_map, index_list = self._make_index_map('.tix')
self.repo.weave_store._setup_knit(False)
if self.repo._text_all_indices is not None:
- # text 'knits' have been used, replace the mutated memory index
- # with the new on-disk one. XXX: is this really a good idea?
- # perhaps just keep using the memory one ?
- self.repo._text_all_indices.insert_index(0, txt_index)
- # remove the write buffering index. XXX: API break
- # - clearly we need a remove_index call too.
- del self.repo._text_all_indices._indices[1]
- return txt_index
+ # refresh the pack map dict without replacing the instance.
+ self.repo._text_pack_map.clear()
+ self.repo._text_pack_map.update(index_map)
+ # invs 'knit' accessed : update it.
+ # XXX: API break - clearly a 'replace' method would be good?
+ self.repo._text_all_indices._indices[:] = index_list
def create_pack_from_packs(self, packs, suffix, revision_ids=None):
"""Create a new pack by reading data from other packs.
@@ -499,7 +513,7 @@
self._pack_transport, upload_suffix=suffix)
random_name = new_pack.random_name
if 'fetch' in debug.debug_flags:
- plain_pack_list = ['%s%s' % (a_pack.transport.base, a_pack.name)
+ plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
for a_pack in packs]
if revision_ids is not None:
rev_count = len(revision_ids)
@@ -618,7 +632,6 @@
bytes = ''.join(buffer)
write_stream.write(bytes)
new_pack._hash.update(bytes)
- new_name = new_pack._hash.hexdigest()
if not new_pack.data_inserted():
# nothing was copied, discard the new pack.
self._upload_transport.delete(random_name)
@@ -626,21 +639,19 @@
# write indices
new_pack.finish()
# add to name
- self.allocate(new_name, new_pack.index_sizes[0],
+ self.allocate(new_pack, new_pack.name, new_pack.index_sizes[0],
new_pack.index_sizes[1], new_pack.index_sizes[2],
new_pack.index_sizes[3])
# rename into place. XXX: should rename each index too rather than just
# uploading blind under the chosen name.
write_stream.close()
- self._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
- result = ExistingPack(self._upload_transport.clone('../packs/'), new_name,
- new_pack.revision_index, new_pack.inventory_index, new_pack.text_index,
- new_pack.signature_index)
+ self._upload_transport.rename(random_name, '../packs/' + new_pack.name + '.pack')
+ result = new_pack
if 'fetch' in debug.debug_flags:
# XXX: size might be interesting?
mutter('%s: create_pack: pack renamed into place: %s%s->%s%s t+%6.3fs',
time.ctime(), self._upload_transport.base, random_name,
- result.transport, result.name,
+ result.pack_transport, result.name,
time.time() - new_pack.start_time)
if 'fetch' in debug.debug_flags:
# XXX: size might be interesting?
@@ -864,7 +875,7 @@
self.add_pack_to_memory(result)
return result
- def allocate(self, name, revision_index_length, inventory_index_length,
+ def allocate(self, a_new_pack, name, revision_index_length, inventory_index_length,
text_index_length, signature_index_length):
"""Allocate name in the list of packs.
@@ -950,7 +961,7 @@
:param return: None.
"""
for pack in packs:
- pack.transport.rename(pack.file_name(),
+ pack.pack_transport.rename(pack.file_name(),
'../obsolete_packs/' + pack.file_name())
# TODO: Probably needs to know all possible indexes for this pack
# - or maybe list the directory and move all indexes matching this
@@ -1045,7 +1056,7 @@
for pack in packs:
index = getattr(pack, index_attribute)
indices.append(index)
- pack_map[index] = (pack.transport, pack.file_name())
+ pack_map[index] = (pack.pack_transport, pack.file_name())
return pack_map, indices
def _index_contents(self, pack_map, key_filter=None):
@@ -1127,33 +1138,27 @@
def _commit_write_group(self):
if self._new_pack.data_inserted():
self._open_pack_writer.end()
- new_name = self._new_pack._hash.hexdigest()
self._new_pack.finish()
- txt_index = \
- self.flush_text_index(new_name, self._new_pack.index_sizes[2])
- inv_index = \
- self.flush_inventory_index(new_name, self._new_pack.index_sizes[1])
- rev_index, sig_index = \
- self.flush_revision_signature_indices(new_name,
- self._new_pack.index_sizes[0], self._new_pack.index_sizes[3])
- new_pack = ExistingPack(self._upload_transport.clone('../packs/'),
- new_name, rev_index, inv_index, txt_index, sig_index)
self._write_stream.close()
self._upload_transport.rename(self.repo._open_pack_tuple[1],
- '../packs/' + new_name + '.pack')
+ '../packs/' + self._new_pack.name + '.pack')
# If this fails, its a hash collision. We should:
# - determine if its a collision or
# - the same content or
# - the existing name is not the actual hash - e.g.
# its a deliberate attack or data corruption has
# occurred during the write of that file.
- self.allocate(new_name, self._new_pack.index_sizes[0],
+ self.allocate(self._new_pack, self._new_pack.name, self._new_pack.index_sizes[0],
self._new_pack.index_sizes[1], self._new_pack.index_sizes[2],
self._new_pack.index_sizes[3])
self.repo._open_pack_tuple = None
self._new_pack = None
if not self.autopack():
self._save_pack_names()
+ # now setup the maps we need to access data again.
+ self.refresh_text_index()
+ self.refresh_inventory_index()
+ self.refresh_revision_signature_indices()
else:
# remove the pending upload
self._upload_transport.delete(self.repo._open_pack_tuple[1])
@@ -1358,7 +1363,7 @@
if for_write:
# a reused knit object for commit specifically.
self.repo._text_knit = self.get_weave_or_empty(
- 'all-texts', self.repo.get_transaction(), for_write)
+ 'all-texts', None, for_write)
else:
self.repo._text_knit = None
More information about the bazaar-commits
mailing list