Rev 5122: we can now round-trip content via data_access. in http://bazaar.launchpad.net/~jameinel/bzr/2.2.0b2-pack-collection

John Arbash Meinel john at arbash-meinel.com
Wed Jun 16 19:59:50 BST 2010


At http://bazaar.launchpad.net/~jameinel/bzr/2.2.0b2-pack-collection

------------------------------------------------------------
revno: 5122
revision-id: john at arbash-meinel.com-20100616185940-nppw91i1763jbflk
parent: john at arbash-meinel.com-20100616174337-2o56z0n1tqnn4mm5
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.2.0b2-pack-collection
timestamp: Wed 2010-06-16 13:59:40 -0500
message:
  we can now round-trip content via data_access.
-------------- next part --------------
=== modified file 'bzrlib/pack_collection.py'
--- a/bzrlib/pack_collection.py	2010-06-16 17:43:37 +0000
+++ b/bzrlib/pack_collection.py	2010-06-16 18:59:40 +0000
@@ -125,15 +125,24 @@
 
     This is where the policy exists as to whether we put the new content into
     'upload' and then rename it into 'packs/' when finished.
+
+ AggregateIndex
+    Used to help bridge the gap between many pack files and indexes and a
+    VersionedFile api. This collects the indexes into a CombinedGraphIndex, and
+    provides a mapping from an Index instance back to the pack file it
+    corresponds to. (So that index queries give you a memo that you can use to
+    get the raw content out of the pack file.)
 """
 
 import cStringIO
 import struct
 
 from bzrlib import (
+    errors,
     index as _mod_index,
     knit,
-    errors,
+    osutils,
+    pack as _mod_pack,
     )
 
 
@@ -464,6 +473,23 @@
 
 class NewPack(Pack):
 
+    def __init__(self, pack_policy, pack_collection):
+        # TODO: upload_suffix, file_mode
+        self._pack_policy = pack_policy
+        self._pack_collection = pack_collection
+        transport, name = self._pack_policy.get_info_for_new_pack()
+        super(NewPack, self).__init__(transport, name)
+        self._hash = osutils.md5()
+        self.write_stream = transport.open_write_stream(self.name) #, XXX: mode)
+        self._writer = _mod_pack.ContainerWriter(self._write_data)
+        self._writer.begin()
+        self._state = 'open'
+        # self._finished = False
+
+    def _write_data(self, bytes, flush=False):
+        self.write_stream.write(bytes)
+        self._hash.update(bytes)
+
     def abort(self):
         pass
 
@@ -488,6 +514,10 @@
     def set_write_cache_size(self, size):
         pass
 
+    def get_writer(self):
+        """Get the ContainerWriter for data_access to send content to."""
+        return self._writer
+
 
 class PackPolicy(object):
     """Description of how we want to manage pack files.
@@ -498,6 +528,8 @@
 
     _pack_class = Pack # XXX: ExistingPack?
     _new_pack_factory = NewPack
+    _index_class = None # GraphIndex / BTreeGraphIndex
+    _index_builder_factory = None # GraphIndexBuilder / BTreeBuilder
 
     def __init__(self):
         pass
@@ -513,13 +545,23 @@
         """
         raise NotImplementedError(self.create_pack_from_memo)
 
-    def start_new_pack(self):
+    def start_new_pack(self, pack_collection):
         """Start a new Pack file, ready for writing.
 
+        :param pack_collection: A PackCollection instance that this pack will
+            be added to.
         :return: A NewPack instance.
         """
         raise NotImplementedError(self.start_new_pack)
 
+    def get_info_for_new_pack(self):
+        """Return the transport and name to use for a newly created pack.
+
+        :return: Information for the instance of _new_pack_factory to determine
+            where it should exist. (transport, name)
+        """
+        raise NotImplementedError(self.get_info_for_new_pack)
+
     def update_callbacks(self, new_pack):
         """Update appropriate callback functionality for a new pack file.
 
@@ -606,7 +648,8 @@
         # allow writing: queue writes to a new index
         # Updates the index to packs mapping as a side effect,
         self.add_index(index, pack)
-    #     self.data_access.set_writer(pack._writer, index, pack.access_tuple())
+        self.data_access.set_writer(pack.get_writer(), index,
+                                    pack.access_tuple())
         self.add_callback = index.add_nodes
 
     # def clear(self):
@@ -713,13 +756,18 @@
     def start_write_group(self):
         """Prepare this collection for being able to insert new data.
         """
+        # XXX: start_write_group does not yet have direct tests
         # XXX: We need some sort of locking policy.
         # if not self.repo.is_write_locked():
         #     raise errors.NotWriteLocked()
+        # XXX: Custom exception here? Or at least a testable one
         assert self._new_pack is None
-        self._new_pack = self.pack_policy.start_new_pack()
+        self._new_pack = self.pack_policy.start_new_pack(self)
         self._aggregate_indexes_from_pack(new_pack, writable=True)
         self.pack_policy.update_callbacks(self._new_pack)
+        # TODO: When/where does this get done? Maybe in start_new_pack...
+        #    self._new_pack.text_index.set_optimize(
+        #           combine_backing_indices=False)
 
     def _flush_write_group(self):
         """If we have a write group active, flush the state to disk."""

