Rev 2751: Combine duplicated _commit_write_group and get _get_bytes_by_hash_multi working in http://sourcefrog.net/bzr/pack-hashes
Martin Pool
mbp at sourcefrog.net
Thu Aug 30 05:33:42 BST 2007
At http://sourcefrog.net/bzr/pack-hashes
------------------------------------------------------------
revno: 2751
revision-id: mbp at sourcefrog.net-20070830043341-01q2hwpw6d78btta
parent: mbp at sourcefrog.net-20070830002223-5fmig065hwpk1sbd
committer: Martin Pool <mbp at sourcefrog.net>
branch nick: pack-hashes
timestamp: Thu 2007-08-30 14:33:41 +1000
message:
Combine duplicated _commit_write_group and get _get_bytes_by_hash_multi working
modified:
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/tests/test_pack_repository.py test_pack_repository-20070828111851-nof5soh31tidz2dq-1
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py 2007-08-30 00:10:52 +0000
+++ b/bzrlib/repofmt/pack_repo.py 2007-08-30 04:33:41 +0000
@@ -23,6 +23,7 @@
from bzrlib import (
debug,
+ osutils,
pack,
ui,
)
@@ -34,7 +35,10 @@
GraphIndexPrefixAdapter,
)
from bzrlib.knit import KnitGraphIndex, _PackAccess, _KnitData
-from bzrlib.pack import ContainerWriter
+from bzrlib.pack import (
+ ContainerWriter,
+ make_readv_reader,
+ )
from bzrlib.store import revision
""")
from bzrlib import (
@@ -68,6 +72,7 @@
_HASH_INDEX_SUFFIX = '.hix'
+
class Pack(object):
"""An in memory proxy for a .pack and its indices."""
@@ -1190,6 +1195,48 @@
value = "%d %d" % (offset, length)
self._hash_write_index.add_nodes([(key, value)])
+ def _get_bytes_by_hash_multi(self, hashes):
+ """Look up objects by hash.
+
+ :param hashes: Sequence of hashes.
+ :returns: Map from hash to bytes.
+ """
+ # TODO: don't load the overall index every time, just get it once and
+ # hold it during the lock duration
+ #
+ # TODO: look in the currently active writer too?
+ index_to_pack, indices = self._make_index_map(_HASH_INDEX_SUFFIX)
+ combined = CombinedGraphIndex(indices)
+ r = {}
+ # index keys are tuples; we just want a string
+ search = [(key,) for key in hashes]
+ missing_keys = set(hashes)
+ for index, key, value in combined.iter_entries(search):
+ # nb they might be returned out of order
+ found_hash = key[0]
+ missing_keys.remove(found_hash)
+ offset, length = map(int, value.split(' '))
+ transport, pack_name = index_to_pack[index]
+ # TODO: is there already some code that will group together all
+ # the retrievals from a pack and turn them into one readv per
+ # pack?
+ #
+ # TODO: do a readv, not individual reads
+ assert found_hash not in r
+ reader = make_readv_reader(transport, pack_name, [(offset, length)])
+ for record_reader in reader.iter_records():
+ found_names, read_fn = record_reader
+ result = read_fn(None) # read all
+ assert osutils.sha_string(result) == found_hash
+ r[found_hash] = result
+ if missing_keys:
+ raise errors.BzrError("keys %r were not found in %r" %
+ (missing_keys, combined))
+ return r
+
+ def _get_bytes_by_hash(self, h):
+ return self._get_bytes_by_hash_multi([h])[h]
+
def _make_index_map(self, suffix):
"""Return information on existing indexes.
@@ -1218,7 +1265,7 @@
def _commit_hash_index(self, new_pack_name):
new_hash_index_name = new_pack_name + _HASH_INDEX_SUFFIX
- self._index_transport.put_file(new_hash_index_name,
+ self._index_transport.put_file_non_atomic(new_hash_index_name,
self._hash_write_index.finish())
del self._hash_write_index
@@ -1249,6 +1296,53 @@
self._open_pack_hash = None
self._abort_hash_index()
+ def _commit_write_group(self):
+ data_inserted = (self._revision_store.data_inserted() or
+ self.weave_store.data_inserted() or
+ self._inv_thunk.data_inserted() or
+ self._hash_write_index.key_count() > 0)
+ if data_inserted:
+ self._open_pack_writer.end()
+ new_name = self._open_pack_hash.hexdigest()
+ new_pack = Pack()
+ new_pack.name = new_name
+ new_pack.transport = self._upload_transport.clone('../packs/')
+ # To populate:
+ # new_pack.revision_index =
+ # new_pack.inventory_index =
+ # new_pack.text_index =
+ # new_pack.signature_index =
+ self.weave_store.flush(new_name, new_pack)
+ self._inv_thunk.flush(new_name, new_pack)
+ self._revision_store.flush(new_name, new_pack)
+ self._commit_hash_index(new_name)
+ self._write_stream.close()
+ self._upload_transport.rename(self._open_pack_tuple[1],
+ '../packs/' + new_name + '.pack')
+ # If this fails, it's a hash collision. We should:
+ # - determine if it's a collision or
+ # - the same content or
+ # - the existing name is not the actual hash - e.g.
+ # it's a deliberate attack or data corruption has
+ # occurred during the write of that file.
+ self._packs.allocate(new_name, new_pack.revision_index_length,
+ new_pack.inventory_index_length, new_pack.text_index_length,
+ new_pack.signature_index_length)
+ self._open_pack_tuple = None
+ if not self._packs.autopack():
+ self._packs.save()
+ else:
+ # remove the pending upload
+ self._upload_transport.delete(self._open_pack_tuple[1])
+ self._revision_store.reset()
+ self.weave_store.reset()
+ self._inv_thunk.reset()
+ # forget what names there are - should just refresh and deal with the
+ # delta.
+ self._packs.reset()
+ self._open_pack_hash = None
+ self._write_stream = None
+
class GraphKnitRepository1(_GraphKnitRepositoryBase, KnitRepository):
"""Experimental graph-knit using repository."""
@@ -1280,51 +1374,6 @@
# forget what names there are
self._packs.reset()
- def _commit_write_group(self):
- data_inserted = (self._revision_store.data_inserted() or
- self.weave_store.data_inserted() or
- self._inv_thunk.data_inserted())
- if data_inserted:
- self._open_pack_writer.end()
- new_name = self._open_pack_hash.hexdigest()
- new_pack = Pack()
- new_pack.name = new_name
- new_pack.transport = self._upload_transport.clone('../packs/')
- # To populate:
- # new_pack.revision_index =
- # new_pack.inventory_index =
- # new_pack.text_index =
- # new_pack.signature_index =
- self.weave_store.flush(new_name, new_pack)
- self._inv_thunk.flush(new_name, new_pack)
- self._revision_store.flush(new_name, new_pack)
- self._write_stream.close()
- self._upload_transport.rename(self._open_pack_tuple[1],
- '../packs/' + new_name + '.pack')
- # If this fails, its a hash collision. We should:
- # - determine if its a collision or
- # - the same content or
- # - the existing name is not the actual hash - e.g.
- # its a deliberate attack or data corruption has
- # occuring during the write of that file.
- self._packs.allocate(new_name, new_pack.revision_index_length,
- new_pack.inventory_index_length, new_pack.text_index_length,
- new_pack.signature_index_length)
- self._open_pack_tuple = None
- if not self._packs.autopack():
- self._packs.save()
- else:
- # remove the pending upload
- self._upload_transport.delete(self._open_pack_tuple[1])
- self._revision_store.reset()
- self.weave_store.reset()
- self._inv_thunk.reset()
- # forget what names there are - should just refresh and deal with the
- # delta.
- self._packs.reset()
- self._open_pack_hash = None
- self._write_stream = None
-
def get_inventory_weave(self):
return self._inv_thunk.get_weave()
@@ -1385,51 +1434,6 @@
# forget what names there are
self._packs.reset()
- def _commit_write_group(self):
- data_inserted = (self._revision_store.data_inserted() or
- self.weave_store.data_inserted() or
- self._inv_thunk.data_inserted())
- if data_inserted:
- self._open_pack_writer.end()
- new_name = self._open_pack_hash.hexdigest()
- new_pack = Pack()
- new_pack.name = new_name
- new_pack.transport = self._upload_transport.clone('../packs/')
- # To populate:
- # new_pack.revision_index =
- # new_pack.inventory_index =
- # new_pack.text_index =
- # new_pack.signature_index =
- self.weave_store.flush(new_name, new_pack)
- self._inv_thunk.flush(new_name, new_pack)
- self._revision_store.flush(new_name, new_pack)
- self._write_stream.close()
- self._upload_transport.rename(self._open_pack_tuple[1],
- '../packs/' + new_name + '.pack')
- # If this fails, its a hash collision. We should:
- # - determine if its a collision or
- # - the same content or
- # - the existing name is not the actual hash - e.g.
- # its a deliberate attack or data corruption has
- # occuring during the write of that file.
- self._packs.allocate(new_name, new_pack.revision_index_length,
- new_pack.inventory_index_length, new_pack.text_index_length,
- new_pack.signature_index_length)
- self._open_pack_tuple = None
- if not self._packs.autopack():
- self._packs.save()
- else:
- # remove the pending upload
- self._upload_transport.delete(self._open_pack_tuple[1])
- self._revision_store.reset()
- self.weave_store.reset()
- self._inv_thunk.reset()
- # forget what names there are - should just refresh and deal with the
- # delta.
- self._packs.reset()
- self._open_pack_hash = None
- self._write_stream = None
-
def get_inventory_weave(self):
return self._inv_thunk.get_weave()
=== modified file 'bzrlib/tests/test_pack_repository.py'
--- a/bzrlib/tests/test_pack_repository.py 2007-08-30 00:08:46 +0000
+++ b/bzrlib/tests/test_pack_repository.py 2007-08-30 04:33:41 +0000
@@ -51,6 +51,5 @@
# finish storing
repo.commit_write_group()
# try to get it back
- return
result = repo._get_bytes_by_hash(stuff_hash)
- self.assertEquals(stuff. result)
+ self.assertEquals(stuff, result)
More information about the bazaar-commits
mailing list