[MERGE] Packs. Kthxbye.

Martin Pool mbp at sourcefrog.net
Fri Oct 19 04:11:13 BST 2007


ok, I've read the whole patch.  I could not spot the intentional
mistake, if any.  The most important things, which are pretty
important but need not block landing at least as an experimental
format, are already filed as bugs: having readers cope with pack
rearrangements, and avoiding ever-growing duplicated data.

=== added file 'bzrlib/repofmt/pack_repo.py'
--- bzrlib/repofmt/pack_repo.py	1970-01-01 00:00:00 +0000
+++ bzrlib/repofmt/pack_repo.py	2007-10-17 09:39:41 +0000
@@ -0,0 +1,1709 @@
+class Pack(object):
+    """An in memory proxy for a pack and its indices.

Odd to use 'indices' in some places and 'indexes' in others.  Either is ok,
indexes might be more obvious.  I dread having an object with .indices
and .indexes :)

+class ExistingPack(Pack):
+    """An in memory proxy for an exisiting .pack and its disk indices."""
+

Worth saying, slightly redundantly, that these are readonly?

+    def __init__(self, pack_transport, name, revision_index, inventory_index,
+        text_index, signature_index):
+        """Create an ExistingPack object.
+
+        :param pack_transport: The transport where the pack file resides.
+        :param name: The name of the pack on disk in the pack_transport.
+        """
+        Pack.__init__(self, revision_index, inventory_index, text_index,
+            signature_index)
+        self.name = name
+        self.pack_transport = pack_transport
+        assert None not in (revision_index, inventory_index, text_index,
+            signature_index, name, pack_transport)
+
+    def __eq__(self, other):
+        return self.__dict__ == other.__dict__
+
+    def __ne__(self, other):
+        return not self.__eq__(other)

I think on complex objects it's better to use an explicit method name that says
just what is being compared.  Otherwise, there is a tendency to
compare the whole dict
for ExistingPack, but just some attributes for others and a bug-prone situation
results.  If it's intended for use only in testing them spelling out the
method name is not too burdensome.


+
+    def __repr__(self):
+        return "<bzrlib.repofmt.pack_repo.Pack object at 0x%x, %s, %s" % (
+            id(self), self.transport, self.name)
+
+
+class NewPack(Pack):
+    """An in memory proxy for a pack which is being created."""
+
+    # A map of index 'type' to the file extension and position in the
+    # index_sizes array.
+    indices = {
+        'revision':('.rix', 0),
+        'inventory':('.iix', 1),
+        'text':('.tix', 2),
+        'signature':('.six', 3),
+        }

pep8 says or implies there should be a space after the colon, iirc.

This is not really a list of indexes - because it's also visible on instances I
think it's good to make that clear - maybe index_definitions or something.

The point of the numbers is not just that they index the array but they give
the order in the names list (?)

This list is effectively repeated several places; it should be combined.


+
+    def __init__(self, upload_transport, index_transport, pack_transport,
+        upload_suffix=''):
+        """Create a NewPack instance.
+
+        :param upload_transport: A writable transport for the pack to be
+            incrementally uploaded to.
+        :param index_transport: A writable transport for the pack's indices to
+            be written to when the pack is finished.
+        :param pack_transport: A writable transport for the pack to be renamed
+            to when the upload is complete. This *must* be the same as
+            upload_transport.clone('../packs').

Well, if it must be the same, remove the parameter and clone it yourself?  I
guess constructing the new transport would be a bit inefficient as the caller
already has them, and this is only called by RepositoryPackCollection.