=== modified file 'bzrlib/tests/test_pack_collection.py'
--- a/bzrlib/tests/test_pack_collection.py	2010-06-16 16:16:59 +0000
+++ b/bzrlib/tests/test_pack_collection.py	2010-06-16 18:59:40 +0000
@@ -22,6 +22,7 @@
     bencode,
     btree_index,
     errors,
+    osutils,
     pack_collection,
     tests,
     )
@@ -355,6 +356,7 @@
     """
 
     _index_class = btree_index.BTreeGraphIndex
+    _index_builder_factory = btree_index.BTreeBuilder
 
     def __init__(self, transport):
         self.transport = transport
@@ -362,7 +364,7 @@
     def open_pack_from_memo(self, name, memo):
         info = bencode.bdecode(memo)
         assert isinstance(info, dict)
-        p = pack_collection.Pack(self.transport, name)
+        p = self._pack_class(self.transport, name)
         for index_name, info in info.iteritems():
             size, = info
             index = self._index_class(self.transport, name, size=size)
@@ -370,6 +372,25 @@
         p._index_info = info
         return p
 
+    def start_new_pack(self, pack_collection):
+        new_pack = self._new_pack_factory(self, pack_collection)
+        # TODO: upload_suffix, file_mode
+        # TODO: Where should the list of indexes exist?
+        #       Is it part of the Policy, or is it part of _new_pack_factory?
+        #       It could even be passed in from PackCollection....
+        #       IMO, I think the PackPolicy is probably the best place to put
+        #       it, as it is a good source for customization.
+        for index_name in ['t_index']:
+            index_builder = self._index_builder_factory()
+            new_pack._indexes[index_name] = index_builder
+        return new_pack
+
+    def get_info_for_new_pack(self):
+        # TODO: upload_suffix?
+        name = osutils.rand_chars(20) + '.pack'
+        return self.transport, name
+
+
 
 class TestAggregateIndex(tests.TestCaseWithMemoryTransport):
 
@@ -400,28 +421,28 @@
 
     def test_add_writable_index(self):
         agg_index = pack_collection.AggregateIndex()
-        builder = btree_index.BTreeBuilder()
-        trans_obj = self.get_transport()
-        pack_obj = pack_collection.Pack(trans_obj, 'fake-name')
+        policy = SingleTransportPackPolicy(self.get_transport())
+        new_pack = policy.start_new_pack(None)
+        builder = new_pack._indexes['t_index']
         self.assertIs(None, agg_index.add_callback)
-        agg_index.add_writable_index(builder, pack_obj)
+        agg_index.add_writable_index(builder, new_pack)
         # Can't use 'is' because apparently a new object is returned as the
         # reference...?
         self.assertEqual(builder.add_nodes, agg_index.add_callback)
         self.assertIs(builder, agg_index.combined_index._indices[0])
-        self.assertEqual({builder: (trans_obj, 'fake-name')},
+        self.assertEqual({builder: new_pack.access_tuple()},
                          agg_index.index_to_pack_access)
 
     def test_double_add_writable(self):
         agg_index = pack_collection.AggregateIndex()
-        builder = btree_index.BTreeBuilder()
-        trans_obj = self.get_transport()
-        pack_obj = pack_collection.Pack(trans_obj, 'fake-name')
-        agg_index.add_writable_index(builder, pack_obj)
+        policy = SingleTransportPackPolicy(self.get_transport())
+        new_pack = policy.start_new_pack(None)
+        builder = new_pack._indexes['t_index']
+        agg_index.add_writable_index(builder, new_pack)
         # Illegal to call it twice
         builder2 = btree_index.BTreeBuilder()
         self.assertRaises(RuntimeError,
-                          agg_index.add_writable_index, builder2, pack_obj)
+                          agg_index.add_writable_index, builder2, new_pack)
 
     def test_data_access(self):
         def reload_func():
@@ -432,12 +453,29 @@
                                                    flush_func=flush_func)
         index = btree_index.BTreeBuilder()
         trans_obj = self.get_transport()
-        pack_obj = pack_collection.Pack(trans_obj, 'temp-name')
+        pack_obj = pack_collection.Pack(trans_obj, 'fake-name')
         agg_index.add_index(index, pack_obj)
         data_access = agg_index.data_access
         self.assertIs(reload_func, data_access._reload_func)
         self.assertIs(flush_func, data_access._flush_func)
-        
+
+    def test_writable_data(self):
+        agg_index = pack_collection.AggregateIndex()
+        policy = SingleTransportPackPolicy(self.get_transport())
+        new_pack = policy.start_new_pack(None)
+        builder = new_pack._indexes['t_index']
+        agg_index.add_writable_index(builder, new_pack)
+        # The index has been added to the aggregate index correctly
+        self.assertIs(builder, agg_index.combined_index._indices[0])
+        # And the data_access now has something it can write to
+        data_access = agg_index.data_access
+        self.assertIs(builder, data_access._write_index)
+        self.assertIs(new_pack._writer, data_access._container_writer)
+        index_memos = data_access.add_raw_records([(('key',), 10)],
+                                                  '1234567890')
+        data_access.flush()
+        self.assertEqual(['1234567890'],
+                         list(data_access.get_raw_records(index_memos)))
 
 
 class TestPackCollection(tests.TestCaseWithMemoryTransport):



More information about the bazaar-commits mailing list