Rev 2751: Combine duplicated _commit_write_group and get _get_bytes_by_hash_multi working in http://sourcefrog.net/bzr/pack-hashes

Martin Pool mbp at sourcefrog.net
Thu Aug 30 05:33:42 BST 2007


At http://sourcefrog.net/bzr/pack-hashes

------------------------------------------------------------
revno: 2751
revision-id: mbp at sourcefrog.net-20070830043341-01q2hwpw6d78btta
parent: mbp at sourcefrog.net-20070830002223-5fmig065hwpk1sbd
committer: Martin Pool <mbp at sourcefrog.net>
branch nick: pack-hashes
timestamp: Thu 2007-08-30 14:33:41 +1000
message:
  Combine duplicated _commit_write_group and get _get_bytes_by_hash_multi working
modified:
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
  bzrlib/tests/test_pack_repository.py test_pack_repository-20070828111851-nof5soh31tidz2dq-1
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2007-08-30 00:10:52 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2007-08-30 04:33:41 +0000
@@ -23,6 +23,7 @@
 
 from bzrlib import (
         debug,
+        osutils,
         pack,
         ui,
         )
@@ -34,7 +35,10 @@
     GraphIndexPrefixAdapter,
     )
 from bzrlib.knit import KnitGraphIndex, _PackAccess, _KnitData
-from bzrlib.pack import ContainerWriter
+from bzrlib.pack import (
+        ContainerWriter,
+        make_readv_reader,
+        )
 from bzrlib.store import revision
 """)
 from bzrlib import (
@@ -68,6 +72,7 @@
 
 _HASH_INDEX_SUFFIX = '.hix'
 
+
 class Pack(object):
     """An in memory proxy for a .pack and its indices."""
 
@@ -1190,6 +1195,48 @@
         value = "%d %d" % (offset, length)
         self._hash_write_index.add_nodes([(key, value)])
 
+    def _get_bytes_by_hash_multi(self, hashes):
+        """Look up objects by hash.
+
+        :param hashes: Sequence of hashes.
+        :returns: Map from hash to bytes.
+        """
+        # TODO: don't load the overall index every time, just get it once and
+        # hold it during the lock duration
+        #
+        # TODO: look in the currently active writer too?
+        index_to_pack, indices = self._make_index_map(_HASH_INDEX_SUFFIX)
+        combined = CombinedGraphIndex(indices)
+        r = {}
+        # index keys are tuples; wrap each bare hash string in a 1-tuple
+        search = [(key,) for key in hashes]
+        missing_keys = set(hashes)
+        for index, key, value in combined.iter_entries(search):
+            # nb they might be returned out of order
+            found_hash = key[0]
+            missing_keys.remove(found_hash)
+            offset, length = map(int, value.split(' '))
+            transport, pack_name = index_to_pack[index]
+            # TODO: is there already some code that will group together all
+            # the retrievals from a pack and turn them into one readv per
+            # pack?
+            #
+            # TODO: do a readv, not individual reads
+            assert found_hash not in r
+            reader = make_readv_reader(transport, pack_name, [(offset, length)])
+            for record_reader in reader.iter_records():
+                found_names, read_fn = record_reader
+                result = read_fn(None) # None means read all the bytes
+                assert osutils.sha_string(result) == found_hash
+            r[found_hash] = result
+        if missing_keys:
+            raise errors.BzrError("keys %r were not found in %r" %
+                    (missing_keys, combined))
+        return r
+
+    def _get_bytes_by_hash(self, h):
+        return self._get_bytes_by_hash_multi([h])[h]
+
     def _make_index_map(self, suffix):
         """Return information on existing indexes.
 
@@ -1218,7 +1265,7 @@
 
     def _commit_hash_index(self, new_pack_name):
         new_hash_index_name = new_pack_name + _HASH_INDEX_SUFFIX
-        self._index_transport.put_file(new_hash_index_name,
+        self._index_transport.put_file_non_atomic(new_hash_index_name,
                 self._hash_write_index.finish())
         del self._hash_write_index
                     
@@ -1249,6 +1296,53 @@
         self._open_pack_hash = None
         self._abort_hash_index()
 
+    def _commit_write_group(self):
+        data_inserted = (self._revision_store.data_inserted() or
+            self.weave_store.data_inserted() or 
+            self._inv_thunk.data_inserted() or
+            self._hash_write_index.key_count() > 0)
+        if data_inserted:
+            self._open_pack_writer.end()
+            new_name = self._open_pack_hash.hexdigest()
+            new_pack = Pack()
+            new_pack.name = new_name
+            new_pack.transport = self._upload_transport.clone('../packs/')
+            # To populate:
+            # new_pack.revision_index = 
+            # new_pack.inventory_index = 
+            # new_pack.text_index = 
+            # new_pack.signature_index = 
+            self.weave_store.flush(new_name, new_pack)
+            self._inv_thunk.flush(new_name, new_pack)
+            self._revision_store.flush(new_name, new_pack)
+            self._commit_hash_index(new_name)
+            self._write_stream.close()
+            self._upload_transport.rename(self._open_pack_tuple[1],
+                '../packs/' + new_name + '.pack')
+            # If this fails, it's a hash collision. We should
+            # determine whether it is:
+            # - a true collision, or
+            # - the same content, or
+            # - a name that is not the actual hash - e.g. a deliberate
+            #   attack, or data corruption during the write of that file.
+            self._packs.allocate(new_name, new_pack.revision_index_length,
+                new_pack.inventory_index_length, new_pack.text_index_length,
+                new_pack.signature_index_length)
+            self._open_pack_tuple = None
+            if not self._packs.autopack():
+                self._packs.save()
+        else:
+            # remove the pending upload
+            self._upload_transport.delete(self._open_pack_tuple[1])
+        self._revision_store.reset()
+        self.weave_store.reset()
+        self._inv_thunk.reset()
+        # forget what names there are - should just refresh and deal with the
+        # delta.
+        self._packs.reset()
+        self._open_pack_hash = None
+        self._write_stream = None
+
 
 class GraphKnitRepository1(_GraphKnitRepositoryBase, KnitRepository):
     """Experimental graph-knit using repository."""
@@ -1280,51 +1374,6 @@
             # forget what names there are
             self._packs.reset()
 
-    def _commit_write_group(self):
-        data_inserted = (self._revision_store.data_inserted() or
-            self.weave_store.data_inserted() or 
-            self._inv_thunk.data_inserted())
-        if data_inserted:
-            self._open_pack_writer.end()
-            new_name = self._open_pack_hash.hexdigest()
-            new_pack = Pack()
-            new_pack.name = new_name
-            new_pack.transport = self._upload_transport.clone('../packs/')
-            # To populate:
-            # new_pack.revision_index = 
-            # new_pack.inventory_index = 
-            # new_pack.text_index = 
-            # new_pack.signature_index = 
-            self.weave_store.flush(new_name, new_pack)
-            self._inv_thunk.flush(new_name, new_pack)
-            self._revision_store.flush(new_name, new_pack)
-            self._write_stream.close()
-            self._upload_transport.rename(self._open_pack_tuple[1],
-                '../packs/' + new_name + '.pack')
-            # If this fails, its a hash collision. We should:
-            # - determine if its a collision or
-            # - the same content or
-            # - the existing name is not the actual hash - e.g.
-            #   its a deliberate attack or data corruption has
-            #   occuring during the write of that file.
-            self._packs.allocate(new_name, new_pack.revision_index_length,
-                new_pack.inventory_index_length, new_pack.text_index_length,
-                new_pack.signature_index_length)
-            self._open_pack_tuple = None
-            if not self._packs.autopack():
-                self._packs.save()
-        else:
-            # remove the pending upload
-            self._upload_transport.delete(self._open_pack_tuple[1])
-        self._revision_store.reset()
-        self.weave_store.reset()
-        self._inv_thunk.reset()
-        # forget what names there are - should just refresh and deal with the
-        # delta.
-        self._packs.reset()
-        self._open_pack_hash = None
-        self._write_stream = None
-
     def get_inventory_weave(self):
         return self._inv_thunk.get_weave()
 
@@ -1385,51 +1434,6 @@
             # forget what names there are
             self._packs.reset()
 
-    def _commit_write_group(self):
-        data_inserted = (self._revision_store.data_inserted() or
-            self.weave_store.data_inserted() or 
-            self._inv_thunk.data_inserted())
-        if data_inserted:
-            self._open_pack_writer.end()
-            new_name = self._open_pack_hash.hexdigest()
-            new_pack = Pack()
-            new_pack.name = new_name
-            new_pack.transport = self._upload_transport.clone('../packs/')
-            # To populate:
-            # new_pack.revision_index = 
-            # new_pack.inventory_index = 
-            # new_pack.text_index = 
-            # new_pack.signature_index = 
-            self.weave_store.flush(new_name, new_pack)
-            self._inv_thunk.flush(new_name, new_pack)
-            self._revision_store.flush(new_name, new_pack)
-            self._write_stream.close()
-            self._upload_transport.rename(self._open_pack_tuple[1],
-                '../packs/' + new_name + '.pack')
-            # If this fails, its a hash collision. We should:
-            # - determine if its a collision or
-            # - the same content or
-            # - the existing name is not the actual hash - e.g.
-            #   its a deliberate attack or data corruption has
-            #   occuring during the write of that file.
-            self._packs.allocate(new_name, new_pack.revision_index_length,
-                new_pack.inventory_index_length, new_pack.text_index_length,
-                new_pack.signature_index_length)
-            self._open_pack_tuple = None
-            if not self._packs.autopack():
-                self._packs.save()
-        else:
-            # remove the pending upload
-            self._upload_transport.delete(self._open_pack_tuple[1])
-        self._revision_store.reset()
-        self.weave_store.reset()
-        self._inv_thunk.reset()
-        # forget what names there are - should just refresh and deal with the
-        # delta.
-        self._packs.reset()
-        self._open_pack_hash = None
-        self._write_stream = None
-
     def get_inventory_weave(self):
         return self._inv_thunk.get_weave()
 

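For reference, a minimal round-trip using the new lookup API, assuming a
repository with an open write group (this mirrors the test change below;
the stuff/stuff_hash names come from that test, and the write path is
elided):

    from bzrlib import osutils

    stuff = 'some bytes\n'
    stuff_hash = osutils.sha_string(stuff)   # records are keyed by sha1
    # ... the bytes are written into the open pack during the write
    # group, and the .hix index gets a (stuff_hash -> "offset length")
    # node locating them ...
    repo.commit_write_group()
    result = repo._get_bytes_by_hash(stuff_hash)      # single lookup
    assert result == stuff
    # _get_bytes_by_hash_multi([h1, h2, ...]) batches several hashes
    # and returns a dict mapping hash -> bytes.
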
=== modified file 'bzrlib/tests/test_pack_repository.py'
--- a/bzrlib/tests/test_pack_repository.py	2007-08-30 00:08:46 +0000
+++ b/bzrlib/tests/test_pack_repository.py	2007-08-30 04:33:41 +0000
@@ -51,6 +51,5 @@
         # finish storing
         repo.commit_write_group()
         # try to get it back
-        return
         result = repo._get_bytes_by_hash(stuff_hash)
-        self.assertEquals(stuff. result)
+        self.assertEquals(stuff, result)
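On the readv TODO in _get_bytes_by_hash_multi: a rough sketch of a
replacement for the per-entry loop above, grouping the index hits by the
pack that holds them so each pack is fetched with one readv rather than
one read per record. It assumes make_readv_reader accepts several
(offset, length) ranges and yields records back in the requested order,
which this diff does not confirm:

    # group the index hits by the pack that holds them
    by_pack = {}
    for index, key, value in combined.iter_entries(search):
        offset, length = map(int, value.split(' '))
        by_pack.setdefault(index_to_pack[index], []).append(
            (offset, length, key[0]))
    r = {}
    for (transport, pack_name), wanted in by_pack.items():
        reader = make_readv_reader(transport, pack_name,
            [(offset, length) for offset, length, _ in wanted])
        # assumed: records are yielded in the order requested
        for (names, read_fn), (_, _, expected) in zip(
                reader.iter_records(), wanted):
            bytes = read_fn(None)
            assert osutils.sha_string(bytes) == expected
            r[expected] = bytes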

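The new pack's name in _commit_write_group is the hex digest of its own
bytes: _open_pack_hash is a hash object which - by assumption here, since
its construction is outside these hunks - is updated with every byte
written to the upload file, so no re-read is needed at commit time. A
minimal sketch of the invariant (hashlib.sha1 and chunks_written are
assumptions for illustration):

    import hashlib

    h = hashlib.sha1()            # assumed algorithm; not shown in this diff
    for chunk in chunks_written:  # hypothetical: every chunk sent to the .pack
        h.update(chunk)
    new_name = h.hexdigest()
    # upload/<tmpname>.pack is then renamed to ../packs/<new_name>.pack;
    # a clash on that name is either identical content or a hash collision.
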