+        :param upload_suffix: An optional suffix to be given to any temporary
+            files created during the pack creation. e.g '.autopack'
+        """
+        Pack.__init__(self,
+            # Revisions: parents list, no text compression.
+            InMemoryGraphIndex(reference_lists=1),
+            # Inventory: We want to map compression only, but currently the
+            # knit code hasn't been updated enough to understand that, so we
+            # have a regular 2-list index giving parents and compression
+            # source.
+            InMemoryGraphIndex(reference_lists=2),
+            # Texts: compression and per file graph, for all fileids - so two
+            # reference lists and two elements in the key tuple.
+            InMemoryGraphIndex(reference_lists=2, key_elements=2),
+            # Signatures: Just blobs to store, no compression, no parents
+            # listing.
+            InMemoryGraphIndex(reference_lists=0),
+            )
+        # where should the new pack be opened
+        self.upload_transport = upload_transport
+        # where are indices written out to
+        self.index_transport = index_transport
+        # where is the pack renamed to when it is finished?
+        self.pack_transport = pack_transport
+        # tracks the content written to the .pack file.
+        self._hash = md5.new()
+        # a four-tuple with the length in bytes of the indices, once the pack
+        # is finalised. (rev, inv, text, sigs)
+        self.index_sizes = None

This would be simpler if you just made index_sizes a dictionary keyed
by index_type.

+        # How much data to cache when writing packs. Note that this is not
+        # synchronised with reads, because its not in the transport layer, so
+        # is not safe unless the client knows it won't be reading from the pack
+        # under creation.

it's

+        self._cache_limit = 0
+        # the temporary pack file name.
+        self.random_name = rand_chars(20) + upload_suffix
+        # when was this pack started ?
+        self.start_time = time.time()
+        # open an output stream for the data added to the pack.
+        self.write_stream = self.upload_transport.open_write_stream(
+            self.random_name)
+        if 'fetch' in debug.debug_flags:
+            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
+                time.ctime(), self.upload_transport.base, self.random_name,
+                time.time() - self.start_time)
+        self._buffer = [[], 0]

can you comment _buffer please, you were doing so well :)

+        # create a callable for adding data
+        def _write_data(bytes, flush=False, _buffer=self._buffer,
+            _write=self.write_stream.write, _update=self._hash.update):
+            _buffer[0].append(bytes)
+            _buffer[1] += len(bytes)
+            # 1MB buffer cap
+            if _buffer[1] > self._cache_limit or flush:
+                bytes = ''.join(_buffer[0])
+                _write(bytes)
+                _update(bytes)
+                _buffer[:] = [[], 0]
+        # expose this on self, for the occasion when clients want to add data.
+        self._write_data = _write_data

Is there a reason this cannot just be a regular method?

It seems like the buffer would be better off as a separate object,
perhaps later.

+    def data_inserted(self):
+        """True if data has been added to this pack."""
+        return 0 != sum((self.get_revision_count(),
+            self.inventory_index.key_count(),
+            self.text_index.key_count(),
+            self.signature_index.key_count(),
+            ))

or write

   bool(x() or y() or z() ...)



+
+    def finish(self):
+        """Finish the new pack.
+
+        This:
+         - finalises the content
+         - assigns a name (the md5 of the content, currently)
+         - writes out the associated indices
+         - renames the pack into place.
+         - stores the index size tuple for the pack in the index_sizes
+           attribute.
+        """
+        self._writer.end()
+        if self._buffer[1]:
+            self._write_data('', flush=True)
+        self.name = self._hash.hexdigest()
+        # write indices
+        # XXX: should rename each index too rather than just uploading blind
+        # under the chosen name.

Well, you're using put_file which I think does upload and then rename.  Is more
than that needed?

+        self.index_sizes = [None, None, None, None]
+        self._write_index('revision', self.revision_index, 'revision')
+        self._write_index('inventory', self.inventory_index, 'inventory')
+        self._write_index('text', self.text_index, 'file texts')
+        self._write_index('signature', self.signature_index,
+            'revision signatures')
+        self.write_stream.close()
+        # Note that this will clobber an existing pack with the same name,
+        # without checking for hash collisions. While this is undesirable this
+        # is something that can be rectified in a subsequent release. One way
+        # to rectify it may be to leave the pack at the original name, writing
+        # its pack-names entry as something like 'HASH: index-sizes
+        # temporary-name'. Allocate that and check for collisions, if it is
+        # collision free then rename it into place. If clients know this scheme
+        # they can handle missing-file errors by:
+        #  - try for HASH.pack
+        #  - try for temporary-name
+        #  - refresh the pack-list to see if the pack is now absent
+        self.upload_transport.rename(self.random_name,
+                '../packs/' + self.name + '.pack')
+        self._state = 'finished'
+
+    def make_index(self, index_type):
+        """Construct a GraphIndex object for this packs index 'index_type'."""
+        setattr(self, index_type + '_index',
+            GraphIndex(self.index_transport,
+                self.index_name(index_type, self.name),
+                self.index_sizes[self.index_offset(index_type)]))

Would this be better called _load_index?  It does more than just make one.

Rather than setattr tricks, why not have

  self.indexes[index_type] = ....

not much longer and simplifies some other code too.

+
+    def index_name(self, index_type, name):
+        """Get the disk name of an index type for pack name 'name'."""
+        return name + NewPack.indices[index_type][0]
+
+    def index_offset(self, index_type):
+        """Get the position in a index_size array for a given index type."""
+        return NewPack.indices[index_type][1]

That docstring doesn't seem very helpful.

+
+    def set_write_cache_size(self, size):
+        self._cache_limit = size
+
+    def _write_index(self, index_type, index, label):
+        """Write out an index.
+
+        :param index: The index object to serialise.
+        :param index_offset: Where in self.index_sizes to remember this.
+        :param name_getter: What to use to get the name of the index on disk.
+        :param label: What label to give the index e.g. 'revision'.
+        """
+        index_name = self.index_name(index_type, self.name)
+        self.index_sizes[self.index_offset(index_type)] = \
+            self.index_transport.put_file(index_name, index.finish())
+        if 'fetch' in debug.debug_flags:
+            # XXX: size might be interesting?
+            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
+                time.ctime(), label, self.upload_transport.base,
+                self.random_name, time.time() - self.start_time)
+        # As we have no current protection against erroneous additional
+        # insertions, load the index from disk on further use. We should alter
+        # the index layer to make it's finish() error if add_node is
+        # subsequently used. RBC
+        self.make_index(index_type)

'its'

As 'label' is only used in the trace message I'd suggest it's simpler to just
omit the parameter and use 'index_type' instead.  Also the docstring is out of
date with the parameters, which I suppose comes from an earlier version that
necessitated label.

In all these places, -Dfetch does not seem like the appropriate flag; why
not -Dpack or -Dpackrepo?

+
+
+class AggregateIndex(object):
+    """An aggregated index for the RepositoryPackCollection.
+
+    AggregateIndex is reponsible for managing the PackAccess object,
+    Index-To-Pack mapping, and all indices list for a specific type of index
+    such as 'revision index'.
+    """

Can you explain more /why/ not /how/?


+
+    def __init__(self):
+        """Create an AggregateIndex."""
+        self.index_to_pack = {}
+        self.combined_index = CombinedGraphIndex([])
+        self.knit_access = _PackAccess(self.index_to_pack)

Comment that combined_index is the combined index of this type(?)
across all packs.

Should add_callback be set to None here, and documented?  Rather than
generically "add_callback" the name could say more what it does.
However I'm not sure a bound method reference is really the best thing to
use here: the callers seem like they would be happier getting a reference
to the writable index, rather than just the add callback.

+    def add_writable_index(self, index, pack):
+        """Add an index which is able to have data added to it.
+
+        :param index: An index from the pack parameter.
+        :param pack: A Pack instance.
+        """

doc that there can only be one at a time.

+        assert self.add_callback is None
+        # allow writing: queue writes to a new index
+        self.add_index(index, pack)
+        # Updates the index to packs mapping as a side effect,
+        self.knit_access.set_writer(pack._writer, index, pack.access_tuple())
+        self.add_callback = index.add_nodes
+
+    def clear(self):
+        """Reset all the aggregate data to nothing."""
+        self.knit_access.set_writer(None, None, (None, None))
+        self.index_to_pack.clear()
+        del self.combined_index._indices[:]
+        self.add_callback = None
+
+    def remove_index(self, index, pack):
+        """Remove index from the indices used to answer queries.
+
+        :param index: An index from the pack parameter.
+        :param pack: A Pack instance.
+        """
+        del self.index_to_pack[index]
+        self.combined_index._indices.remove(index)
+        if (self.add_callback is not None and
+            getattr(index, 'add_nodes', None) == self.add_callback):
+            self.add_callback = None
+            self.knit_access.set_writer(None, None, (None, None))

So this for example could be just self.writing_index and avoid the getattr, etc.

+
+
+class RepositoryPackCollection(object):
+    """Management of packs within a repository."""
+
+    def __init__(self, repo, transport, index_transport, upload_transport,
+                 pack_transport):
+        """Create a new RepositoryPackCollection.
+
+        :param transport: Addresses the repository base directory
+            (typically .bzr/repository/).
+        :param index_transport: Addresses the directory containing indexes.
+        :param upload_transport: Addresses the directory into which
packs are written
+            while they're being created.
+        :param pack_transport: Addresses the directory of existing
complete packs.
+        """
+        self.repo = repo
+        self.transport = transport
+        self._index_transport = index_transport
+        self._upload_transport = upload_transport
+        self._pack_transport = pack_transport
+        self._suffix_offsets = {'.rix':0, '.iix':1, '.tix':2, '.six':3}

This seems duplicative of the list of indexes above, and my same comment about
using just a dictionary would apply.

+        self.packs = []
+        # name:Pack mapping
+        self._packs = {}

having .packs and ._packs as different variables is poor.

+        # the previous pack-names content
+        self._packs_at_load = None

more docs - this is the packs that existed when this object was created - when
we first read the pack names list?

+        # when a pack is being created by this object, the state of that pack.
+        self._new_pack = None
+        # aggregated revision index data
+        self.revision_index = AggregateIndex()
+        self.inventory_index = AggregateIndex()
+        self.text_index = AggregateIndex()
+        self.signature_index = AggregateIndex()
+
+    def add_pack_to_memory(self, pack):
+        """Make a Pack object available to the repository to satisfy queries.
+
+        :param pack: A Pack object.
+        """
+        self.packs.append(pack)
+        assert pack.name not in self._packs
+        self._packs[pack.name] = pack
+        self.revision_index.add_index(pack.revision_index, pack)
+        self.inventory_index.add_index(pack.inventory_index, pack)
+        self.text_index.add_index(pack.text_index, pack)
+        self.signature_index.add_index(pack.signature_index, pack)

Precondition assertions should come first.

+
+    def _add_text_to_weave(self, file_id, revision_id, new_lines, parents,
+        nostore_sha, random_revid):
+        file_id_index = GraphIndexPrefixAdapter(
+            self.text_index.combined_index,
+            (file_id, ), 1,
+            add_nodes_callback=self.text_index.add_callback)
+        self.repo._text_knit._index._graph_index = file_id_index
+        self.repo._text_knit._index._add_callback = file_id_index.add_nodes
+        return self.repo._text_knit.add_lines_with_ghosts(
+            revision_id, parents, new_lines, nostore_sha=nostore_sha,
+            random_id=random_revid, check_content=False)[0:2]
+
+    def all_packs(self):
+        """Return a list of all the Pack objects this repository has.
+
+        Note that an in-progress pack being created is not returned.
+
+        :return: A list of Pack objects for all the packs in the repository.
+        """
+        result = []
+        for name in self.names():
+            result.append(self.get_pack_by_name(name))
+        return result
+
+    def autopack(self):
+        """Pack the pack collection incrementally.
+
+        This will not attempt global reorganisation or recompression,
+        rather it will just ensure that the total number of packs does
+        not grow without bound. It uses the _max_pack_count method to
+        determine if autopacking is needed, and the pack_distribution
+        method to determine the number of revisions in each pack.
+
+        If autopacking takes place then the packs name collection will have
+        been flushed to disk - packing requires updating the name collection
+        in synchronisation with certain steps. Otherwise the names collection
+        is not flushed.
+
+        :return: True if packing took place.
+        """
+        # XXX: Should not be needed when the management of indices is sane.
+        total_revisions = self.revision_index.combined_index.key_count()
+        total_packs = len(self._names)
+        if self._max_pack_count(total_revisions) >= total_packs:
+            return False
+        # XXX: the following may want to be a class, to pack with a given
+        # policy.
+        mutter('Auto-packing repository %s, which has %d pack files, '
+            'containing %d revisions into %d packs.', self, total_packs,
+            total_revisions, self._max_pack_count(total_revisions))
+        # determine which packs need changing
+        pack_distribution = self.pack_distribution(total_revisions)
+        existing_packs = []
+        for pack in self.all_packs():
+            revision_count = pack.get_revision_count()
+            if revision_count == 0:
+                # revision less packs are not generated by normal operation,
+                # only by operations like sign-my-commits, and thus will not
+                # tend to grow rapdily or without bound like commit containing
+                # packs do - leave them alone as packing them really should
+                # group their data with the relevant commit, and that may
+                # involve rewriting ancient history - which autopack tries to
+                # avoid. Alternatively we could not group the data but treat
+                # each of these as having a single revision, and thus add
+                # one revision for each to the total revision count, to get
+                # a matching distribution.
+                continue
+            existing_packs.append((revision_count, pack))
+        pack_operations = self.plan_autopack_combinations(
+            existing_packs, pack_distribution)
+        self._execute_pack_operations(pack_operations)
+        return True

I guess this should return False if no pack_operations were planned?

+
+    def create_pack_from_packs(self, packs, suffix, revision_ids=None):
+        """Create a new pack by reading data from other packs.
+
+        This does little more than a bulk copy of data. One key difference
+        is that data with the same item key across multiple packs is elided
+        from the output. The new pack is written into the current pack store
+        along with its indices, and the name added to the pack names. The
+        source packs are not altered and are not required to be in the current
+        pack collection.
+
+        :param packs: An iterable of Packs to combine.
+        :param revision_ids: Either None, to copy all data, or a list
+            of revision_ids to limit the copied data to the data they
+            introduced.
+        :return: A Pack object, or None if nothing was copied.
+        """
+        # open a pack - using the same name as the last temporary file
+        # - which has already been flushed, so its safe.
+        # XXX: - duplicate code warning with start_write_group; fix before
+        #      considering 'done'.
+        if self._new_pack is not None:
+            raise errors.BzrError('call to create_pack_from_packs while '
+                'another pack is being written.')
+        if revision_ids is not None and len(revision_ids) == 0:
+            # silly fetch request.
+            return None
+        new_pack = NewPack(self._upload_transport, self._index_transport,
+            self._pack_transport, upload_suffix=suffix)
+        # buffer data - we won't be reading-back during the pack creation and
+        # this makes a significant difference on sftp pushes.
+        new_pack.set_write_cache_size(1024*1024)
+        if 'fetch' in debug.debug_flags:
+            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base,
a_pack.name)
+                for a_pack in packs]
+            if revision_ids is not None:
+                rev_count = len(revision_ids)
+            else:
+                rev_count = 'all'
+            mutter('%s: create_pack: creating pack from source packs: '
+                '%s%s %s revisions wanted %s t=0',
+                time.ctime(), self._upload_transport.base,
new_pack.random_name,
+                plain_pack_list, rev_count)
+        # select revisions
+        if revision_ids:
+            revision_keys = [(revision_id,) for revision_id in revision_ids]
+        else:
+            revision_keys = None
+
+        # select revision keys
+        revision_index_map = self._packs_list_to_pack_map_and_index_list(
+            packs, 'revision_index')[0]
+        revision_nodes = self._index_contents(revision_index_map,
revision_keys)
+        # copy revision keys and adjust values
+        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
+            new_pack._writer, new_pack.revision_index))
+        if 'fetch' in debug.debug_flags:
+            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
+                time.ctime(), self._upload_transport.base,
new_pack.random_name,
+                new_pack.revision_index.key_count(),
+                time.time() - new_pack.start_time)
+        # select inventory keys
+        inv_keys = revision_keys # currently the same keyspace, and note that
+        # querying for keys here could introduce a bug where an inventory item
+        # is missed, so do not change it to query separately without cross
+        # checking like the text key check below.
+        inventory_index_map = self._packs_list_to_pack_map_and_index_list(
+            packs, 'inventory_index')[0]
+        inv_nodes = self._index_contents(inventory_index_map, inv_keys)
+        # copy inventory keys and adjust values
+        # XXX: Should be a helper function to allow different inv
representation
+        # at this point.
+        inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
+            new_pack._writer, new_pack.inventory_index, output_lines=True)
+        if revision_ids:
+            fileid_revisions =
self.repo._find_file_ids_from_xml_inventory_lines(
+                inv_lines, revision_ids)
+            text_filter = []
+            for fileid, file_revids in fileid_revisions.iteritems():
+                text_filter.extend(
+                    [(fileid, file_revid) for file_revid in file_revids])
+        else:
+            # eat the iterator to cause it to execute.

  ... which as a side effect copies the inventory data

+            list(inv_lines)
+            text_filter = None

The mostly-repeated code for copying each type of object could be split out.


+        if 'fetch' in debug.debug_flags:
+            mutter('%s: create_pack: inventories copied: %s%s %d
items t+%6.3fs',
+                time.ctime(), self._upload_transport.base,
new_pack.random_name,
+                new_pack.inventory_index.key_count(),
+                time.time() - new_pack.start_time)
+        # select text keys
+        text_index_map = self._packs_list_to_pack_map_and_index_list(
+            packs, 'text_index')[0]
+        text_nodes = self._index_contents(text_index_map, text_filter)
+        if text_filter is not None:
+            # We could return the keys copied as part of the return value from
+            # _copy_nodes_graph but this doesn't work all that well with the
+            # need to get line output too, so we check separately, and as we're
+            # going to buffer everything anyway, we check beforehand, which
+            # saves reading knit data over the wire when we know there are
+            # mising records.
+            text_nodes = set(text_nodes)
+            present_text_keys = set(_node[1] for _node in text_nodes)
+            missing_text_keys = set(text_filter) - present_text_keys
+            if missing_text_keys:
+                # TODO: raise a specific error that can handle many missing
+                # keys.
+                a_missing_key = missing_text_keys.pop()
+                raise errors.RevisionNotPresent(a_missing_key[1],
+                    a_missing_key[0])
+        # copy text keys and adjust values
+        list(self._copy_nodes_graph(text_nodes, text_index_map,
+            new_pack._writer, new_pack.text_index))
+        if 'fetch' in debug.debug_flags:
+            mutter('%s: create_pack: file texts copied: %s%s %d items
t+%6.3fs',
+                time.ctime(), self._upload_transport.base,
new_pack.random_name,
+                new_pack.text_index.key_count(),
+                time.time() - new_pack.start_time)
+        # select signature keys
+        signature_filter = revision_keys # same keyspace
+        signature_index_map = self._packs_list_to_pack_map_and_index_list(
+            packs, 'signature_index')[0]
+        signature_nodes = self._index_contents(signature_index_map,
+            signature_filter)
+        # copy signature keys and adjust values
+        self._copy_nodes(signature_nodes, signature_index_map,
new_pack._writer,
+            new_pack.signature_index)
+        if 'fetch' in debug.debug_flags:
+            mutter('%s: create_pack: revision signatures copied: %s%s
%d items t+%6.3fs',
+                time.ctime(), self._upload_transport.base,
new_pack.random_name,
+                new_pack.signature_index.key_count(),
+                time.time() - new_pack.start_time)
+        if not new_pack.data_inserted():
+            new_pack.abort()
+            return None
+        new_pack.finish()
+        self.allocate(new_pack)
+        if 'fetch' in debug.debug_flags:
+            # XXX: size might be interesting?
+            mutter('%s: create_pack: pack renamed into place:
%s%s->%s%s t+%6.3fs',
+                time.ctime(), self._upload_transport.base,
new_pack.random_name,
+                new_pack.pack_transport, new_pack.name,
+                time.time() - new_pack.start_time)
+        if 'fetch' in debug.debug_flags:
+            # XXX: size might be interesting?
+            mutter('%s: create_pack: finished: %s%s t+%6.3fs',
+                time.ctime(), self._upload_transport.base,
new_pack.random_name,
+                time.time() - new_pack.start_time)
+        return new_pack

+    def _parse_index_sizes(self, value):
+        """Parse a string of index sizes."""
+        return tuple([int(digits) for digits in value.split(' ')])

To change index_sizes in memory to a dict, this would need to be something like

  l = map(int, value.split(' '))
  r = {}
  for (index_type, (extension, position)) in NewPack.indicies.items():
    r[index_type] = l[position]
  return r

which is not too complex and makes the in memory representation better.

+    def remove_pack_from_memory(self, pack):
+        """Remove pack from the packs accessed by this repository.
+
+        Only affects memory state, until self._save_pack_names() is invoked.
+        """
+        self._names.pop(pack.name)
+        self._packs.pop(pack.name)
+        self._remove_pack_indices(pack)

Should be private?

Just del would be clearer than pop as you don't want the result.

+    def _make_index_map(self, index_suffix):
+        """Return information on existing indexes.
+
+        :param suffix: Index suffix added to pack name.
+
+        :returns: (pack_map, indices) where indices is a list of GraphIndex
+        objects, and pack_map is a mapping from those objects to the
+        pack tuple they describe.
+        """
+        # TODO: stop using this; it creates new indices unnecessarily.
+        self.ensure_loaded()
+        suffix_map = {'.rix':'revision_index',
+            '.six':'signature_index',
+            '.iix':'inventory_index',
+            '.tix':'text_index',
+        }

The pack definitions are duplicated (triplicated?) again.

+    def release_names(self):
+        """Release the mutex around the pack-names index."""
+        self.repo.control_files.unlock()

Should be unlock_names to match.

+    def _start_write_group(self):
+        # Do not permit preparation for writing if we're not in a 'write lock'.
+        if not self.repo.is_write_locked():
+            raise errors.NotWriteLocked(self)
+        self._new_pack = NewPack(self._upload_transport, self._index_transport,
+            self._pack_transport, upload_suffix='.pack')
+        # allow writing: queue writes to a new index
+        self.revision_index.add_writable_index(self._new_pack.revision_index,
+            self._new_pack)
+        self.inventory_index.add_writable_index(self._new_pack.inventory_index,
+            self._new_pack)
+        self.text_index.add_writable_index(self._new_pack.text_index,
+            self._new_pack)
+        self.signature_index.add_writable_index(self._new_pack.signature_index,
+            self._new_pack)
+
+        # reused revision and signature knits may need updating
+        if self.repo._revision_knit is not None:
+            self.repo._revision_knit._index._add_callback = \
+                self.revision_index.add_callback
+        if self.repo._signature_knit is not None:
+            self.repo._signature_knit._index._add_callback = \
+                self.signature_index.add_callback
+        # create a reused knit object for text addition in commit.
+        self.repo._text_knit = self.repo.weave_store.get_weave_or_empty(
+            'all-texts', None)

I don't understand this.  Why only those two knits?

+class GraphKnitRevisionStore(KnitRevisionStore):
+    """An object to adapt access from RevisionStore's to use GraphKnits.
+
+    This should not live through to production: by production time we should
+    have fully integrated the new indexing and have new data for the
+    repository classes; also we may choose not to do a Knit1 compatible
+    new repository, just a Knit3 one. If neither of these happen, this
+    should definately be cleaned up before merging.

