Rev 2710: Start of autopacking logic. in http://people.ubuntu.com/~robertc/baz2.0/repository
Robert Collins
robertc at robertcollins.net
Fri Aug 10 07:32:17 BST 2007
At http://people.ubuntu.com/~robertc/baz2.0/repository
------------------------------------------------------------
revno: 2710
revision-id: robertc at robertcollins.net-20070810063212-a25zyg08g7ws7qxw
parent: robertc at robertcollins.net-20070808225838-lsunduwb1okzrexn
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Fri 2007-08-10 16:32:12 +1000
message:
Start of autopacking logic.
modified:
bzrlib/repofmt/knitrepo.py knitrepo.py-20070206081537-pyy4a00xdas0j4pf-1
bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
doc/developers/repository.txt repository.txt-20070709152006-xkhlek456eclha4u-1
=== modified file 'bzrlib/repofmt/knitrepo.py'
--- a/bzrlib/repofmt/knitrepo.py 2007-08-08 22:58:38 +0000
+++ b/bzrlib/repofmt/knitrepo.py 2007-08-10 06:32:12 +0000
@@ -16,6 +16,8 @@
from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
+from itertools import izip
+import math
import md5
from bzrlib import (
@@ -291,12 +293,235 @@
return result
-class RepositoryDataNames(object):
+class RepositoryPackCollection(object):
def __init__(self, repo, transport):
self.repo = repo
self.transport = transport
+ def autopack(self):
+ """Pack the pack collection incrementally.
+
+ This will not attempt global reorganisation or recompression,
+ rather it will just ensure that the total number of packs does
+ not grow without bound. It uses the _max_pack_count method to
+ determine if autopacking is needed, and the pack_distribution
+ method to determine the number of revisions in each pack.
+
+ If autopacking takes place then the packs name collection will have
+ been flushed to disk - packing requires updating the name collection
+ in synchronisation with certain steps. Otherwise the names collection
+ is not flushed.
+
+ :return: True if packing took place.
+ """
+ if self.repo._revision_all_indices is None:
+ # trigger creation of the all revision index.
+ self.repo._revision_store.get_revision_file(self.repo.get_transaction())
+ total_revisions = len(list(self.repo._revision_all_indices.iter_all_entries()))
+ # key_count() would be much more efficient
+ total_packs = len(self._names)
+ if self._max_pack_count(total_revisions) >= total_packs:
+ return False
+ # XXX: the following may want to be a class, to pack with a given
+ # policy.
+ mutter('Packing repository %s, which has %d pack files, '
+ 'containing %d revisions into %d packs.', self, total_packs,
+ total_revisions, self._max_pack_count(total_revisions))
+ # determine which packs need changing
+ pack_distribution = self.pack_distribution(total_revisions)
+ existing_packs = []
+ for index, transport_and_name in self.repo._revision_pack_map.iteritems():
+ if index is None:
+ continue
+ revision_count = len(list(index.iter_all_entries()))
+ if revision_count == 0:
+ # revision less packs are not generated by normal operation,
+ # only by operations like sign-my-commits, and thus will not
+ # tend to grow rapdily or without bound like commit containing
+ # packs do - leave them alone as packing them really should
+ # group their data with the relevant commit, and that may
+ # involve rewriting ancient history - which autopack tries to
+ # avoid. Alternatively we could not group the data but treat
+ # each of these as having a single revision, and thus add
+ # one revision for each to the total revision count, to get
+ # a matching distribution.
+ continue
+ existing_packs.append((revision_count, transport_and_name))
+ existing_packs.sort(reverse=True)
+ pack_operations = [[0, []]]
+ # plan out what packs to keep, and what to reorganise
+ while len(existing_packs):
+ # take the largest pack, and if its less than the head of the
+ # distribution chart we will include its contents in the new pack for
+ # that position. If its larger, we remove its size from the
+ # distribution chart
+ next_pack_rev_count, next_pack_details = existing_packs.pop(0)
+ if next_pack_rev_count >= pack_distribution[0]:
+ # don't shrink packs, we want to aggregate them
+ #raise NotImplementedError
+ return False
+ else:
+ # add the revisions we're going to add to the next output pack
+ pack_operations[-1][0] += next_pack_rev_count
+ # allocate this pack to the next pack sub operation
+ pack_operations[-1][1].append(next_pack_details)
+ if pack_operations[-1][0] >= pack_distribution[0]:
+ # this pack is used up, shift left.
+ del pack_distribution[0]
+ pack_operations.append([0, []])
+
+ for revision_count, pack_details in pack_operations:
+ if revision_count == 0:
+ continue
+ # have a progress bar?
+ self._combine_packs(pack_details)
+ for pack_detail in pack_details:
+ self._remove_pack_name(pack_detail[1])
+
+ # record the newly available packs and stop advertising the old
+ # packs
+ self.save()
+
+ # move the old packs out of the way
+ for revision_count, pack_details in pack_operations:
+ self._obsolete_packs(pack_details)
+
+ return True
+
+ def _combine_packs(self, pack_details):
+ """Combine the data from the packs listed in pack_details.
+
+ This does little more than a bulk copy of data. One key difference
+ is that data with the same item key across multiple packs is elided
+ from the output. The new pack is written into the current pack store
+ along with its indices, and the name added to the pack names. The
+ source packs are not altered.
+
+ :param pack_details: A list of tuples with the transport and pack name
+ in use.
+ :return: None
+ """
+ # open new pack - using the same name as the last temporary file
+ # - which has already been flushed, so its safe.
+ # XXX: - duplicate code warning with start_write_group; fix before
+ # considering 'done'.
+ random_name = self.repo.control_files._lock.nonce + '.autopack'
+ write_stream = self.repo._upload_transport.open_file_stream(random_name)
+ pack_hash = md5.new()
+ def write_data(bytes, update=pack_hash.update):
+ write_stream(bytes)
+ update(bytes)
+ writer = pack.ContainerWriter(write_data)
+ writer.begin()
+ # open new indices
+ revision_index = InMemoryGraphIndex(reference_lists=1)
+ inv_index = InMemoryGraphIndex(reference_lists=2)
+ text_index = InMemoryGraphIndex(reference_lists=2, key_elements=2)
+ signature_index = InMemoryGraphIndex(reference_lists=0)
+ # select revision keys
+ revision_index_map = self._revision_index_map(pack_details)
+ revision_nodes = self._index_contents(revision_index_map)
+ # copy revision keys and adjust values
+ self._copy_nodes_graph(revision_nodes, revision_index_map, writer, revision_index)
+ # select inventory keys
+ inv_index_map = self._inv_index_map(pack_details)
+ inv_nodes = self._index_contents(inv_index_map)
+ # copy inventory keys and adjust values
+ self._copy_nodes_graph(inv_nodes, inv_index_map, writer, inv_index)
+ # select text keys
+ text_index_map = self._text_index_map(pack_details)
+ text_nodes = self._index_contents(text_index_map)
+ # copy text keys and adjust values
+ self._copy_nodes_graph(text_nodes, text_index_map, writer, text_index)
+ # select signature keys
+ signature_index_map = self._signature_index_map(pack_details)
+ signature_nodes = self._index_contents(signature_index_map)
+ # copy signature keys and adjust values
+ self._copy_nodes(signature_nodes, signature_index_map, writer, signature_index)
+ # finish the pack
+ writer.end()
+ new_name = pack_hash.hexdigest()
+ # add to names
+ self.allocate(new_name)
+ # rename into place
+ self.repo._upload_transport.close_file_stream(random_name)
+ self.repo._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
+ # write indices
+ index_transport = self.repo._upload_transport.clone('../indices')
+ rev_index_name = self.repo._revision_store.name_to_revision_index_name(new_name)
+ index_transport.put_file(rev_index_name, revision_index.finish())
+ inv_index_name = self.repo._inv_thunk.name_to_inv_index_name(new_name)
+ index_transport.put_file(inv_index_name, inv_index.finish())
+ text_index_name = self.repo.weave_store.name_to_text_index_name(new_name)
+ index_transport.put_file(text_index_name, text_index.finish())
+ signature_index_name = self.repo._revision_store.name_to_signature_index_name(new_name)
+ index_transport.put_file(signature_index_name, signature_index.finish())
+
+ def _copy_nodes(self, nodes, index_map, writer, write_index):
+ # plan a readv on each source pack:
+ # group by pack
+ nodes = sorted(nodes)
+ # how to map this into knit.py - or knit.py into this?
+ # we don't want the typical knit logic, we want grouping by pack
+ # at this point - perhaps a helper library for the following code
+ # duplication points?
+ request_groups = {}
+ for index, key, value in nodes:
+ if index not in request_groups:
+ request_groups[index] = []
+ request_groups[index].append((key, value))
+ for index, items in request_groups.iteritems():
+ pack_readv_requests = []
+ for key, value in items:
+ # ---- KnitGraphIndex.get_position
+ bits = value[1:].split(' ')
+ offset, length = int(bits[0]), int(bits[1])
+ pack_readv_requests.append((offset, length, (key, value[0])))
+ # linear scan up the pack
+ pack_readv_requests.sort()
+ # copy the data
+ transport, path = index_map[index]
+ reader = pack.make_readv_reader(transport, path,
+ [offset[0:2] for offset in pack_readv_requests])
+ for (names, read_func), (_1, _2, (key, eol_flag)) in \
+ izip(reader.iter_records(), pack_readv_requests):
+ raw_data = read_func(None)
+ pos, size = writer.add_bytes_record(raw_data, names)
+ write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
+
+ def _copy_nodes_graph(self, nodes, index_map, writer, write_index):
+ # plan a readv on each source pack:
+ # group by pack
+ nodes = sorted(nodes)
+ # how to map this into knit.py - or knit.py into this?
+ # we don't want the typical knit logic, we want grouping by pack
+ # at this point - perhaps a helper library for the following code
+ # duplication points?
+ request_groups = {}
+ for index, key, value, references in nodes:
+ if index not in request_groups:
+ request_groups[index] = []
+ request_groups[index].append((key, value, references))
+ for index, items in request_groups.iteritems():
+ pack_readv_requests = []
+ for key, value, references in items:
+ # ---- KnitGraphIndex.get_position
+ bits = value[1:].split(' ')
+ offset, length = int(bits[0]), int(bits[1])
+ pack_readv_requests.append((offset, length, (key, value[0], references)))
+ # linear scan up the pack
+ pack_readv_requests.sort()
+ # copy the data
+ transport, path = index_map[index]
+ reader = pack.make_readv_reader(transport, path,
+ [offset[0:2] for offset in pack_readv_requests])
+ for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
+ izip(reader.iter_records(), pack_readv_requests):
+ raw_data = read_func(None)
+ pos, size = writer.add_bytes_record(raw_data, names)
+ write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
+
def ensure_loaded(self):
if self._names is None:
self._names = set(node[1][0] for node in
@@ -307,13 +532,131 @@
raise errors.DuplicateKey(name)
self._names.add(name)
+ def _max_pack_count(self, total_revisions):
+ """Return the maximum number of packs to use for total revisions.
+
+ :param total_revisions: The total number of revisions in the
+ repository.
+ """
+ if not total_revisions:
+ return 1
+ digits = str(total_revisions)
+ result = 0
+ for digit in digits:
+ result += int(digit)
+ return result
+
def names(self):
"""Provide an order to the underlying names."""
return sorted(self._names)
+ def _obsolete_packs(self, pack_details):
+ """Move a number of packs which have been obsoleted out of the way.
+
+ Each pack and its associated indices are moved out of the way.
+
+ Note: for correctness this function should only be called after a new
+ pack names index has been written without these pack names, and with
+ the names of packs that contain the data previously available via these
+ packs.
+
+ :param pack_details: The transport, name tuples for the packs.
+ :param return: None.
+ """
+ for pack_detail in pack_details:
+ pack_detail[0].rename(pack_detail[1],
+ '../obsolete_packs/' + pack_detail[1])
+ basename = pack_detail[1][:-4]
+ index_transport = pack_detail[0].clone('../indices')
+ for suffix in ('iix', 'six', 'tix', 'rix'):
+ index_transport.rename(basename + suffix,
+ '../obsolete_packs/' + basename + suffix)
+
+ def pack_distribution(self, total_revisions):
+ """Generate a list of the number of revisions to put in each pack.
+
+ :param total_revisions: The total number of revisions in the
+ repository.
+ """
+ if total_revisions == 0:
+ return [0]
+ digits = reversed(str(total_revisions))
+ result = []
+ for exponent, count in enumerate(digits):
+ size = 10 ** exponent
+ for pos in range(int(count)):
+ result.append(size)
+ return list(reversed(result))
+
+ def _remove_pack_name(self, name):
+ # strip .pack
+ self._names.remove(name[:-5])
+
def reset(self):
self._names = None
+ def _inv_index_map(self, pack_details):
+ """Get a map of inv index -> packs for pack_details."""
+ # the simplest thing for now is to create new index objects.
+ # this should really reuse the existing index objects for these
+ # packs - this means making the way they are managed in the repo be
+ # more sane.
+ indices = {}
+ for transport, name in pack_details:
+ index_name = name[:-5]
+ index_name = self.repo._inv_thunk.name_to_inv_index_name(index_name)
+ indices[GraphIndex(transport.clone('../indices'), index_name)] = \
+ (transport, name)
+ return indices
+
+ def _revision_index_map(self, pack_details):
+ """Get a map of revision index -> packs for pack_details."""
+ # the simplest thing for now is to create new index objects.
+ # this should really reuse the existing index objects for these
+ # packs - this means making the way they are managed in the repo be
+ # more sane.
+ indices = {}
+ for transport, name in pack_details:
+ index_name = name[:-5]
+ index_name = self.repo._revision_store.name_to_revision_index_name(index_name)
+ indices[GraphIndex(transport.clone('../indices'), index_name)] = \
+ (transport, name)
+ return indices
+
+ def _signature_index_map(self, pack_details):
+ """Get a map of signature index -> packs for pack_details."""
+ # the simplest thing for now is to create new index objects.
+ # this should really reuse the existing index objects for these
+ # packs - this means making the way they are managed in the repo be
+ # more sane.
+ indices = {}
+ for transport, name in pack_details:
+ index_name = name[:-5]
+ index_name = self.repo._revision_store.name_to_signature_index_name(index_name)
+ indices[GraphIndex(transport.clone('../indices'), index_name)] = \
+ (transport, name)
+ return indices
+
+ def _text_index_map(self, pack_details):
+ """Get a map of text index -> packs for pack_details."""
+ # the simplest thing for now is to create new index objects.
+ # this should really reuse the existing index objects for these
+ # packs - this means making the way they are managed in the repo be
+ # more sane.
+ indices = {}
+ for transport, name in pack_details:
+ index_name = name[:-5]
+ index_name = self.repo.weave_store.name_to_text_index_name(index_name)
+ indices[GraphIndex(transport.clone('../indices'), index_name)] = \
+ (transport, name)
+ return indices
+
+ def _index_contents(self, pack_map):
+ """Get an iterable of the index contents from a pack_map."""
+ indices = [index for index in pack_map.iterkeys()]
+ all_index = CombinedGraphIndex(indices)
+ return all_index.iter_all_entries()
+
def save(self):
builder = GraphIndexBuilder()
for name in self._names:
@@ -356,18 +699,11 @@
"""Get the revision versioned file object."""
if getattr(self.repo, '_revision_knit', None) is not None:
return self.repo._revision_knit
- indices = []
- self.repo._data_names.ensure_loaded()
- pack_map = {}
- for name in self.repo._data_names.names():
- # TODO: maybe this should expose size to us to allow
- # sorting of the indices for better performance ?
- index_name = self.name_to_revision_index_name(name)
- indices.append(GraphIndex(self.transport, index_name))
- pack_map[indices[-1]] = (self.repo._pack_tuple(name))
+ self.repo._packs.ensure_loaded()
+ pack_map, indices = self._make_rev_pack_map()
if self.repo.is_in_write_group():
# allow writing: queue writes to a new index
- indices.append(self.repo._revision_write_index)
+ indices.insert(0, self.repo._revision_write_index)
pack_map[self.repo._revision_write_index] = self.repo._open_pack_tuple
writer = self.repo._open_pack_writer, self.repo._revision_write_index
add_callback = self.repo._revision_write_index.add_nodes
@@ -378,6 +714,7 @@
knit_index = KnitGraphIndex(self.repo._revision_all_indices,
add_callback=add_callback)
knit_access = _PackAccess(pack_map, writer)
+ self.repo._revision_pack_map = pack_map
self.repo._revision_knit_access = knit_access
self.repo._revision_knit = knit.KnitVersionedFile(
'revisions', self.transport.clone('..'),
@@ -387,14 +724,25 @@
access_method=knit_access)
return self.repo._revision_knit
+ def _make_rev_pack_map(self):
+ indices = []
+ pack_map = {}
+ for name in self.repo._packs.names():
+ # TODO: maybe this should expose size to us to allow
+ # sorting of the indices for better performance ?
+ index_name = self.name_to_revision_index_name(name)
+ indices.append(GraphIndex(self.transport, index_name))
+ pack_map[indices[-1]] = (self.repo._pack_tuple(name))
+ return pack_map, indices
+
def get_signature_file(self, transaction):
"""Get the signature versioned file object."""
if getattr(self.repo, '_signature_knit', None) is not None:
return self.repo._signature_knit
indices = []
- self.repo._data_names.ensure_loaded()
+ self.repo._packs.ensure_loaded()
pack_map = {}
- for name in self.repo._data_names.names():
+ for name in self.repo._packs.names():
# TODO: maybe this should expose size to us to allow
# sorting of the indices for better performance ?
index_name = self.name_to_signature_index_name(name)
@@ -402,7 +750,7 @@
pack_map[indices[-1]] = (self.repo._pack_tuple(name))
if self.repo.is_in_write_group():
# allow writing: queue writes to a new index
- indices.append(self.repo._signature_write_index)
+ indices.insert(0, self.repo._signature_write_index)
pack_map[self.repo._signature_write_index] = self.repo._open_pack_tuple
writer = self.repo._open_pack_writer, self.repo._signature_write_index
add_callback = self.repo._signature_write_index.add_nodes
@@ -438,16 +786,24 @@
new_index_name = self.name_to_revision_index_name(new_name)
self.transport.put_file(new_index_name,
self.repo._revision_write_index.finish())
- self.repo._revision_write_index = None
- if self.repo._revision_all_indices is not None:
+ if self.repo._revision_all_indices is None:
+ # create a pack map for the autopack code - XXX finish
+ # making a clear managed list of packs, indices and use
+ # that in these mapping classes
+ self.repo._revision_pack_map = self._make_rev_pack_map()[0]
+ else:
+ del self.repo._revision_pack_map[self.repo._revision_write_index]
+ self.repo._revision_write_index = None
+ new_index = GraphIndex(self.transport, new_index_name)
+ self.repo._revision_pack_map[new_index] = (self.repo._pack_tuple(new_name))
# revisions 'knit' accessed : update it.
- self.repo._revision_all_indices.insert_index(0,
- GraphIndex(self.transport, new_index_name))
+ self.repo._revision_all_indices.insert_index(0, new_index)
# remove the write buffering index. XXX: API break
# - clearly we need a remove_index call too.
- del self.repo._revision_all_indices._indices[-1]
+ del self.repo._revision_all_indices._indices[1]
# reset the knit access writer
self.repo._revision_knit_access.set_writer(None, None, (None, None))
+
# write a signatures index (might be empty)
new_index_name = self.name_to_signature_index_name(new_name)
self.transport.put_file(new_index_name,
@@ -459,7 +815,7 @@
GraphIndex(self.transport, new_index_name))
# remove the write buffering index. XXX: API break
# - clearly we need a remove_index call too.
- del self.repo._signature_all_indices._indices[-1]
+ del self.repo._signature_all_indices._indices[1]
# reset the knit access writer
self.repo._signature_knit_access.set_writer(None, None, (None, None))
@@ -545,9 +901,9 @@
if getattr(self.repo, '_text_all_indices', None) is not None:
return
indices = []
- self.repo._data_names.ensure_loaded()
+ self.repo._packs.ensure_loaded()
self.repo._text_pack_map = {}
- for name in self.repo._data_names.names():
+ for name in self.repo._packs.names():
# TODO: maybe this should expose size to us to allow
# sorting of the indices for better performance ?
index_name = self.name_to_text_index_name(name)
@@ -555,7 +911,7 @@
self.repo._text_pack_map[indices[-1]] = (self.repo._pack_tuple(name))
if self.repo.is_in_write_group():
# allow writing: queue writes to a new index
- indices.append(self.repo._text_write_index)
+ indices.insert(0, self.repo._text_write_index)
self.repo._text_all_indices = CombinedGraphIndex(indices)
def flush(self, new_name):
@@ -573,7 +929,7 @@
GraphIndex(self.transport, new_index_name))
# remove the write buffering index. XXX: API break
# - clearly we need a remove_index call too.
- del self.repo._text_all_indices._indices[-1]
+ del self.repo._text_all_indices._indices[1]
def get_weave_or_empty(self, file_id, transaction):
"""Get a 'Knit' backed by the .tix indices.
@@ -659,9 +1015,9 @@
if getattr(self.repo, '_inv_all_indices', None) is not None:
return
indices = []
- self.repo._data_names.ensure_loaded()
+ self.repo._packs.ensure_loaded()
pack_map = {}
- for name in self.repo._data_names.names():
+ for name in self.repo._packs.names():
# TODO: maybe this should expose size to us to allow
# sorting of the indices for better performance ?
index_name = self.name_to_inv_index_name(name)
@@ -688,7 +1044,7 @@
GraphIndex(self.transport, new_index_name))
# remove the write buffering index. XXX: API break
# - clearly we need a remove_index call too.
- del self.repo._inv_all_indices._indices[-1]
+ del self.repo._inv_all_indices._indices[1]
self.repo._inv_knit_access.set_writer(None, None, (None, None))
self.repo._inv_pack_map = None
@@ -751,7 +1107,7 @@
KnitRepository.__init__(self, _format, a_bzrdir, control_files,
_revision_store, control_store, text_store)
index_transport = control_files._transport.clone('indices')
- self._data_names = RepositoryDataNames(self, index_transport)
+ self._packs = RepositoryPackCollection(self, index_transport)
self._revision_store = GraphKnitRevisionStore(self, index_transport, self._revision_store)
self.weave_store = GraphKnitTextStore(self, index_transport, self.weave_store)
self._inv_thunk = InventoryKnitThunk(self, index_transport)
@@ -764,7 +1120,7 @@
self.weave_store.reset()
self._inv_thunk.reset()
# forget what names there are
- self._data_names.reset()
+ self._packs.reset()
self._open_pack_hash = None
def _pack_tuple(self, name):
@@ -777,7 +1133,7 @@
self.weave_store.reset()
self._inv_thunk.reset()
# forget what names there are
- self._data_names.reset()
+ self._packs.reset()
def _start_write_group(self):
random_name = self.control_files._lock.nonce
@@ -789,7 +1145,7 @@
self._open_pack_hash.update(bytes)
self._open_pack_writer = pack.ContainerWriter(write_data)
self._open_pack_writer.begin()
- self._data_names.setup()
+ self._packs.setup()
self._revision_store.setup()
self.weave_store.setup()
self._inv_thunk.setup()
@@ -807,14 +1163,15 @@
# - the existing name is not the actual hash - e.g.
# its a deliberate attack or data corruption has
# occuring during the write of that file.
- self._data_names.allocate(new_name)
+ self._packs.allocate(new_name)
self.weave_store.flush(new_name)
self._inv_thunk.flush(new_name)
self._revision_store.flush(new_name)
self._upload_transport.close_file_stream(self._open_pack_tuple[1])
self._upload_transport.rename(self._open_pack_tuple[1],
'../packs/' + new_name + '.pack')
- self._data_names.save()
+ if not self._packs.autopack():
+ self._packs.save()
else:
# can the pending upload
self._upload_transport.delete(self._open_pack_tuple[1])
@@ -823,7 +1180,7 @@
self._inv_thunk.reset()
# forget what names there are - should just refresh and deal with the
# delta.
- self._data_names.reset()
+ self._packs.reset()
self._open_pack_hash = None
def get_inventory_weave(self):
@@ -855,7 +1212,7 @@
KnitRepository3.__init__(self, _format, a_bzrdir, control_files,
_revision_store, control_store, text_store)
index_transport = a_bzrdir.get_repository_transport(None).clone('indices')
- self._data_names = RepositoryDataNames(self, index_transport)
+ self._packs = RepositoryPackCollection(self, index_transport)
self._revision_store = GraphKnitRevisionStore(self, index_transport, self._revision_store)
self.weave_store = GraphKnitTextStore(self, index_transport, self.weave_store)
self._inv_thunk = InventoryKnitThunk(self, index_transport)
@@ -868,7 +1225,7 @@
self.weave_store.reset()
self._inv_thunk.reset()
# forget what names there are
- self._data_names.reset()
+ self._packs.reset()
self._open_pack_hash = None
def _pack_tuple(self, name):
@@ -881,7 +1238,7 @@
self.weave_store.reset()
self._inv_thunk.reset()
# forget what names there are
- self._data_names.reset()
+ self._packs.reset()
def _start_write_group(self):
random_name = self.control_files._lock.nonce
@@ -893,7 +1250,7 @@
self._open_pack_hash.update(bytes)
self._open_pack_writer = pack.ContainerWriter(write_data)
self._open_pack_writer.begin()
- self._data_names.setup()
+ self._packs.setup()
self._revision_store.setup()
self.weave_store.setup()
self._inv_thunk.setup()
@@ -911,14 +1268,15 @@
# - the existing name is not the actual hash - e.g.
# its a deliberate attack or data corruption has
# occuring during the write of that file.
- self._data_names.allocate(new_name)
+ self._packs.allocate(new_name)
self.weave_store.flush(new_name)
self._inv_thunk.flush(new_name)
self._revision_store.flush(new_name)
self._upload_transport.close_file_stream(self._open_pack_tuple[1])
self._upload_transport.rename(self._open_pack_tuple[1],
'../packs/' + new_name + '.pack')
- self._data_names.save()
+ if not self._packs.autopack():
+ self._packs.save()
else:
# can the pending upload
self._upload_transport.delete(self._open_pack_tuple[1])
@@ -927,7 +1285,7 @@
self._inv_thunk.reset()
# forget what names there are - should just refresh and deal with the
# delta.
- self._data_names.reset()
+ self._packs.reset()
self._open_pack_hash = None
def get_inventory_weave(self):
@@ -1195,6 +1553,7 @@
mutter('changing to GraphKnit1 repository in %s.', a_bzrdir.transport.base)
repo_transport = a_bzrdir.get_repository_transport(None)
repo_transport.mkdir('indices')
+ repo_transport.mkdir('obsolete_packs')
repo_transport.mkdir('packs')
repo_transport.mkdir('upload')
repo_transport.rmdir('knits')
=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py 2007-08-08 22:58:38 +0000
+++ b/bzrlib/tests/test_repository.py 2007-08-10 06:32:12 +0000
@@ -652,6 +652,128 @@
self.assertEqual([],
list(GraphIndex(trans.clone('indices'), 'index').iter_all_entries()))
+ def test_commit_across_pack_shape_boundary_autopacks(self):
+ format = self.get_format()
+ tree = self.make_branch_and_tree('.', format=format)
+ trans = tree.branch.repository.bzrdir.get_repository_transport(None)
+ # This test could be a little cheaper by replacing the packs
+ # attribute on the repository to allow a different pack distribution
+ # and max packs policy - so we are hecking the policy is honoured
+ # in the test. But for now 11 commits is not a big deal in a single
+ # test.
+ for x in range(9):
+ tree.commit('commit %s' % x)
+ # there should be 9 packs:
+ index = GraphIndex(trans.clone('indices'), 'index')
+ self.assertEqual(9, len(list(index.iter_all_entries())))
+ # committing one more should coalesce to 1 of 10.
+ tree.commit('commit triggering pack')
+ index = GraphIndex(trans.clone('indices'), 'index')
+ self.assertEqual(1, len(list(index.iter_all_entries())))
+ # packing should not damage data
+ tree = tree.bzrdir.open_workingtree()
+ check_result = tree.branch.repository.check(
+ [tree.branch.last_revision()])
+ # XXX: Todo check packs obsoleted correctly - old packs and indices
+ # in the obsolete_packs directory.
+ large_pack_name = list(index.iter_all_entries())[0][1][0]
+ # finally, committing again should not touch the large pack.
+ tree.commit('commit not triggering pack')
+ index = GraphIndex(trans.clone('indices'), 'index')
+ self.assertEqual(2, len(list(index.iter_all_entries())))
+ pack_names = [node[1][0] for node in index.iter_all_entries()]
+ self.assertTrue(large_pack_name in pack_names)
+
+# TESTS TO WRITE:
+# XXX: signatures must be preserved. add a test.
+# XXX: packs w/o revisions are ignored by autopack
+# XXX: packs w/o revisions are packed by explicit pack
+# XXX: packs bigger than the planned distribution chart
+# are skipped over by autopack, and their revision
+# counts removed from the large end of the distribution
+# chart.
+
+
+
+class TestRepositoryPackCollection(TestCaseWithTransport):
+
+ def get_format(self):
+ return bzrdir.format_registry.make_bzrdir('experimental')
+
+ def test__max_pack_count(self):
+ """The maximum pack count is geared from the number of revisions."""
+ format = self.get_format()
+ repo = self.make_repository('.', format=format)
+ packs = repo._packs
+ # no revisions - one pack, so that we can have a revision free repo
+ # without it blowing up
+ self.assertEqual(1, packs._max_pack_count(0))
+ # after that the sum of the digits, - check the first 1-9
+ self.assertEqual(1, packs._max_pack_count(1))
+ self.assertEqual(2, packs._max_pack_count(2))
+ self.assertEqual(3, packs._max_pack_count(3))
+ self.assertEqual(4, packs._max_pack_count(4))
+ self.assertEqual(5, packs._max_pack_count(5))
+ self.assertEqual(6, packs._max_pack_count(6))
+ self.assertEqual(7, packs._max_pack_count(7))
+ self.assertEqual(8, packs._max_pack_count(8))
+ self.assertEqual(9, packs._max_pack_count(9))
+ # check the boundary cases with two digits for the next decade
+ self.assertEqual(1, packs._max_pack_count(10))
+ self.assertEqual(2, packs._max_pack_count(11))
+ self.assertEqual(10, packs._max_pack_count(19))
+ self.assertEqual(2, packs._max_pack_count(20))
+ self.assertEqual(3, packs._max_pack_count(21))
+ # check some arbitrary big numbers
+ self.assertEqual(25, packs._max_pack_count(112894))
+
+ def test_pack_distribution_zero(self):
+ format = self.get_format()
+ repo = self.make_repository('.', format=format)
+ packs = repo._packs
+ self.assertEqual([0], packs.pack_distribution(0))
+
+ def test_pack_distribution_one_to_nine(self):
+ format = self.get_format()
+ repo = self.make_repository('.', format=format)
+ packs = repo._packs
+ self.assertEqual([1],
+ packs.pack_distribution(1))
+ self.assertEqual([1, 1],
+ packs.pack_distribution(2))
+ self.assertEqual([1, 1, 1],
+ packs.pack_distribution(3))
+ self.assertEqual([1, 1, 1, 1],
+ packs.pack_distribution(4))
+ self.assertEqual([1, 1, 1, 1, 1],
+ packs.pack_distribution(5))
+ self.assertEqual([1, 1, 1, 1, 1, 1],
+ packs.pack_distribution(6))
+ self.assertEqual([1, 1, 1, 1, 1, 1, 1],
+ packs.pack_distribution(7))
+ self.assertEqual([1, 1, 1, 1, 1, 1, 1, 1],
+ packs.pack_distribution(8))
+ self.assertEqual([1, 1, 1, 1, 1, 1, 1, 1, 1],
+ packs.pack_distribution(9))
+
+ def test_pack_distribution_stable_at_boundaries(self):
+ """When there are multi-rev packs the counts are stable."""
+ format = self.get_format()
+ repo = self.make_repository('.', format=format)
+ packs = repo._packs
+ # in 10s:
+ self.assertEqual([10], packs.pack_distribution(10))
+ self.assertEqual([10, 1], packs.pack_distribution(11))
+ self.assertEqual([10, 10], packs.pack_distribution(20))
+ self.assertEqual([10, 10, 1], packs.pack_distribution(21))
+ # 100s
+ self.assertEqual([100], packs.pack_distribution(100))
+ self.assertEqual([100, 1], packs.pack_distribution(101))
+ self.assertEqual([100, 10, 1], packs.pack_distribution(111))
+ self.assertEqual([100, 100], packs.pack_distribution(200))
+ self.assertEqual([100, 100, 1], packs.pack_distribution(201))
+ self.assertEqual([100, 100, 10, 1], packs.pack_distribution(211))
+
class TestExperimentalSubtrees(TestExperimentalNoSubtrees):
=== modified file 'doc/developers/repository.txt'
--- a/doc/developers/repository.txt 2007-08-01 00:20:37 +0000
+++ b/doc/developers/repository.txt 2007-08-10 06:32:12 +0000
@@ -271,7 +271,9 @@
interesting to have it be deterministic based on content, but there are no
specific problems we have solved by doing that, and doing so would require
hashing the full file. OTOH hashing the full file is a cheap way to detect
-bit-errors in transfer (such as windows corruption).
+bit-errors in transfer (such as windows corruption). Non-reused file names
+are required for data integrity, as clients having read an index will
+readv at arbitrary times later.
Discovery of files
~~~~~~~~~~~~~~~~~~
@@ -298,6 +300,49 @@
Choosing compression/delta support
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Sketched notes on packs
+~~~~~~~~~~~~~~~~~~~~~~~
+
+ fast pull:
+ - query and pull revs
+ - query and pull references
+ - query and pull further references
+ - etc
+ - for now:
+ - query and pull revs and sigs
+ - query and pull inventory with cache on
+ - query and pull all of texts
+
+ pack:
+ - two modes - complete and incremental
+ - complete:
+ - grab all revisions, regenerate deltas, etc.
+ - may want data grouping by recreation not by
+ origin. i.e. 'tip' pack has everything to construct
+ revision -1, older packs have data that is not needed
+ for current revisions.
+
+ - incremental:
+ - leave most data untouched most of the time
+ - therefore can't really promote old data to be current.
+ - possible approach:
+ upper number of packs = sum of digits in the commit count
+ so we need to combine packs whenever a commit rolls over a power
+ of 10 - at most once every 10 commit, possibly much less.
+ exponential backoff of commits per pack file. So the number of
+ revisions per pack is log10(revisions in repository).
+ up to 10 commits - 1 revision per pack
+ up to 100 commits - 2 revisions per pack
+ 10 commits - 1 packs, 100 commits - 2 packs
+
+ - count number of 1-revision packs, if 10 group them
+ into one larger pack.
+ - count number of 10-unit packs. if 10, group those
+ into one larger pack.
+ - etc
+ this approach reads 10 entire packs every 10 commits,
+ 100 every 100, 1000 every 1000
+
Caching and writeing of data
============================
More information about the bazaar-commits
mailing list