Rev 5122: we can now round-trip content via data_access. in http://bazaar.launchpad.net/~jameinel/bzr/2.2.0b2-pack-collection
John Arbash Meinel
john at arbash-meinel.com
Wed Jun 16 19:59:50 BST 2010
At http://bazaar.launchpad.net/~jameinel/bzr/2.2.0b2-pack-collection
------------------------------------------------------------
revno: 5122
revision-id: john at arbash-meinel.com-20100616185940-nppw91i1763jbflk
parent: john at arbash-meinel.com-20100616174337-2o56z0n1tqnn4mm5
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.2.0b2-pack-collection
timestamp: Wed 2010-06-16 13:59:40 -0500
message:
we can now round-trip content via data_access.
-------------- next part --------------
=== modified file 'bzrlib/pack_collection.py'
--- a/bzrlib/pack_collection.py 2010-06-16 17:43:37 +0000
+++ b/bzrlib/pack_collection.py 2010-06-16 18:59:40 +0000
@@ -125,15 +125,24 @@
This is where the policy exists as to whether we put the new content into
'upload' and then rename it into 'packs/' when finished.
+
+ AggregateIndex
+ Used to help bridge the gap between many pack files and indexes and a
+ VersionedFile api. This collects the indexes into a CombinedGraphIndex, and
+ provides a mapping from an Index instance back to the pack file it
+ corresponds to. (So that index queries give you a memo that you can use to
+ get the raw content out of the pack file.)
"""
import cStringIO
import struct
from bzrlib import (
+ errors,
index as _mod_index,
knit,
- errors,
+ osutils,
+ pack as _mod_pack,
)
@@ -464,6 +473,23 @@
class NewPack(Pack):
+ def __init__(self, pack_policy, pack_collection):
+ # TODO: upload_suffix, file_mode
+ self._pack_policy = pack_policy
+ self._pack_collection = pack_collection
+ transport, name = self._pack_policy.get_info_for_new_pack()
+ super(NewPack, self).__init__(transport, name)
+ self._hash = osutils.md5()
+ self.write_stream = transport.open_write_stream(self.name) #, XXX: mode)
+ self._writer = _mod_pack.ContainerWriter(self._write_data)
+ self._writer.begin()
+ self._state = 'open'
+ # self._finished = False
+
+ def _write_data(self, bytes, flush=False):
+ self.write_stream.write(bytes)
+ self._hash.update(bytes)
+
def abort(self):
pass
@@ -488,6 +514,10 @@
def set_write_cache_size(self, size):
pass
+ def get_writer(self):
+ """Get the ContainerWriter for data_access to send content to."""
+ return self._writer
+
class PackPolicy(object):
"""Description of how we want to manage pack files.
@@ -498,6 +528,8 @@
_pack_class = Pack # XXX: ExistingPack?
_new_pack_factory = NewPack
+ _index_class = None # GraphIndex / BTreeGraphIndex
+ _index_builder_factory = None # GraphIndexBuilder / BTreeBuilder
def __init__(self):
pass
@@ -513,13 +545,23 @@
"""
raise NotImplementedError(self.create_pack_from_memo)
- def start_new_pack(self):
+ def start_new_pack(self, pack_collection):
"""Start a new Pack file, ready for writing.
+ :param pack_collection: A PackCollection instance that this pack will
+ be added to.
:return: A NewPack instance.
"""
raise NotImplementedError(self.start_new_pack)
+ def get_info_for_new_pack(self):
+ """A new pack has been created, give it a transport and a name.
+
+ :return: Information for the instance of _new_pack_factory to determine
+ where it should exist. (transport, name)
+ """
+ raise NotImplementedError(self.get_info_for_new_pack)
+
def update_callbacks(self, new_pack):
"""Update appropriate callback functionality for a new pack file.
@@ -606,7 +648,8 @@
# allow writing: queue writes to a new index
# Updates the index to packs mapping as a side effect,
self.add_index(index, pack)
- # self.data_access.set_writer(pack._writer, index, pack.access_tuple())
+ self.data_access.set_writer(pack.get_writer(), index,
+ pack.access_tuple())
self.add_callback = index.add_nodes
# def clear(self):
@@ -713,13 +756,18 @@
def start_write_group(self):
"""Prepare this collection for being able to insert new data.
"""
+ # XXX: start_write_group does not yet have direct tests
# XXX: We need some sort of locking policy.
# if not self.repo.is_write_locked():
# raise errors.NotWriteLocked()
+ # XXX: Custom exception here? Or at least a testable one
assert self._new_pack is None
- self._new_pack = self.pack_policy.start_new_pack()
+ self._new_pack = self.pack_policy.start_new_pack(self)
- self._aggregate_indexes_from_pack(new_pack, writable=True)
+ self._aggregate_indexes_from_pack(self._new_pack, writable=True)
self.pack_policy.update_callbacks(self._new_pack)
+ # TODO: When/where does this get done? Maybe in start_new_pack...
+ # self._new_pack.text_index.set_optimize(
+ # combine_backing_indices=False)
def _flush_write_group(self):
"""If we have a write group active, flush the state to disk."""
=== modified file 'bzrlib/tests/test_pack_collection.py'
--- a/bzrlib/tests/test_pack_collection.py 2010-06-16 16:16:59 +0000
+++ b/bzrlib/tests/test_pack_collection.py 2010-06-16 18:59:40 +0000
@@ -22,6 +22,7 @@
bencode,
btree_index,
errors,
+ osutils,
pack_collection,
tests,
)
@@ -355,6 +356,7 @@
"""
_index_class = btree_index.BTreeGraphIndex
+ _index_builder_factory = btree_index.BTreeBuilder
def __init__(self, transport):
self.transport = transport
@@ -362,7 +364,7 @@
def open_pack_from_memo(self, name, memo):
info = bencode.bdecode(memo)
assert isinstance(info, dict)
- p = pack_collection.Pack(self.transport, name)
+ p = self._pack_class(self.transport, name)
for index_name, info in info.iteritems():
size, = info
index = self._index_class(self.transport, name, size=size)
@@ -370,6 +372,25 @@
p._index_info = info
return p
+ def start_new_pack(self, pack_collection):
+ new_pack = self._new_pack_factory(self, pack_collection)
+ # TODO: upload_suffix, file_mode)
+ # TODO: Where should the list of indexes exist?
+ # Is it part of the Policy, or is it part of _new_pack_factory?
+ # It could even be passed in from PackCollection....
+ # IMO, I think the PackPolicy is probably the best place to put
+ # it, as it is a good source for customization.
+ for index_name in ['t_index']:
+ index_builder = self._index_builder_factory()
+ new_pack._indexes[index_name] = index_builder
+ return new_pack
+
+ def get_info_for_new_pack(self):
+ # TODO: upload_suffix?
+ name = osutils.rand_chars(20) + '.pack'
+ return self.transport, name
+
+
class TestAggregateIndex(tests.TestCaseWithMemoryTransport):
@@ -400,28 +421,28 @@
def test_add_writable_index(self):
agg_index = pack_collection.AggregateIndex()
- builder = btree_index.BTreeBuilder()
- trans_obj = self.get_transport()
- pack_obj = pack_collection.Pack(trans_obj, 'fake-name')
+ policy = SingleTransportPackPolicy(self.get_transport())
+ new_pack = policy.start_new_pack(None)
+ builder = new_pack._indexes['t_index']
self.assertIs(None, agg_index.add_callback)
- agg_index.add_writable_index(builder, pack_obj)
+ agg_index.add_writable_index(builder, new_pack)
- # Can't use 'is' because apparently a new object is returned as the
- # reference...?
+ # Can't use 'is': each access to builder.add_nodes creates a new bound
+ # method object, so compare with == (same instance and function).
self.assertEqual(builder.add_nodes, agg_index.add_callback)
self.assertIs(builder, agg_index.combined_index._indices[0])
- self.assertEqual({builder: (trans_obj, 'fake-name')},
+ self.assertEqual({builder: new_pack.access_tuple()},
agg_index.index_to_pack_access)
def test_double_add_writable(self):
agg_index = pack_collection.AggregateIndex()
- builder = btree_index.BTreeBuilder()
- trans_obj = self.get_transport()
- pack_obj = pack_collection.Pack(trans_obj, 'fake-name')
- agg_index.add_writable_index(builder, pack_obj)
+ policy = SingleTransportPackPolicy(self.get_transport())
+ new_pack = policy.start_new_pack(None)
+ builder = new_pack._indexes['t_index']
+ agg_index.add_writable_index(builder, new_pack)
# Illegal to call it twice
builder2 = btree_index.BTreeBuilder()
self.assertRaises(RuntimeError,
- agg_index.add_writable_index, builder2, pack_obj)
+ agg_index.add_writable_index, builder2, new_pack)
def test_data_access(self):
def reload_func():
@@ -432,12 +453,29 @@
flush_func=flush_func)
index = btree_index.BTreeBuilder()
trans_obj = self.get_transport()
- pack_obj = pack_collection.Pack(trans_obj, 'temp-name')
+ pack_obj = pack_collection.Pack(trans_obj, 'fake-name')
agg_index.add_index(index, pack_obj)
data_access = agg_index.data_access
self.assertIs(reload_func, data_access._reload_func)
self.assertIs(flush_func, data_access._flush_func)
-
+
+ def test_writable_data(self):
+ agg_index = pack_collection.AggregateIndex()
+ policy = SingleTransportPackPolicy(self.get_transport())
+ new_pack = policy.start_new_pack(None)
+ builder = new_pack._indexes['t_index']
+ agg_index.add_writable_index(builder, new_pack)
+ # The index has been added to the aggregate index correctly
+ self.assertIs(builder, agg_index.combined_index._indices[0])
+ # And the data_access now has something it can write to
+ data_access = agg_index.data_access
+ self.assertIs(builder, data_access._write_index)
+ self.assertIs(new_pack._writer, data_access._container_writer)
+ index_memos = data_access.add_raw_records([(('key',), 10)],
+ '1234567890')
+ data_access.flush()
+ self.assertEqual(['1234567890'],
+ list(data_access.get_raw_records(index_memos)))
class TestPackCollection(tests.TestCaseWithMemoryTransport):
More information about the bazaar-commits mailing list