Is this true?  I'm not sure what 'new' means here.

+
+    This class works by replacing the original RevisionStore.
+    We need to do this because the GraphKnitRevisionStore is less
+    isolated in its layering - it uses services from the repo.
+    """
+
+    def __init__(self, repo, transport, revisionstore):
+        """Create a GraphKnitRevisionStore on repo with revisionstore.
+
+        This will store its state in the Repository, use the
+        indices FileNames to provide a KnitGraphIndex,
+        and at the end of transactions write new indices.
+        """

FileNames?

+class GraphKnitTextStore(VersionedFileStore):
+    """An object to adapt access from VersionedFileStore's to use GraphKnits.

no apostrophe

The docstring might be clearer as

  Presents a TextStore abstraction on top of packs.

if that's correct.

+
+    This should not live through to production: by production time we should
+    have fully integrated the new indexing and have new data for the
+    repository classes; also we may choose not to do a Knit1 compatible
+    new repository, just a Knit3 one. If neither of these happen, this
+    should definately be cleaned up before merging.


again

+    get_weave = get_weave_or_empty
+
+    def __iter__(self):
+        """Generate a list of the fileids inserted, for use by check."""

/a list of file_ids in this repository/

+        self.repo._packs.ensure_loaded()
+        ids = set()
+        for index, key, value, refs in \
+            self.repo._packs.text_index.combined_index.iter_all_entries():
+            ids.add(key[0])
+        return iter(ids)
+
+
+class InventoryKnitThunk(object):
+    """An object to manage thunking get_inventory_weave to pack based knits."""
+
+    def __init__(self, repo, transport):
+        """Create an InventoryKnitThunk for repo at transport.
+
+        This will store its state in the Repository, use the
+        indices FileNames to provide a KnitGraphIndex,
+        and at the end of transactions write a new index..
+        """
+        self.repo = repo
+        self.transport = transport
+
+    def get_weave(self):
+        """Get a 'Knit' that contains inventory data."""
+        self.repo._packs.ensure_loaded()
+        add_callback = self.repo._packs.inventory_index.add_callback
+        # setup knit specific objects
+        knit_index = KnitGraphIndex(
+            self.repo._packs.inventory_index.combined_index,
+            add_callback=add_callback, deltas=True, parents=True)
+        return knit.KnitVersionedFile(
+            'inventory', self.transport.clone('..'),
+            self.repo.control_files._file_mode,
+            create=False, access_mode=self.repo._access_mode(),
+            index=knit_index, delta=True, factory=knit.KnitPlainFactory(),
+            access_method=self.repo._packs.inventory_index.knit_access)
+
+
+class GraphKnitRepository(KnitRepository):
+    """Experimental graph-knit using repository."""

