Rev 2716: Slightly broken, but branch and fetch performance is now roughly on par (for bzr.dev) with knits - should be much faster for large repos. in http://people.ubuntu.com/~robertc/baz2.0/repository
Robert Collins
robertc at robertcollins.net
Tue Aug 14 08:18:07 BST 2007
At http://people.ubuntu.com/~robertc/baz2.0/repository
------------------------------------------------------------
revno: 2716
revision-id: robertc at robertcollins.net-20070814071804-71d340lob7qv3wep
parent: robertc at robertcollins.net-20070813045212-hzoaikwhwp4ojy29
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Tue 2007-08-14 17:18:04 +1000
message:
Slightly broken, but branch and fetch performance is now roughly on par (for bzr.dev) with knits - should be much faster for large repos.
modified:
bzrlib/branch.py branch.py-20050309040759-e4baf4e0d046576e
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/repository.py rev_storage.py-20051111201905-119e9401e46257e3
=== modified file 'bzrlib/branch.py'
--- a/bzrlib/branch.py 2007-07-27 13:02:00 +0000
+++ b/bzrlib/branch.py 2007-08-14 07:18:04 +0000
@@ -2093,14 +2093,18 @@
if revision_id is None:
revno, revision_id = self.last_revision_info()
else:
- # To figure out the revno for a random revision, we need to build
- # the revision history, and count its length.
- # We don't care about the order, just how long it is.
- # Alternatively, we could start at the current location, and count
- # backwards. But there is no guarantee that we will find it since
- # it may be a merged revision.
- revno = len(list(self.repository.iter_reverse_revision_history(
- revision_id)))
+ # usually when revision_id is supplied it is the tip - try
+ # for the fast_path
+ revno, tip_revision_id = self.last_revision_info()
+ if tip_revision_id != revision_id:
+ # To figure out the revno for a random revision, we need to
+ # build the revision history, and count its length. We don't
+ # care about the order, just how long it is. Alternatively, we
+ # could start at the current location, and count backwards. But
+ # there is no guarantee that we will find it since it may be a
+ # merged revision.
+ revno = len(list(self.repository.iter_reverse_revision_history(
+ revision_id)))
destination.set_last_revision_info(revno, revision_id)
def _make_tags(self):
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py 2007-08-13 04:45:41 +0000
+++ b/bzrlib/repofmt/pack_repo.py 2007-08-14 07:18:04 +0000
@@ -22,6 +22,7 @@
from bzrlib import (
pack,
+ ui,
)
from bzrlib.index import (
GraphIndex,
@@ -30,7 +31,7 @@
CombinedGraphIndex,
GraphIndexPrefixAdapter,
)
-from bzrlib.knit import KnitGraphIndex, _PackAccess
+from bzrlib.knit import KnitGraphIndex, _PackAccess, _KnitData
from bzrlib.pack import ContainerWriter
from bzrlib.store import revision
""")
@@ -59,11 +60,69 @@
from bzrlib.trace import mutter, note, warning
+class Pack(object):
+ """An in memory proxy for a .pack and its indices."""
+
+ def __init__(self):
+ self.revision_index = None
+ self.inventory_index = None
+ self.text_index = None
+ self.signature_index = None
+ self.name = None
+ self.transport = None
+
+ def get_revision_count(self):
+ return len(list(self.revision_index.iter_all_entries()))
+
+
class RepositoryPackCollection(object):
def __init__(self, repo, transport):
self.repo = repo
self.transport = transport
+ self.packs = []
+
+ def add_pack_to_memory(self, pack):
+ """Make a Pack object available to the repository to satisfy queries.
+
+ :param pack: A Pack object.
+ """
+ self.packs.append(pack)
+ if self.repo._revision_all_indices is None:
+ # to make this function more useful, perhaps we should make an
+ # all_indices object in future?
+ pass
+ else:
+ self.repo._revision_pack_map[pack.revision_index] = (
+ pack.transport, pack.name)
+ self.repo._revision_all_indices.insert_index(0, pack.revision_index)
+ if self.repo._inv_all_indices is not None:
+ # inv 'knit' has been used : update it.
+ self.repo._inv_all_indices.insert_index(0,
+ pack.inventory_index)
+ if self.repo._text_all_indices is not None:
+ # text 'knits' have been used : update it.
+ self.repo._text_all_indices.insert_index(0,
+ pack.text_index)
+ if self.repo._signature_all_indices is not None:
+ # signatures 'knit' accessed : update it.
+ self.repo._signature_all_indices.insert_index(0,
+ pack.signature_index)
+
+ def all_pack_details(self):
+ """Return a list of all the packs as transport,name tuples.
+
+ :return: A list of (transport, name) tuples for all the packs in the
+ repository.
+ """
+ # XXX: fix me, should be direct rather than indirect
+ if self.repo._revision_all_indices is None:
+ # trigger creation of the all revision index.
+ self.repo._revision_store.get_revision_file(self.repo.get_transaction())
+ result = []
+ for index, transport_and_name in self.repo._revision_pack_map.iteritems():
+ result.append(transport_and_name)
+ return result
def autopack(self):
"""Pack the pack collection incrementally.
@@ -119,6 +178,88 @@
self._execute_pack_operations(pack_operations)
return True
+ def create_pack_from_packs(self, revision_index_map, inventory_index_map,
+ text_index_map, signature_index_map, revision_ids=None):
+ """Create a new pack by reading data from other packs.
+
+ This does little more than a bulk copy of data. One key difference
+ is that data with the same item key across multiple packs is elided
+ from the output. The new pack is written into the current pack store
+ along with its indices, and the name added to the pack names. The
+ source packs are not altered.
+
+ :param revision_index_map: A revision index map.
+ :param inventory_index_map: An inventory index map.
+ :param text_index_map: A text index map.
+ :param signature_index_map: A signature index map.
+ :param revision_ids: Either None, to copy all data, or a list
+ of revision_ids to limit the copied data to the data they
+ introduced.
+ :return: The number of revisions copied.
+ """
+ # open a pack - using the same name as the last temporary file
+ # - which has already been flushed, so it's safe.
+ # XXX: - duplicate code warning with start_write_group; fix before
+ # considering 'done'.
+ if getattr(self.repo, '_open_pack_tuple', None) is not None:
+ raise errors.BzrError('call to create_pack_from_packs while '
+ 'another pack is being written.')
+ random_name = self.repo.control_files._lock.nonce + '.autopack'
+ write_stream = self.repo._upload_transport.open_file_stream(random_name)
+ pack_hash = md5.new()
+ def write_data(bytes, update=pack_hash.update):
+ write_stream(bytes)
+ update(bytes)
+ writer = pack.ContainerWriter(write_data)
+ writer.begin()
+ # open new indices
+ revision_index = InMemoryGraphIndex(reference_lists=1)
+ inv_index = InMemoryGraphIndex(reference_lists=2)
+ text_index = InMemoryGraphIndex(reference_lists=2, key_elements=2)
+ signature_index = InMemoryGraphIndex(reference_lists=0)
+ # select revision keys
+ revision_nodes = self._index_contents(revision_index_map)
+ # copy revision keys and adjust values
+ self._copy_nodes_graph(revision_nodes, revision_index_map, writer, revision_index)
+ # select inventory keys
+ inv_nodes = self._index_contents(inventory_index_map)
+ # copy inventory keys and adjust values
+ self._copy_nodes_graph(inv_nodes, inventory_index_map, writer, inv_index)
+ # select text keys
+ text_nodes = self._index_contents(text_index_map)
+ # copy text keys and adjust values
+ self._copy_nodes_graph(text_nodes, text_index_map, writer, text_index)
+ # select signature keys
+ signature_nodes = self._index_contents(signature_index_map)
+ # copy signature keys and adjust values
+ self._copy_nodes(signature_nodes, signature_index_map, writer, signature_index)
+ # finish the pack
+ writer.end()
+ new_name = pack_hash.hexdigest()
+ # add to names
+ self.allocate(new_name)
+ # rename into place
+ self.repo._upload_transport.close_file_stream(random_name)
+ self.repo._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
+ # write indices
+ index_transport = self.repo._upload_transport.clone('../indices')
+ rev_index_name = self.repo._revision_store.name_to_revision_index_name(new_name)
+ index_transport.put_file(rev_index_name, revision_index.finish())
+ inv_index_name = self.repo._inv_thunk.name_to_inv_index_name(new_name)
+ index_transport.put_file(inv_index_name, inv_index.finish())
+ text_index_name = self.repo.weave_store.name_to_text_index_name(new_name)
+ index_transport.put_file(text_index_name, text_index.finish())
+ signature_index_name = self.repo._revision_store.name_to_signature_index_name(new_name)
+ index_transport.put_file(signature_index_name, signature_index.finish())
+ result = Pack()
+ result.revision_index = revision_index
+ result.inventory_index = inv_index
+ result.text_index = text_index
+ result.signature_index = signature_index
+ result.name = new_name
+ result.transport = self.repo._upload_transport.clone('../packs/')
+ return result
+
def _execute_pack_operations(self, pack_operations):
"""Execute a series of pack operations.
@@ -218,61 +359,16 @@
in use.
:return: None
"""
- # open new pack - using the same name as the last temporary file
- # - which has already been flushed, so its safe.
- # XXX: - duplicate code warning with start_write_group; fix before
- # considering 'done'.
- random_name = self.repo.control_files._lock.nonce + '.autopack'
- write_stream = self.repo._upload_transport.open_file_stream(random_name)
- pack_hash = md5.new()
- def write_data(bytes, update=pack_hash.update):
- write_stream(bytes)
- update(bytes)
- writer = pack.ContainerWriter(write_data)
- writer.begin()
- # open new indices
- revision_index = InMemoryGraphIndex(reference_lists=1)
- inv_index = InMemoryGraphIndex(reference_lists=2)
- text_index = InMemoryGraphIndex(reference_lists=2, key_elements=2)
- signature_index = InMemoryGraphIndex(reference_lists=0)
# select revision keys
revision_index_map = self._revision_index_map(pack_details)
- revision_nodes = self._index_contents(revision_index_map)
- # copy revision keys and adjust values
- self._copy_nodes_graph(revision_nodes, revision_index_map, writer, revision_index)
# select inventory keys
inv_index_map = self._inv_index_map(pack_details)
- inv_nodes = self._index_contents(inv_index_map)
- # copy inventory keys and adjust values
- self._copy_nodes_graph(inv_nodes, inv_index_map, writer, inv_index)
# select text keys
text_index_map = self._text_index_map(pack_details)
- text_nodes = self._index_contents(text_index_map)
- # copy text keys and adjust values
- self._copy_nodes_graph(text_nodes, text_index_map, writer, text_index)
# select signature keys
signature_index_map = self._signature_index_map(pack_details)
- signature_nodes = self._index_contents(signature_index_map)
- # copy signature keys and adjust values
- self._copy_nodes(signature_nodes, signature_index_map, writer, signature_index)
- # finish the pack
- writer.end()
- new_name = pack_hash.hexdigest()
- # add to names
- self.allocate(new_name)
- # rename into place
- self.repo._upload_transport.close_file_stream(random_name)
- self.repo._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
- # write indices
- index_transport = self.repo._upload_transport.clone('../indices')
- rev_index_name = self.repo._revision_store.name_to_revision_index_name(new_name)
- index_transport.put_file(rev_index_name, revision_index.finish())
- inv_index_name = self.repo._inv_thunk.name_to_inv_index_name(new_name)
- index_transport.put_file(inv_index_name, inv_index.finish())
- text_index_name = self.repo.weave_store.name_to_text_index_name(new_name)
- index_transport.put_file(text_index_name, text_index.finish())
- signature_index_name = self.repo._revision_store.name_to_signature_index_name(new_name)
- index_transport.put_file(signature_index_name, signature_index.finish())
+ self.create_pack_from_packs(revision_index_map, inv_index_map,
+ text_index_map, signature_index_map)
def _copy_nodes(self, nodes, index_map, writer, write_index):
# plan a readv on each source pack:
@@ -307,6 +403,8 @@
write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
def _copy_nodes_graph(self, nodes, index_map, writer, write_index):
+ # for record verification
+ knit_data = _KnitData(None)
# plan a readv on each source pack:
# group by pack
nodes = sorted(nodes)
@@ -335,6 +433,8 @@
for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
izip(reader.iter_records(), pack_readv_requests):
raw_data = read_func(None)
+ df, _ = knit_data._parse_record_header(key[-1], raw_data)
+ df.close()
pos, size = writer.add_bytes_record(raw_data, names)
write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
@@ -410,6 +510,7 @@
def reset(self):
self._names = None
+ self.packs = []
def _inv_index_map(self, pack_details):
"""Get a map of inv index -> packs for pack_details."""
@@ -986,6 +1087,7 @@
self._upload_transport.close_file_stream(self._open_pack_tuple[1])
self._upload_transport.rename(self._open_pack_tuple[1],
'../packs/' + new_name + '.pack')
+ self._open_pack_tuple = None
if not self._packs.autopack():
self._packs.save()
else:
@@ -1100,6 +1202,7 @@
self._upload_transport.close_file_stream(self._open_pack_tuple[1])
self._upload_transport.rename(self._open_pack_tuple[1],
'../packs/' + new_name + '.pack')
+ self._open_pack_tuple = None
if not self._packs.autopack():
self._packs.save()
else:
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py 2007-08-13 04:45:41 +0000
+++ b/bzrlib/repository.py 2007-08-14 07:18:04 +0000
@@ -1854,16 +1854,41 @@
@needs_write_lock
def fetch(self, revision_id=None, pb=None):
"""See InterRepository.fetch()."""
- from bzrlib.fetch import KnitRepoFetcher
mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
self.source, self.source._format, self.target, self.target._format)
# TODO: jam 20070210 This should be an assert, not a translate
revision_id = osutils.safe_revision_id(revision_id)
- f = KnitRepoFetcher(to_repository=self.target,
- from_repository=self.source,
- last_revision=revision_id,
- pb=pb)
- return f.count_copied, f.failed_revisions
+ self.count_copied = 0
+ if revision_id is None:
+ # nothing to do.
+ return
+ if _mod_revision.is_null(revision_id):
+ # TODO:
+ # everything to do - use pack logic
+ # to fetch from all packs to one without
+ # inventory parsing etc.
+ # till then:
+ revision_ids = self.source.all_revision_ids()
+ else:
+ try:
+ revision_ids = self.missing_revision_ids(revision_id)
+ except errors.NoSuchRevision:
+ raise errors.InstallFailed([revision_id])
+ packs = self.source._packs.all_pack_details()
+ revision_index_map = self.source._packs._revision_index_map(packs)
+ inventory_index_map = self.source._packs._inv_index_map(packs)
+ text_index_map = self.source._packs._text_index_map(packs)
+ signature_index_map = self.source._packs._signature_index_map(packs)
+ pack = self.target._packs.create_pack_from_packs(
+ revision_index_map,
+ inventory_index_map,
+ text_index_map,
+ signature_index_map,
+ revision_ids
+ )
+ self.target._packs.save()
+ self.target._packs.add_pack_to_memory(pack)
+ return pack.get_revision_count()
@needs_read_lock
def missing_revision_ids(self, revision_id=None):
@@ -2017,6 +2042,7 @@
InterRepository.register_optimiser(InterKnitRepo)
InterRepository.register_optimiser(InterModel1and2)
InterRepository.register_optimiser(InterKnit1and2)
+InterRepository.register_optimiser(InterPackRepo)
InterRepository.register_optimiser(InterRemoteRepository)
More information about the bazaar-commits
mailing list