Rev 8: Persistent storage of indices is a good idea. in http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk

Robert Collins robertc at robertcollins.net
Sun Jun 8 14:36:39 BST 2008


At http://people.ubuntu.com/~robertc/baz2.0/plugins/search/trunk

------------------------------------------------------------
revno: 8
revision-id: robertc at robertcollins.net-20080608133638-j5yazpedr3mlixju
parent: robertc at robertcollins.net-20080608114120-2gjfj34t8xuqp2qt
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Sun 2008-06-08 23:36:38 +1000
message:
  Persistent storage of indices is a good idea.
modified:
  index.py                       index.py-20080608055509-hnimeek7q8tctkqf-2
  tests/test_index.py            test_index.py-20080608055509-hnimeek7q8tctkqf-4
=== modified file 'index.py'
--- a/index.py	2008-06-08 11:41:20 +0000
+++ b/index.py	2008-06-08 13:36:38 +0000
@@ -17,10 +17,12 @@
 
 """The core logic for search."""
 
+import md5
+
 from bzrlib import branch as _mod_branch
 from bzrlib.bzrdir import BzrDirMeta1
 from bzrlib.errors import NotBranchError, NoSuchFile, UnknownFormatError
-from bzrlib.index import CombinedGraphIndex, InMemoryGraphIndex
+from bzrlib.index import CombinedGraphIndex, GraphIndex, InMemoryGraphIndex
 from bzrlib.lockdir import LockDir
 from bzrlib.plugins.search import errors
 
@@ -41,8 +43,11 @@
     lockdir.lock_write()
     try:
         index_transport.put_bytes('format', _FORMAT_1)
-        names_list = InMemoryGraphIndex(1, 3)
+        names_list = InMemoryGraphIndex(0, 1)
         index_transport.put_file('names', names_list.finish())
+        index_transport.mkdir('obsolete')
+        index_transport.mkdir('indices')
+        index_transport.mkdir('upload')
     finally:
         lockdir.unlock()
     return open_index_url(branch.bzrdir.root_transport.base)
@@ -76,6 +81,8 @@
     return Index(branch.bzrdir.transport.clone('bzr-search'))
 
 
+# XXX: This wants to be a PackCollection subclass with RepositoryPackCollection
+# being a sibling. For now though, copy and paste FTW.
 class Index(object):
     """A bzr content index."""
 
@@ -84,15 +91,21 @@
 
         :param index_transport: The path where the index data should be stored.
         """
-        self.transport = index_transport
+        self._transport = index_transport
         try:
-            format = self.transport.get_bytes('format')
+            format = self._transport.get_bytes('format')
         except NoSuchFile:
             raise errors.NoSearchIndex(index_transport)
+        self._upload_transport = self._transport.clone('upload')
+        self._obsolete_transport = self._transport.clone('obsolete')
+        self._indices_transport = self._transport.clone('indices')
         if _FORMAT_1 != format:
             raise UnknownFormatError(format, 'bzr-search')
         self._indices = []
+        self._orig_names = {}
+        self._current_names = {}
         self._index = CombinedGraphIndex(self._indices)
+        self._lock = LockDir(index_transport, 'names-lock')
 
     def index_revisions(self, branch, revisions_to_index):
         """Index some revisions from branch.
@@ -100,18 +113,83 @@
         :param branch: A branch to index.
         :param revisions_to_index: A set of revision ids to index.
         """
-        # TODO: split into groups of <reasonable memory size>
+        # TODO: split into groups of <reasonable memory size> for which we
+        # then:
         # here: index texts
         # here: index inventory/paths
-        # here: index revision
+        # here: index revisions
         index = InMemoryGraphIndex(1, 3)
         for rev_id in revisions_to_index:
             index.add_node(('x', 'r', rev_id), '', ((),))
+        # write to disc.
+        index_bytes = index.finish().read()
+        index_name = md5.new(index_bytes).hexdigest()
+        index_value = "%d" % len(index_bytes)
+        self._upload_transport.put_bytes_non_atomic(index_name, index_bytes)
+        self._add_index(index, index_name, index_value)
+
+    def _add_index(self, index, name, value):
+        """Add an index to the list of indices."""
+        index_name = name
+        if name in self._current_names:
+            raise Exception("md5 collision! rad! %s" % name)
+        self._refresh_indices()
+        if name in self._current_names:
+            raise Exception("md5 collision with concurrent writer! rad! %s" % name)
+        self._lock.lock_write()
+        try:
+            # Serialise the index list
+            new_names = InMemoryGraphIndex(0, 1)
+            new_names.add_node((name,), value, ())
+            for name, (value, index) in self._current_names.items():
+                new_names.add_node((name,), value, ())
+            # Now, as the last step, rename the new index into place and update
+            # the disk list of names.
+            self._upload_transport.rename(index_name, '../indices/' + index_name)
+            self._transport.put_file('names', new_names.finish())
+        finally:
+            self._lock.unlock()
+        # Add in-memory
+        self._current_names[name] = (value, index)
+        # Allow it to be searched
         self._indices.append(index)
-        # write to disc.
-        # pack etc.
+        # It's safely inserted. Trigger a pack?
 
     def indexed_revisions(self):
         """Return the revision_keys that this index contains terms for."""
+        self._refresh_indices()
         for node in self._index.iter_entries_prefix([('x', 'r', None)]):
             yield node[1][2:3]
+
+    def _refresh_indices(self):
+        """Merge on-disk index lists into the memory top level index list."""
+        names = GraphIndex(self._transport, 'names', None)
+        new_names = {}
+        merged_names = {}
+        deleted_names = set()
+        added_names = set()
+        same_names = set()
+        for node in names.iter_all_entries():
+            name = node[1][0]
+            value = node[2]
+            new_names[name] = [value, None]
+        for name in new_names:
+            if name not in self._orig_names:
+                added_names.add(name)
+            elif name in self._current_names:
+                self.same_names.add(name)
+            else:
+                # in our last read; not in memory anymore:
+                deleted_names.add(name)
+                # XXX perhaps cross-check the size?
+        # in case of exception, have no change until we are finished:
+        current_names = dict(self._current_names.items())
+        for name in added_names:
+            # we need an index object:
+            # NB: we disable bisection because in this get-it-going layout
+            # post-lists may be >>>> than the bisection code is prepared for.
+            index = GraphIndex(self._indices_transport, name, None)
+            self._current_names[name] = (new_names[name][0], index)
+            self._indices.append(index)
+        self._current_names = current_names
+        self._orig_names = new_names

=== modified file 'tests/test_index.py'
--- a/tests/test_index.py	2008-06-08 11:41:20 +0000
+++ b/tests/test_index.py	2008-06-08 13:36:38 +0000
@@ -38,6 +38,10 @@
             transport.get_bytes('format'))
         names_list = GraphIndex(transport, 'names', None)
         self.assertEqual([], list(names_list.iter_all_entries()))
+        # And a number of empty directories
+        self.assertTrue(transport.has('obsolete'))
+        self.assertTrue(transport.has('upload'))
+        self.assertTrue(transport.has('indices'))
 
     def test_init_index_unindexable(self):
         # any non-metadir will do here:
@@ -81,3 +85,6 @@
         revid = tree.commit('first post')
         rev_index.index_revisions(tree.branch, [revid])
         self.assertEqual(set([(revid,)]), set(rev_index.indexed_revisions()))
+        # reopen - it should retain the indexed revisions.
+        rev_index = index.open_index_url('')
+        self.assertEqual(set([(revid,)]), set(rev_index.indexed_revisions()))




More information about the bazaar-commits mailing list