(comment) I've probably done this too, but it's better not to say
'experimental' in docstrings or class names because they can hang around after
the code's considered mature.

=== modified file 'bzrlib/bzrdir.py'
--- bzrlib/bzrdir.py	2007-09-21 07:29:37 +0000
+++ bzrlib/bzrdir.py	2007-10-03 06:37:01 +0000
@@ -2478,4 +2478,24 @@
     tree_format='bzrlib.workingtree.WorkingTreeFormat4',
     hidden=True,
     )
+format_registry.register_metadir('experimental',
+    'bzrlib.repofmt.pack_repo.RepositoryFormatGraphKnit1',
+    help='New in XXX: Experimental format with data compatible with dirstate '
+        'format repositories. Cannot be read except with bzr.dev. '
+        'WARNING: This format is unstable and data in it will not be
upgradable'
+        ' to release formats of bzr.',
+    branch_format='bzrlib.branch.BzrBranchFormat6',
+    tree_format='bzrlib.workingtree.WorkingTreeFormat4',
+    hidden=True,
+    )
+format_registry.register_metadir('experimental-subtree',
+    'bzrlib.repofmt.pack_repo.RepositoryFormatGraphKnit3',
+    help='New in XXX: Experimental format with data compatible with '
+        'dirstate-with-subtree format repositories. Cannot be read except with'
+        ' bzr.dev. WARNING: This format is unstable and data in it will not be'
+        ' upgradable to release formats of bzr.',
+    branch_format='bzrlib.branch.BzrBranchFormat6',
+    tree_format='bzrlib.workingtree.WorkingTreeFormat4',
+    hidden=True,
+    )
 format_registry.set_default('dirstate-tags')


