Rev 2686: Nuke per-fileid indices for a single unified index. in http://people.ubuntu.com/~robertc/baz2.0/repository

Robert Collins robertc at robertcollins.net
Mon Jul 30 06:07:05 BST 2007


At http://people.ubuntu.com/~robertc/baz2.0/repository

------------------------------------------------------------
revno: 2686
revision-id: robertc at robertcollins.net-20070730050702-lrw04l6d2qokqlf6
parent: robertc at robertcollins.net-20070730050531-637oil76rrd60udq
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Mon 2007-07-30 15:07:02 +1000
message:
  Nuke per-fileid indices for a single unified index.
modified:
  bzrlib/repofmt/knitrepo.py     knitrepo.py-20070206081537-pyy4a00xdas0j4pf-1
  bzrlib/tests/repository_implementations/test_repository.py test_repository.py-20060131092128-ad07f494f5c9d26c
  bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
=== modified file 'bzrlib/repofmt/knitrepo.py'
--- a/bzrlib/repofmt/knitrepo.py	2007-07-26 04:35:56 +0000
+++ b/bzrlib/repofmt/knitrepo.py	2007-07-30 05:07:02 +0000
@@ -19,7 +19,12 @@
 from bzrlib import (
         file_names,
         )
-from bzrlib.index import InMemoryGraphIndex, GraphIndex, CombinedGraphIndex
+from bzrlib.index import (
+    InMemoryGraphIndex,
+    GraphIndex,
+    CombinedGraphIndex,
+    GraphIndexPrefixAdapter,
+    )
 from bzrlib.knit import KnitGraphIndex
 from bzrlib.store import revision
 """)
@@ -282,6 +287,37 @@
         return result
 
 
+class RepositoryDataNames(object):
+
+    def __init__(self, repo, transport):
+        self.repo = repo
+        self.transport = transport
+
+    def ensure_loaded(self):
+        if self._names is None:
+            self._names = file_names.FileNames(self.transport, 'index')
+            self._names.load()
+
+    def allocate(self):
+        return self._names.allocate()
+
+    def names(self):
+        """Provide order to the underlying names."""
+        def _cmp(x, y): return cmp(int(x), int(y))
+        return sorted(self._names.names(), cmp=_cmp, reverse=True)
+
+    def reset(self):
+        self._names = None
+
+    def save(self):
+        return self._names.save()
+
+    def setup(self):
+        # cannot add names if we're not in a 'write lock'.
+        if self.repo.control_files._lock_mode != 'w':
+            raise errors.NotWriteLocked(self)
+
+
 class GraphKnitRevisionStore(KnitRevisionStore):
     """An object to adapt access from RevisionStore's to use GraphKnits.
 
@@ -294,47 +330,31 @@
     This class works by replacing the original RevisionStore.
     We need to do this because the GraphKnitRevisionStore is less
     isolated in its layering - it uses services from the repo.
-
-    DEFECTS:
-     - unlock writes an index even on error. This is fine while we are writing
-       data to knits, but we really should not use unlock to trigger writes,
-       rather operations should finish explicitly.
     """
 
-    def __init__(self, repo, revisionstore):
+    def __init__(self, repo, transport, revisionstore):
         """Create a GraphKnitRevisionStore on repo with revisionstore.
 
         This will store its state in the Repository, use the
