Rev 33: Start working on an 'incremental' mode. in http://bzr.arbash-meinel.com/plugins/history_db

John Arbash Meinel john at arbash-meinel.com
Sun Apr 4 19:37:05 BST 2010


At http://bzr.arbash-meinel.com/plugins/history_db

------------------------------------------------------------
revno: 33
revision-id: john at arbash-meinel.com-20100404183702-tmfo01aayvl46vuu
parent: john at arbash-meinel.com-20100404165308-qzj0az9u5x5qoi6u
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: history_db
timestamp: Sun 2010-04-04 13:37:02 -0500
message:
  Start working on an 'incremental' mode.
  
  The idea is that we can compute gdfo ourselves from the graph (which has
  been implemented). And then we should be able to compute a merge_sort
  for just the new revisions, which does not have to be O(history).
-------------- next part --------------
=== modified file '__init__.py'
--- a/__init__.py	2010-04-04 15:32:49 +0000
+++ b/__init__.py	2010-04-04 18:37:02 +0000
@@ -34,16 +34,18 @@
                      option.Option('directory', type=unicode, short_name='d',
                         help='Import this location instead of "."'),
                      option.Option('expand-all', help='Expand all revnos'),
+                     option.Option('incremental', short_name='i',
+                        help='Consider this an incremental update.')
                     ]
 
-    def run(self, directory='.', db=None, expand_all=False):
+    def run(self, directory='.', db=None, expand_all=False, incremental=False):
         import pprint
         from bzrlib.plugins.history_db import history_db
         from bzrlib import branch
         b = branch.Branch.open(directory)
         b.lock_read()
         try:
-            importer = history_db.Importer(db, b)
+            importer = history_db.Importer(db, b, incremental=incremental)
             importer.do_import(expand_all=expand_all)
             importer.build_mainline_cache()
         finally:

=== modified file 'history_db.py'
--- a/history_db.py	2010-04-04 16:53:08 +0000
+++ b/history_db.py	2010-04-04 18:37:02 +0000
@@ -25,6 +25,7 @@
 import time
 
 from bzrlib import (
+    revision,
     trace,
     ui,
     )
@@ -32,18 +33,22 @@
 from bzrlib.plugins.history_db import schema
 
 
+NULL_PARENTS = (revision.NULL_REVISION,)
+
 class Importer(object):
     """Import data from bzr into the history_db."""
 
-    def __init__(self, db_path, a_branch):
+    def __init__(self, db_path, a_branch, incremental=False):
         db_conn = dbapi2.connect(db_path)
+        self._incremental = incremental
         self._db_conn = db_conn
         self._ensure_schema()
         self._cursor = self._db_conn.cursor()
         self._branch = a_branch
         self._branch_tip_rev_id = a_branch.last_revision()
         self._branch_tip_key = (self._branch_tip_rev_id,)
-        self._get_graph()
+        self._graph = None
+        self._ensure_graph()
         self._rev_id_to_db_id = {}
         self._stats = defaultdict(lambda: 0)
 
@@ -51,12 +56,17 @@
         if not schema.is_initialized(self._db_conn, dbapi2.OperationalError):
             schema.create_sqlite_db(self._db_conn)
             trace.note('Initialized database')
+            # We know we can't do this incrementally, because nothing has
+            # existed before...
+            #self._incremental = False
 
     def _ensure_revisions(self, revision_ids):
         schema.ensure_revisions(self._cursor, revision_ids,
                                 self._rev_id_to_db_id, self._graph)
 
-    def _get_graph(self):
+    def _ensure_graph(self):
+        if self._graph is not None:
+            return
         repo = self._branch.repository
         self._graph = repo.revisions.get_known_graph_ancestry(
             [self._branch_tip_key])
@@ -97,6 +107,10 @@
         parent_map = dict(
             (n.key[0], [p[0] for p in self._graph.get_parent_keys(n.key)])
             for n in nodes)
+        self._insert_parent_map(parent_map)
+
+    def _insert_parent_map(self, parent_map):
+        """Insert all the entries in this parent map into the parent table."""
         rev_ids = set(parent_map)
         map(rev_ids.update, parent_map.itervalues())
         self._ensure_revisions(rev_ids)
@@ -110,6 +124,8 @@
                                  "VALUES (?, ?, ?)", data)
 
     def do_import(self, expand_all=False):
+        if self._incremental:
+            self._update_ancestry(self._branch_tip_rev_id)
         merge_sorted = self._import_tip(self._branch_tip_rev_id)
         if not expand_all:
             return
@@ -187,6 +203,105 @@
                 self._db_conn.commit()
         return merge_sorted
 