(comment) we really need a better exposure in the ui of the different
components; a 'dirstate format repository' meaning a 'repository in the
versioned used when dirstate came out' is confusing.

I wonder then if rather than this eventually being called --format=packs
it should be just 0.92 and 0.92-subtree.

Obviously the text needs to be updated when this is finally
merged, as well as getting new names.




=== modified file 'bzrlib/index.py'
--- bzrlib/index.py	2007-10-15 07:56:04 +0000
+++ bzrlib/index.py	2007-10-17 09:39:41 +0000
@@ -267,6 +267,17 @@
         self._nodes_by_key = None
         self._size = size

+    def __eq__(self, other):
+        """Equal when self and otherwere created with the same parameters."""
+        return (
+            type(self) == type(other) and
+            self._transport == other._transport and
+            self._name == other._name and
+            self._size == other._size)
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
     def _buffer_all(self):
         """Buffer all the index data.

I think providing __eq__ on objects that are not value objects is not a good
idea, because it's unclear what is or isn't compared.  This is not checking all
the members of the class, so I would prefer instead renaming it to e.g.
has_same_location() as we did for Repository(?).

Also as far as I can see this is not tested.




=== modified file 'bzrlib/knit.py'
--- bzrlib/knit.py	2007-10-12 05:26:46 +0000
+++ bzrlib/knit.py	2007-10-17 09:39:41 +0000
@@ -1950,7 +1950,8 @@

     def set_writer(self, writer, index, (transport, packname)):
         """Set a writer to use for adding data."""
-        self.indices[index] = (transport, packname)
+        if index is not None:
+            self.indices[index] = (transport, packname)
         self.container_writer = writer
         self.write_index = index


It looks like this is being done so that you can pass None, None, (None, None)
to disable writing to this object.  If so, you should document it.
But maybe it'd
be clearer and safer to add a different method remove_writer() instead.


=== modified file 'bzrlib/reconcile.py'

+
+class PackReconciler(RepoReconciler):
+    """Reconciler that reconciles a pack based repository.
+
+    Garbage inventories do not affect ancestry queries, and removal is
+    considerably more expensive as there is no separate versioned file for
+    them, so they are not cleaned. In short it is currently a no-op.
+
+    In future this may be a good place to hook in annotation cache checking,
+    index recreation etc.
+
+    XXX: The index corruption that _fix_text_parents performs is needed for
+    packs, but not yet implemented. The basic approach is to:
+     - lock the names list
+     - perform a customised pack() that regenerates data as needed
+     - unlock the names list
+    """

You should add a bug number here. s/corruption/inconsistency/ because the index
is fine in itself.
=== modified file 'bzrlib/repository.py'
--- bzrlib/repository.py	2007-10-16 02:42:33 +0000
+++ bzrlib/repository.py	2007-10-17 09:39:41 +0000
@@ -501,6 +501,7 @@

         returns the sha1 of the serialized inventory.
         """