-        revision-indices FileNames to provide a KnitGraphIndex,
+        indices FileNames to provide a KnitGraphIndex,
         and at the end of transactions write new indices.
         """
         KnitRevisionStore.__init__(self, revisionstore.versioned_file_store)
         self.repo = repo
         self._serializer = revisionstore._serializer
-
-    def _ensure_names_loaded(self):
-        if self.repo._revision_indices is None:
-            index_transport = self.get_indices_transport()
-            self.repo._revision_indices = file_names.FileNames(
-                index_transport, 'index')
-            self.repo._revision_indices.load()
-
-    def get_indices_transport(self):
-        return self.versioned_file_store._transport.clone('indices')
+        self.transport = transport
 
     def get_revision_file(self, transaction):
         """Get the revision versioned file object."""
         if getattr(self.repo, '_revision_knit', None) is not None:
             return self.repo._revision_knit
-        index_transport = self.get_indices_transport()
         indices = []
-        self._ensure_names_loaded()
-        def _cmp(x, y): return cmp(int(x), int(y))
-        for name in sorted(self.repo._revision_indices.names(), cmp=_cmp, reverse=True):
+        self.repo._data_names.ensure_loaded()
+        for name in self.repo._data_names.names():
             # TODO: maybe this should expose size to us  to allow
             # sorting of the indices for better performance ?
             index_name = self.name_to_revision_index_name(name)
-            indices.append(GraphIndex(index_transport, index_name))
+            indices.append(GraphIndex(self.transport, index_name))
         if self.repo.is_in_write_group():
             # allow writing: queue writes to a new index
             indices.append(self.repo._revision_write_index)
@@ -345,7 +365,7 @@
         knit_index = KnitGraphIndex(self.repo._revision_all_indices,
             add_callback=add_callback)
         self.repo._revision_knit = knit.KnitVersionedFile(
-            'revisions', index_transport.clone('..'),
+            'revisions', self.transport.clone('..'),
             self.repo.control_files._file_mode,
             create=False, access_mode=self.repo.control_files._lock_mode,
             index=knit_index, delta=False, factory=knit.KnitPlainFactory())
@@ -355,15 +375,13 @@
         """Get the signature versioned file object."""
         if getattr(self.repo, '_signature_knit', None) is not None:
             return self.repo._signature_knit
-        index_transport = self.get_indices_transport()
         indices = []
-        self._ensure_names_loaded()
-        def _cmp(x, y): return cmp(int(x), int(y))
-        for name in sorted(self.repo._revision_indices.names(), cmp=_cmp, reverse=True):
+        self.repo._data_names.ensure_loaded()
+        for name in self.repo._data_names.names():
             # TODO: maybe this should expose size to us  to allow
             # sorting of the indices for better performance ?
             index_name = self.name_to_signature_index_name(name)
-            indices.append(GraphIndex(index_transport, index_name))
+            indices.append(GraphIndex(self.transport, index_name))
         if self.repo.is_in_write_group():
             # allow writing: queue writes to a new index
             indices.append(self.repo._signature_write_index)
@@ -374,48 +392,45 @@
         knit_index = KnitGraphIndex(self.repo._signature_all_indices,
             add_callback=add_callback, parents=False)
         self.repo._signature_knit = knit.KnitVersionedFile(
-            'signatures', index_transport.clone('..'),
+            'signatures', self.transport.clone('..'),
             self.repo.control_files._file_mode,
             create=False, access_mode=self.repo.control_files._lock_mode,
             index=knit_index, delta=False, factory=knit.KnitPlainFactory())
         return self.repo._signature_knit
 
-    def flush(self):
-        """Write out pending indices."""
-        data_inserted = False
+    def data_inserted(self):
         # XXX: Should we define __len__ for indices?
         if (getattr(self.repo, '_revision_write_index', None) and
             len(list(self.repo._revision_write_index.iter_all_entries()))):
-            data_inserted = True
+            return True
         if (getattr(self.repo, '_signature_write_index', None) and
             len(list(self.repo._signature_write_index.iter_all_entries()))):
-            data_inserted = True
-        if not data_inserted:
-            return
-        new_name = self.repo._revision_indices.allocate()
-        self.repo._revision_indices.save()
-        index_transport = self.get_indices_transport()
+            return True
+        return False
+
+    def flush(self, new_name):
+        """Write out pending indices."""
         # write a revision index (might be empty)
         new_index_name = self.name_to_revision_index_name(new_name)
-        index_transport.put_file(new_index_name,
+        self.transport.put_file(new_index_name,
             self.repo._revision_write_index.finish())
         self.repo._revision_write_index = None
         if self.repo._revision_all_indices is not None:
             # revisions 'knit' accessed : update it.
             self.repo._revision_all_indices.insert_index(0,
-                GraphIndex(index_transport, new_index_name))
+                GraphIndex(self.transport, new_index_name))
             # remove the write buffering index. XXX: API break
             # - clearly we need a remove_index call too.
             del self.repo._revision_all_indices._indices[-1]
         # write a signatures index (might be empty)
         new_index_name = self.name_to_signature_index_name(new_name)
-        index_transport.put_file(new_index_name,
+        self.transport.put_file(new_index_name,
             self.repo._signature_write_index.finish())
         self.repo._signature_write_index = None
         if self.repo._signature_all_indices is not None:
             # sigatures 'knit' accessed : update it.
             self.repo._signature_all_indices.insert_index(0,
-                GraphIndex(index_transport, new_index_name))
+                GraphIndex(self.transport, new_index_name))
             # remove the write buffering index. XXX: API break
             # - clearly we need a remove_index call too.
             del self.repo._signature_all_indices._indices[-1]
@@ -430,8 +445,6 @@
 
     def reset(self):
         """Clear all cached data."""
-        # the packs that exist
-        self.repo._revision_indices = None
         # cached revision data
         self.repo._revision_knit = None
         self.repo._revision_write_index = None
@@ -443,8 +456,6 @@
 
     def setup(self):
         # setup in-memory indices to accumulate data.
-        if self.repo.control_files._lock_mode != 'w':
-            raise errors.NotWriteLocked(self)
         self.repo._revision_write_index = InMemoryGraphIndex(1)
         self.repo._signature_write_index = InMemoryGraphIndex(0)
         # if knit indices have been handed out, add a mutable
@@ -457,6 +468,122 @@
             self.repo._signature_knit._index._add_callback = self.repo._signature_write_index.add_nodes
 
 
+class GraphKnitTextStore(VersionedFileStore):
+    """An object to adapt access from VersionedFileStore's to use GraphKnits.
+
+    This should not live through to production: by production time we should
+    have fully integrated the new indexing and have new data for the
+    repository classes; also we may choose not to do a Knit1 compatible
+    new repository, just a Knit3 one. If neither of these happen, this 
+    should definitely be cleaned up before merging.
+
+    This class works by replacing the original VersionedFileStore.
+    We need to do this because the GraphKnitTextStore is less
+    isolated in its layering - it uses services from the repo and shares them
+    with all the data written in a single write group.
+    """
+
+    def __init__(self, repo, transport, weavestore):
+        """Create a GraphKnitTextStore on repo with weavestore.
+
+        This will store its state in the Repository, use the
+        indices FileNames to provide a KnitGraphIndex,
+        and at the end of transactions write new indices.
+        """
+        # don't call base class constructor - it's not suitable.
+        # no transient data stored in the transaction
+        # cache.
+        self._precious = False
+        self.repo = repo
+        self.transport = transport
+        self.weavestore = weavestore
+        # XXX for check() which isn't updated yet
+        self._transport = weavestore._transport
+
+    def data_inserted(self):
+        # XXX: Should we define __len__ for indices?
+        if (getattr(self.repo, '_text_write_index', None) and
+            len(list(self.repo._text_write_index.iter_all_entries()))):
+            return True
+
+    def _ensure_all_index(self):
+        """Create the combined index for all texts."""
+        if getattr(self.repo, '_text_all_indices', None) is not None:
+            return
+        indices = []
+        self.repo._data_names.ensure_loaded()
+        for name in self.repo._data_names.names():
+            # TODO: maybe this should expose size to us  to allow
+            # sorting of the indices for better performance ?
+            index_name = self.name_to_text_index_name(name)
+            indices.append(GraphIndex(self.transport, index_name))
+        if self.repo.is_in_write_group():
+            # allow writing: queue writes to a new index
+            indices.append(self.repo._text_write_index)
+        self.repo._text_all_indices = CombinedGraphIndex(indices)
+
+    def flush(self, new_name):
+        """Write the index out to new_name."""
+        # write a text index (might be empty)
+        new_index_name = self.name_to_text_index_name(new_name)
+        self.transport.put_file(new_index_name,
+            self.repo._text_write_index.finish())
+        self.repo._text_write_index = None
+        if self.repo._text_all_indices is not None:
+            # text 'knits' have been used, replace the mutated memory index
+            # with the new on-disk one. XXX: is this really a good idea?
+            # perhaps just keep using the memory one ?
+            self.repo._text_all_indices.insert_index(0,
+                GraphIndex(self.transport, new_index_name))
+            # remove the write buffering index. XXX: API break
+            # - clearly we need a remove_index call too.
+            del self.repo._text_all_indices._indices[-1]
+
+    def get_weave_or_empty(self, file_id, transaction):
+        """Get a 'Knit' backed by the .tix indices.
+
+        The transaction parameter is ignored.
+        """
+        self._ensure_all_index()
+        filename = self.weavestore.filename(file_id)
+        if self.repo.is_in_write_group():
+            add_callback = self.repo._text_write_index.add_nodes
+        else:
+            add_callback = None # no data-adding permitted.
+
+        file_id_index = GraphIndexPrefixAdapter(self.repo._text_all_indices,
+            (file_id, ), 1, add_nodes_callback=add_callback)
+        knit_index = KnitGraphIndex(file_id_index,
+            add_callback=file_id_index.add_nodes,
+            deltas=True, parents=True)
+        return knit.KnitVersionedFile(filename, self.weavestore._transport,
+            self.weavestore._file_mode,
+            index=knit_index,
+            **self.weavestore._versionedfile_kwargs)
+
+    get_weave = get_weave_or_empty
+
+    def name_to_text_index_name(self, name):
+        """The text index is the name + .tix."""
+        return name + '.tix'
+
+    def reset(self):
+        """Clear all cached data."""
+        # remove any accumulating index of text data
+        self.repo._text_write_index = None
+        # remove all constructed text data indices
+        self.repo._text_all_indices = None
+
+    def setup(self):
+        # setup in-memory indices to accumulate data.
+        self.repo._text_write_index = InMemoryGraphIndex(reference_lists=2,
+            key_elements=2)
+        # we require that text 'knits' be accessed from within the write 
+        # group to be able to be written to, simply because it makes this
+        # code cleaner - we don't need to track all 'open' knits and 
+        # adjust them.
+
+
 class GraphKnitRepository1(KnitRepository):
     """Experimental graph-knit using repository."""
 
@@ -464,22 +591,43 @@
                  control_store, text_store):
         KnitRepository.__init__(self, _format, a_bzrdir, control_files,
                               _revision_store, control_store, text_store)
-        self._revision_store = GraphKnitRevisionStore(self, self._revision_store)
+        index_transport = control_files._transport.clone('indices')
+        self._data_names = RepositoryDataNames(self, index_transport)
+        self._revision_store = GraphKnitRevisionStore(self, index_transport, self._revision_store)
+        self.weave_store = GraphKnitTextStore(self, index_transport, self.weave_store)
 
     def _abort_write_group(self):
         # FIXME: just drop the transient index.
         self._revision_store.reset()
+        self.weave_store.reset()
+        # forget what names there are
+        self._data_names.reset()
 
     def _refresh_data(self):
         if self.control_files._lock_count==1:
             self._revision_store.reset()
+            self.weave_store.reset()
+            # forget what names there are
+            self._data_names.reset()
 
     def _start_write_group(self):
+        self._data_names.setup()
         self._revision_store.setup()
+        self.weave_store.setup()
 
     def _commit_write_group(self):
-        self._revision_store.flush()
+        data_inserted = (self._revision_store.data_inserted() or
+            self.weave_store.data_inserted())
+        if data_inserted:
+            new_name = self._data_names.allocate()
+            self._revision_store.flush(new_name)
+            self.weave_store.flush(new_name)
+            self._data_names.save()
         self._revision_store.reset()
+        self.weave_store.reset()
+        # forget what names there are - should just refresh and deal with the
+        # delta.
+        self._data_names.reset()
 
 
 class GraphKnitRepository3(KnitRepository3):
@@ -489,22 +637,43 @@
                  control_store, text_store):
         KnitRepository3.__init__(self, _format, a_bzrdir, control_files,
                               _revision_store, control_store, text_store)
-        self._revision_store = GraphKnitRevisionStore(self, self._revision_store)
+        index_transport = a_bzrdir.get_repository_transport(None).clone('indices')
+        self._data_names = RepositoryDataNames(self, index_transport)
+        self._revision_store = GraphKnitRevisionStore(self, index_transport, self._revision_store)
+        self.weave_store = GraphKnitTextStore(self, index_transport, self.weave_store)
 
     def _abort_write_group(self):
         # FIXME: just drop the transient index.
         self._revision_store.reset()
+        self.weave_store.reset()
+        # forget what names there are
+        self._data_names.reset()
 
     def _refresh_data(self):
         if self.control_files._lock_count==1:
             self._revision_store.reset()
+            self.weave_store.reset()
+            # forget what names there are
+            self._data_names.reset()
 
     def _start_write_group(self):
+        self._data_names.setup()
         self._revision_store.setup()
+        self.weave_store.setup()
 
     def _commit_write_group(self):
-        self._revision_store.flush()
+        data_inserted = (self._revision_store.data_inserted() or
+            self.weave_store.data_inserted())
+        if data_inserted:
+            new_name = self._data_names.allocate()
+            self._revision_store.flush(new_name)
+            self.weave_store.flush(new_name)
+            self._data_names.save()
         self._revision_store.reset()
+        self.weave_store.reset()
+        # forget what names there are - should just refresh and deal with the
+        # delta.
+        self._data_names.reset()
 
 
 class RepositoryFormatKnit(MetaDirRepositoryFormat):
@@ -568,7 +737,7 @@
                        repository.
         """
         mutter('creating repository in %s.', a_bzrdir.transport.base)