+    def _update_ancestry(self, new_tip_rev_id):
+        """Add ancestry of new_tip_rev_id missing from 'revision' and 'parent'.
+
+        self._rev_id_to_db_id will be updated; the transaction is committed.
+        """
+        (known, parent_map,
+         children) = self._find_known_ancestors(new_tip_rev_id)
+        self._compute_gdfo_and_insert(known, children, parent_map)
+        self._insert_parent_map(parent_map)
+        self._db_conn.commit()
+
+    def _find_known_ancestors(self, new_tip_rev_id):
+        """Walk ancestry from the tip, stopping at revisions in the db.
+
+        :return: (known, parent_map, children): known maps rev_id => gdfo,
+            parent_map holds the new revisions, children maps parent => [child]
+        """
+        needed = [new_tip_rev_id]
+        all_needed = set([new_tip_rev_id])  # one id, not its characters
+        children = {}
+        parent_map = {}
+        known = {}
+        while needed:
+            rev_id = needed.pop()
+            if rev_id in known:
+                # A parent may be queued more than once; skip it once found
+                continue
+            res = self._cursor.execute("SELECT gdfo"
+                                       "  FROM revision WHERE revision_id = ?",
+                                       (rev_id,)).fetchone()
+            if res is not None:
+                known[rev_id] = res[0]
+                continue
+            # Not recorded in the db yet, so its parents join the search
+            pmap = self._branch.repository.get_parent_map([rev_id])
+            parent_map.update(pmap)
+            parent_ids = pmap.get(rev_id, ())
+            if not parent_ids or parent_ids == NULL_PARENTS:
+                # No parents, so its gdfo is 1 and it can be inserted right away
+                parent_map[rev_id] = ()
+                self._cursor.execute("INSERT INTO revision (revision_id, gdfo)"
+                                     " VALUES (?, ?)", (rev_id, 1))
+                # Wrap around to populate known quickly
+                needed.append(rev_id)
+                continue
+            for parent_id in parent_ids:
+                if parent_id not in known and parent_id not in all_needed:
+                    needed.append(parent_id)
+                    all_needed.add(parent_id)
+                children.setdefault(parent_id, []).append(rev_id)
+        return known, parent_map, children
+
+    def _compute_gdfo_and_insert(self, known, children, parent_map):
+        # Starting from known (rev_id => gdfo), give each new revision the gdfo
+        # max(parent gdfos) + 1, INSERT it, and record it in known (in place).
+        pending = [(gdfo, rev_id) for rev_id, gdfo in known.iteritems()]
+        while pending:
+            gdfo, rev_id = pending.pop()
+            for child_id in children.get(rev_id, []):
+                if child_id in known:
+                    # Already numbered, presumably reached via another parent
+                    assert known[child_id] > gdfo
+                    continue
+                parent_ids = parent_map[child_id]
+                max_gdfo = -1
+                for parent_id in parent_ids:
+                    try:
+                        this_gdfo = known[parent_id]
+                    except KeyError:
+                        # One parent hasn't been computed yet; retry later
+                        break
+                    if this_gdfo > max_gdfo:
+                        max_gdfo = this_gdfo
+                else:
+                    # for/else: no break, so every parent's gdfo is known and
+                    # the child sits one step beyond its deepest parent.
+                    child_gdfo = max_gdfo + 1
+                    known[child_id] = child_gdfo
+                    self._cursor.execute(
+                        "INSERT INTO revision (revision_id, gdfo)"
+                        " VALUES (?, ?)",
+                        (child_id, child_gdfo))
+                    # Put this into the pending queue so that *its* children
+                    # also get updated
+                    pending.append((child_gdfo, child_id))
+        if self._graph is not None:  # cross-check against the loaded graph
+            for rev_id, gdfo in known.iteritems():
+                assert gdfo == self._graph._nodes[(rev_id,)].gdfo
+
+    def _get_db_id(self, revision_id):  # db_id of revision_id; row must exist
+        return self._cursor.execute('SELECT db_id FROM revision'
+                                    ' WHERE revision_id = ?',
+                                    (revision_id,)).fetchone()[0]
+
+    def _update_dotted(self, new_tip_rev_id):
+        """Update dotted_revno for a new 'tip' (so far only ancestry is done)."""
+        # Just make sure the db has valid info for all the existing entries
+        self._update_ancestry(new_tip_rev_id)
+
     def _check_range_exists_for(self, head_db_id):
         """Does the given head_db_id already have a range defined using it."""
         return self._cursor.execute("SELECT count(*) FROM mainline_parent_range"
@@ -223,6 +338,10 @@
 
     def build_mainline_cache(self):
         """Given the current branch, cache mainline information."""
+        self._build_one_mainline(self._branch_tip_rev_id)
+        # TODO: expand_all?
+
+    def _build_one_mainline(self, head_rev_id):
         # I'm not sure if this is optimal, but for now, I just walk backwards
         # through the mainline, and see if there is already a cached version,
         # or if I've stepped 100 revisions. If I've gone 100, I checkpoint, and
@@ -235,12 +354,14 @@
         #     nodes.
         #  c) If we do find a tip, see how many revisions it points to (Z). If
         #     X + Z < threshold, then collapse the ranges (this could
-        #     potentially be done multiple times.)
+        #     potentially be done multiple times.) However, I *think* that if
+        #     the policy is to collapse at 1, then you should avoid chained
+        #     collapses. (Any given revision should have only 1 partial jump
+        #     before it gets into large-range areas.)
         # The specific thresholds are arbitrary, but it should mean you would
         # average a larger 'minimum' size. And (c) helps avoid fragmentation.
         # (Where multiple imports turn a 100-revision range into 20 5-revision
         # ranges.)
-        head_rev_id = self._branch_tip_rev_id
         self._ensure_revisions([head_rev_id])
         cur_db_id = self._rev_id_to_db_id[head_rev_id]
         range_db_ids = []



More information about the bazaar-commits mailing list