+        assert self.is_in_write_group()
         _mod_revision.check_not_reserved_id(revision_id)
         assert inv.revision_id is None or inv.revision_id == revision_id, \
             "Mismatch between inventory revision" \
@@ -765,6 +766,11 @@
         raise NotImplementedError(self.get_data_stream)

     def insert_data_stream(self, stream):
+        """XXX What does this really do?
+
+        Is it a substitute for fetch?
+        Should it manage its own write group ?
+        """
         for item_key, bytes in stream:
             if item_key[0] == 'file':
                 (file_id,) = item_key[1:]

I think XXX: is better in a comment, not in the docstring as this is not very
helpful to people reading about the api through pydoc.

This particular method was added I believe by Andrew, so you should ask him to
document it (or file a bug assigned to him.)

+    def fileids_altered_by_revision_ids(self, revision_ids):
+        """Find the file ids and versions affected by revisions.
+
+        :param revisions: an iterable containing revision ids.
+        :return: a dictionary mapping altered file-ids to an iterable of
+        revision_ids. Each altered file-ids has the exact revision_ids that
+        altered it listed explicitly.
+        """
+        assert self._serializer.support_altered_by_hack, \
+            ("fileids_altered_by_revision_ids only supported for branches "
+             "which store inventory as unnested xml, not on %r" % self)
+        selected_revision_ids = set(revision_ids)
+        w = self.get_inventory_weave()