-        dirs = ['revision-store', 'knits']
+        dirs = ['knits']
         files = []
         utf8_files = [('format', self.get_format_string())]
         
@@ -759,6 +928,9 @@
     names.save()
     repo_transport.delete('revisions.kndx')
     repo_transport.delete('signatures.kndx')
+    for first in '0123456789abcdef':
+        for second in '0123456789abcdef':
+            repo_transport.mkdir('knits/%s%s' % (first, second))
 
 
 class RepositoryFormatGraphKnit3(RepositoryFormatKnit3):

=== modified file 'bzrlib/tests/repository_implementations/test_repository.py'
--- a/bzrlib/tests/repository_implementations/test_repository.py	2007-07-26 03:26:31 +0000
+++ b/bzrlib/tests/repository_implementations/test_repository.py	2007-07-30 05:07:02 +0000
@@ -196,6 +196,7 @@
         knit3_repo = b_bzrdir.create_repository()
         # fetch with a default limit (grab everything)
         knit3_repo.fetch(tree_a.branch.repository, revision_id=None)
+        knit3_repo = b_bzrdir.open_repository()
         rev1_tree = knit3_repo.revision_tree('rev1')
         lines = rev1_tree.get_file_lines(rev1_tree.inventory.root.file_id)
         self.assertEqual([], lines)