(query) And not only that, but it also needs them to be able to do
iter_lines_added_or_present_in_versions, which is not supported by
the base VersionedFile class.  I guess this is moot because
the current code works for all formats and you get a decent error
message anyhow.

+    @needs_write_lock
+    def fetch(self, revision_id=None, pb=None):
+        """See InterRepository.fetch()."""
+        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
+               self.source, self.source._format, self.target,
self.target._format)
+        # TODO: jam 20070210 This should be an assert, not a translate
+        revision_id = osutils.safe_revision_id(revision_id)

The safe_revision_id calls have now been removed; you should remove
this one too.

+    @needs_read_lock
+    def missing_revision_ids(self, revision_id=None):
+        """See InterRepository.missing_revision_ids()."""
+        if revision_id is not None:
+            source_ids = self.source.get_ancestry(revision_id)
+            assert source_ids[0] is None
+            source_ids.pop(0)
+        else:
+            source_ids = self.source.all_revision_ids()
+        source_ids_set = set(source_ids)
+        # source_ids is the worst possible case we may need to pull.
+        # now we want to filter source_ids against what we actually
+        # have in target, but don't try to check for existence where we know
+        # we do not have a revision as that would be pointless.
+        target_ids = set(self.target.all_revision_ids())
+        actually_present_revisions = target_ids.intersection(source_ids_set)
+        required_revisions =
source_ids_set.difference(actually_present_revisions)
+        required_topo_revisions = [rev_id for rev_id in source_ids if
rev_id in required_revisions]
+        return required_topo_revisions
+
+

I think you can avoid the intermediate sets and just say

   return [r for r in source_ids if (r not in target_ids)]


 class InterModel1and2(InterRepository):

     @classmethod
@@ -2306,10 +2443,15 @@
         """Be compatible with Knit1 source and Knit3 target"""
         from bzrlib.repofmt.knitrepo import RepositoryFormatKnit3
         try:
-            from bzrlib.repofmt.knitrepo import RepositoryFormatKnit1, \
-                    RepositoryFormatKnit3
-            return (isinstance(source._format, (RepositoryFormatKnit1)) and
-                    isinstance(target._format, (RepositoryFormatKnit3)))
+            from bzrlib.repofmt.knitrepo import (RepositoryFormatKnit1,
+                RepositoryFormatKnit3)
+            from bzrlib.repofmt.pack_repo import (RepositoryFormatGraphKnit1,
+                RepositoryFormatGraphKnit3)
+            return (isinstance(source._format, RepositoryFormatKnit1) and
+                    isinstance(target._format, RepositoryFormatKnit3) or
+                    isinstance(source._format,
(RepositoryFormatGraphKnit1)) and
+                    isinstance(target._format, (RepositoryFormatGraphKnit3))
+                    )
         except AttributeError:
             return False

To answer Ian's question, I think this code is here because the InterModel1and2
fetcher does the insertion of root ids when moving from one format to another?
A comment might help.

So, what will happen if you try to fetch from a knit1 into a pack3?

This doesn't seem to be directly tested and maybe it should be.

=== modified file 'bzrlib/tests/repository_implementations/test_repository.py'
--- bzrlib/tests/repository_implementations/test_repository.py	2007-10-12
08:18:54 +0000
+++ bzrlib/tests/repository_implementations/test_repository.py	2007-10-17
09:39:41 +0000
@@ -61,6 +61,8 @@
         bzrdirb = self.make_bzrdir('b')
         repo_b = tree_a.branch.repository.clone(bzrdirb)
         tree_b = repo_b.revision_tree('rev1')
+        tree_b.lock_read()
+        self.addCleanup(tree_b.unlock)
         tree_b.get_file_text('file1')
         rev1 = repo_b.get_revision('rev1')

@@ -421,7 +423,20 @@
             self.assertContainsRe(str(e), 'get_data_stream')
             raise TestSkipped('This format does not support streaming.')

-        dest_repo.insert_data_stream(stream)
+        dest_repo.lock_write()
+        try:
+            dest_repo.start_write_group()
+            try:
+                dest_repo.insert_data_stream(stream)
+            except:
+                dest_repo.abort_write_group()
+                raise
+            else:
+                dest_repo.commit_write_group()
+        finally:
+            dest_repo.unlock()
+        # reopen to be sure it was added.
+        dest_repo = dest_repo.bzrdir.open_repository()
         self.assertTrue(dest_repo.has_revision('rev_id'))

(comment) The amount of clutter to do these in test cases is
unfortunate, compared
to maybe

  thing.lock_and_start_write()
  try:
    ...
    thing.commit_unlock()
  finally:
    thing.abort_if_unfinished()



+    def test_concurrent_writers_merge_new_packs(self):
+        format = self.get_format()
+        self.make_repository('.', shared=True, format=format)
+        r1 = repository.Repository.open('.')
+        r2 = repository.Repository.open('.')
+        r1.lock_write()
+        try:
+            # access enough data to load the names list
+            list(r1.all_revision_ids())
+            r2.lock_write()
+            try:
+                # access enough data to load the names list
+                list(r2.all_revision_ids())
+                r1.start_write_group()
+                try:
+                    r2.start_write_group()
+                    try:
+                        self._add_text(r1, 'fileidr1')
+                        self._add_text(r2, 'fileidr2')
+                    except:
+                        r2.abort_write_group()
+                        raise
+                except:
+                    r1.abort_write_group()
+                    raise
+                # both r1 and r2 have open write groups with data in them
+                # created while the other's write group was open.
+                # Commit both which requires a merge to the pack-names.
+                try:
+                    r1.commit_write_group()
+                except:
+                    r2.abort_write_group()
+                    raise
+                r2.commit_write_group()
+                # tell r1 to reload from disk
+                r1._packs.reset()
+                # Now both repositories should now about both names
+                r1._packs.ensure_loaded()
+                r2._packs.ensure_loaded()
+                self.assertEqual(r1._packs.names(), r2._packs.names())
+                self.assertEqual(2, len(r1._packs.names()))
+            finally:
+                r1.unlock()
+        finally:
+            r2.unlock()

The finally blocks are mismatched.  It might be simpler just to do
add_cleanup().

(comment) Maybe unlock() should implicitly abort a pending write group (does it
error at the moment?)

+
+    def test_concurrent_writer_second_preserves_dropping_a_pack(self):
+        format = self.get_format()
+        self.make_repository('.', shared=True, format=format)
+        r1 = repository.Repository.open('.')
+        r2 = repository.Repository.open('.')
+        # add a pack to drop
+        r1.lock_write()
+        try:
+            r1.start_write_group()
+            try:
+                self._add_text(r1, 'fileidr1')
+            except:
+                r1.abort_write_group()
+                raise
+            else:
+                r1.commit_write_group()
+            r1._packs.ensure_loaded()
+            name_to_drop = r1._packs.all_packs()[0].name
+        finally:
+            r1.unlock()
+        r1.lock_write()
+        try:
+            # access enough data to load the names list
+            list(r1.all_revision_ids())
+            r2.lock_write()
+            try:
+                # access enough data to load the names list
+                list(r2.all_revision_ids())
+                r1._packs.ensure_loaded()
+                try:
+                    r2.start_write_group()
+                    try:
+                        # in r1, drop the pack
+                        r1._packs.remove_pack_from_memory(
+                            r1._packs.get_pack_by_name(name_to_drop))
+                        # in r2, add a pack
+                        self._add_text(r2, 'fileidr2')
+                    except:
+                        r2.abort_write_group()
+                        raise
+                except:
+                    r1._packs.reset()
+                    raise
+                # r1 has a changed names list, and r2 an open write groups with
+                # changes.
+                # save r1, and then commit the r2 write group, which requires a
+                # merge to the pack-names, which should not reinstate
+                # name_to_drop
+                try:
+                    r1._packs._save_pack_names()
+                    r1._packs.reset()
+                except:
+                    r2.abort_write_group()
+                    raise
+                try:
+                    r2.commit_write_group()
+                except:
+                    r2.abort_write_group()
+                    raise
+                # Now both repositories should now about just one name.
+                r1._packs.ensure_loaded()
+                r2._packs.ensure_loaded()
+                self.assertEqual(r1._packs.names(), r2._packs.names())
+                self.assertEqual(1, len(r1._packs.names()))
+                self.assertFalse(name_to_drop in r1._packs.names())
+            finally:
+                r1.unlock()
+        finally:
+            r2.unlock()

Wrong order here too.

+class TestPack(TestCaseWithTransport):
+    """Tests for the Pack object."""
+
+    def assertCurrentlyEqual(self, left, right):
+        self.assertTrue(left == right)
+        self.assertTrue(right == left)
+        self.assertFalse(left != right)
+        self.assertFalse(right != left)
+
+    def assertCurrentlyNotEqual(self, left, right):
+        self.assertFalse(left == right)
+        self.assertFalse(right == left)
+        self.assertTrue(left != right)
+        self.assertTrue(right != left)

'Currently' is an odd name, maybe 'thoroughly'?  But really, are these
comparisons used other than in this test?

+
+    def test___eq____ne__(self):
+        left = pack_repo.ExistingPack('', '', '', '', '', '')
+        right = pack_repo.ExistingPack('', '', '', '', '', '')
+        self.assertCurrentlyEqual(left, right)
+        # change all attributes and ensure equality changes as we do.
+        left.revision_index = 'a'
+        self.assertCurrentlyNotEqual(left, right)
+        right.revision_index = 'a'
+        self.assertCurrentlyEqual(left, right)
+        left.inventory_index = 'a'
+        self.assertCurrentlyNotEqual(left, right)
+        right.inventory_index = 'a'
+        self.assertCurrentlyEqual(left, right)
+        left.text_index = 'a'
+        self.assertCurrentlyNotEqual(left, right)
+        right.text_index = 'a'
+        self.assertCurrentlyEqual(left, right)
+        left.signature_index = 'a'
+        self.assertCurrentlyNotEqual(left, right)
+        right.signature_index = 'a'
+        self.assertCurrentlyEqual(left, right)
+        left.name = 'a'
+        self.assertCurrentlyNotEqual(left, right)
+        right.name = 'a'
+        self.assertCurrentlyEqual(left, right)
+        left.transport = 'a'
+        self.assertCurrentlyNotEqual(left, right)
+        right.transport = 'a'
+        self.assertCurrentlyEqual(left, right)
+
+    def test_file_name(self):
+        pack = pack_repo.ExistingPack('', 'a_name', '', '', '', '')
+        self.assertEqual('a_name.pack', pack.file_name())
+
+
+class TestNewPack(TestCaseWithTransport):
+    """Tests for pack_repo.NewPack."""
+
+    def test_new_instance_attributes(self):
+        upload_transport = self.get_transport('upload')
+        pack_transport = self.get_transport('pack')
+        index_transport = self.get_transport('index')
+        upload_transport.mkdir('.')
+        pack = pack_repo.NewPack(upload_transport, index_transport,
+            pack_transport)
+        self.assertIsInstance(pack.revision_index, InMemoryGraphIndex)
+        self.assertIsInstance(pack.inventory_index, InMemoryGraphIndex)
+        self.assertIsInstance(pack._hash, type(md5.new()))
+        self.assertTrue(pack.upload_transport is upload_transport)
+        self.assertTrue(pack.index_transport is index_transport)
+        self.assertTrue(pack.pack_transport is pack_transport)
+        self.assertEqual(None, pack.index_sizes)
+        self.assertEqual(20, len(pack.random_name))
+        self.assertIsInstance(pack.random_name, str)
+        self.assertIsInstance(pack.start_time, float)



More information about the bazaar mailing list