=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py	2007-07-26 04:35:56 +0000
+++ b/bzrlib/tests/test_repository.py	2007-07-30 05:07:02 +0000
@@ -609,6 +609,15 @@
         tree.commit('foobarbaz')
         self.assertTrue(trans.has('indices/0.six'))
 
+    def test_add_revision_creates_zero_dot_tix(self):
+        """Adding a revision makes a 0.tix (Text IndeX) file."""
+        format = self.get_format()
+        tree = self.make_branch_and_tree('.', format=format)
+        trans = tree.branch.repository.bzrdir.get_repository_transport(None)
+        self.assertFalse(trans.has('indices/0.tix'))
+        tree.commit('foobarbaz')
+        self.assertTrue(trans.has('indices/0.tix'))
+
     def test_pulling_nothing_leads_to_no_new_names(self):
         format = self.get_format()
         tree1 = self.make_branch_and_tree('1', format=format)
@@ -617,6 +626,7 @@
         trans = tree1.branch.repository.bzrdir.get_repository_transport(None)
         self.assertFalse(trans.has('indices/0.rix'))
         self.assertFalse(trans.has('indices/0.six'))
+        self.assertFalse(trans.has('indices/0.tix'))
         names = FileNames(trans.clone('indices'), 'index')
         names.load()
         self.assertEqual(set(), names.names())



More information about the bazaar-commits mailing list