Rev 33: Start working on an 'incremental' mode. in http://bzr.arbash-meinel.com/plugins/history_db
John Arbash Meinel
john at arbash-meinel.com
Sun Apr 4 19:37:05 BST 2010
At http://bzr.arbash-meinel.com/plugins/history_db
------------------------------------------------------------
revno: 33
revision-id: john at arbash-meinel.com-20100404183702-tmfo01aayvl46vuu
parent: john at arbash-meinel.com-20100404165308-qzj0az9u5x5qoi6u
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: history_db
timestamp: Sun 2010-04-04 13:37:02 -0500
message:
Start working on an 'incremental' mode.
The idea is that we can compute gdfo ourselves from the graph (which has
been implemented). And then we should be able to compute a merge_sort
for just the new revisions, which does not have to be O(history).
-------------- next part --------------
=== modified file '__init__.py'
--- a/__init__.py 2010-04-04 15:32:49 +0000
+++ b/__init__.py 2010-04-04 18:37:02 +0000
@@ -34,16 +34,18 @@
option.Option('directory', type=unicode, short_name='d',
help='Import this location instead of "."'),
option.Option('expand-all', help='Expand all revnos'),
+ option.Option('incremental', short_name='i',
+ help='Consider this an incremental update.')
]
- def run(self, directory='.', db=None, expand_all=False):
+ def run(self, directory='.', db=None, expand_all=False, incremental=False):
import pprint
from bzrlib.plugins.history_db import history_db
from bzrlib import branch
b = branch.Branch.open(directory)
b.lock_read()
try:
- importer = history_db.Importer(db, b)
+ importer = history_db.Importer(db, b, incremental=incremental)
importer.do_import(expand_all=expand_all)
importer.build_mainline_cache()
finally:
=== modified file 'history_db.py'
--- a/history_db.py 2010-04-04 16:53:08 +0000
+++ b/history_db.py 2010-04-04 18:37:02 +0000
@@ -25,6 +25,7 @@
import time
from bzrlib import (
+ revision,
trace,
ui,
)
@@ -32,18 +33,22 @@
from bzrlib.plugins.history_db import schema
+NULL_PARENTS = (revision.NULL_REVISION,)
+
class Importer(object):
"""Import data from bzr into the history_db."""
- def __init__(self, db_path, a_branch):
+ def __init__(self, db_path, a_branch, incremental=False):
db_conn = dbapi2.connect(db_path)
+ self._incremental = incremental
self._db_conn = db_conn
self._ensure_schema()
self._cursor = self._db_conn.cursor()
self._branch = a_branch
self._branch_tip_rev_id = a_branch.last_revision()
self._branch_tip_key = (self._branch_tip_rev_id,)
- self._get_graph()
+ self._graph = None
+ self._ensure_graph()
self._rev_id_to_db_id = {}
self._stats = defaultdict(lambda: 0)
@@ -51,12 +56,17 @@
if not schema.is_initialized(self._db_conn, dbapi2.OperationalError):
schema.create_sqlite_db(self._db_conn)
trace.note('Initialized database')
+ # We know we can't do this incrementally, because nothing has
+ # existed before...
+ #self._incremental = False
def _ensure_revisions(self, revision_ids):
schema.ensure_revisions(self._cursor, revision_ids,
self._rev_id_to_db_id, self._graph)
- def _get_graph(self):
+ def _ensure_graph(self):
+ if self._graph is not None:
+ return
repo = self._branch.repository
self._graph = repo.revisions.get_known_graph_ancestry(
[self._branch_tip_key])
@@ -97,6 +107,10 @@
parent_map = dict(
(n.key[0], [p[0] for p in self._graph.get_parent_keys(n.key)])
for n in nodes)
+ self._insert_parent_map(parent_map)
+
+ def _insert_parent_map(self, parent_map):
+ """Insert all the entries in this parent map into the parent table."""
rev_ids = set(parent_map)
map(rev_ids.update, parent_map.itervalues())
self._ensure_revisions(rev_ids)
@@ -110,6 +124,8 @@
"VALUES (?, ?, ?)", data)
def do_import(self, expand_all=False):
+ if self._incremental:
+ self._update_ancestry(self._branch_tip_rev_id)
merge_sorted = self._import_tip(self._branch_tip_rev_id)
if not expand_all:
return
@@ -187,6 +203,105 @@
self._db_conn.commit()
return merge_sorted
+ def _update_ancestry(self, new_tip_rev_id):
+ """Walk the parents of this tip, updating 'revision' and 'parent'
+
+ self._rev_id_to_db_id will be updated.
+ """
+ (known, parent_map,
+ children) = self._find_known_ancestors(new_tip_rev_id)
+ self._compute_gdfo_and_insert(known, children, parent_map)
+ self._insert_parent_map(parent_map)
+ self._db_conn.commit()
+
+ def _find_known_ancestors(self, new_tip_rev_id):
+ """Starting at tip, find ancestors we already have"""
+ needed = [new_tip_rev_id]
+ all_needed = set([new_tip_rev_id])
+ children = {}
+ parent_map = {}
+ known = {}
+ while needed:
+ rev_id = needed.pop()
+ if rev_id in known:
+ # We may add particular parents multiple times; just ignore
+ # them once they've been found
+ continue
+ res = self._cursor.execute("SELECT gdfo"
+ " FROM revision WHERE revision_id = ?",
+ (rev_id,)).fetchone()
+ if res is not None:
+ known[rev_id] = res[0]
+ continue
+ # We don't have this entry recorded yet, add the parents to the
+ # search
+ pmap = self._branch.repository.get_parent_map([rev_id])
+ parent_map.update(pmap)
+ parent_ids = pmap.get(rev_id, ())
+ if not parent_ids or parent_ids == NULL_PARENTS:
+ # We can insert this rev directly, because we know its gdfo,
+ # as it has no parents.
+ parent_map[rev_id] = ()
+ self._cursor.execute("INSERT INTO revision (revision_id, gdfo)"
+ " VALUES (?, ?)", (rev_id, 1))
+ # Wrap around to populate known quickly
+ needed.append(rev_id)
+ continue
+ for parent_id in pmap[rev_id]:
+ if parent_id not in known:
+ if parent_id not in all_needed:
+ needed.append(parent_id)
+ all_needed.add(parent_id)
+ children.setdefault(parent_id, []).append(rev_id)
+ return known, parent_map, children
+
+ def _compute_gdfo_and_insert(self, known, children, parent_map):
+ # At this point, we should have walked to all known parents, and should
+ # be able to build up the gdfo and parent info for all keys.
+ pending = [(gdfo, rev_id) for rev_id, gdfo in known.iteritems()]
+ while pending:
+ gdfo, rev_id = pending.pop()
+ for child_id in children.get(rev_id, []):
+ if child_id in known:
+ # XXX: Already numbered?
+ assert known[child_id] > gdfo
+ continue
+ parent_ids = parent_map[child_id]
+ max_gdfo = -1
+ for parent_id in parent_ids:
+ try:
+ this_gdfo = known[parent_id]
+ except KeyError:
+ # One parent hasn't been computed yet
+ break
+ if this_gdfo > max_gdfo:
+ max_gdfo = this_gdfo
+ else:
+ # All parents have their gdfo known
+ # assert gdfo == max_gdfo
+ child_gdfo = max_gdfo + 1
+ known[child_id] = child_gdfo
+ self._cursor.execute(
+ "INSERT INTO revision (revision_id, gdfo)"
+ " VALUES (?, ?)",
+ (child_id, child_gdfo))
+ # Put this into the pending queue so that *its* children
+ # also get updated
+ pending.append((child_gdfo, child_id))
+ if self._graph is not None:
+ for rev_id, gdfo in known.iteritems():
+ assert gdfo == self._graph._nodes[(rev_id,)].gdfo
+
+ def _get_db_id(self, revision_id):
+ return self._cursor.execute('SELECT db_id FROM revision'
+ ' WHERE revision_id = ?',
+ (revision_id,)).fetchone()[0]
+
+ def _update_dotted(self, new_tip_rev_id):
+ """We have a new 'tip' revision. Update the dotted_revno table."""
+ # Just make sure the db has valid info for all the existing entries
+ self._update_ancestry(new_tip_rev_id)
+
def _check_range_exists_for(self, head_db_id):
"""Does the given head_db_id already have a range defined using it."""
return self._cursor.execute("SELECT count(*) FROM mainline_parent_range"
@@ -223,6 +338,10 @@
def build_mainline_cache(self):
"""Given the current branch, cache mainline information."""
+ self._build_one_mainline(self._branch_tip_rev_id)
+ # TODO: expand_all?
+
+ def _build_one_mainline(self, head_rev_id):
# I'm not sure if this is optimal, but for now, I just walk backwards
# through the mainline, and see if there is already a cached version,
# or if I've stepped 100 revisions. If I've gone 100, I checkpoint, and
@@ -235,12 +354,14 @@
# nodes.
# c) If we do find a tip, see how many revisions it points to (Z). If
# X + Z < threshold, then collapse the ranges (this could
- # potentially be done multiple times.)
+ # potentially be done multiple times.) However, I *think* that if
+ # the policy is to collapse at 1, then you should avoid chained
+ # collapses. (Any given revision should have only 1 partial jump
+ # before it gets into large-range areas.)
# The specific thresholds are arbitrary, but it should mean you would
# average a larger 'minimum' size. And (c) helps avoid fragmentation.
# (Where multiple imports turn a 100-revision range into 20 5-revision
# ranges.)
- head_rev_id = self._branch_tip_rev_id
self._ensure_revisions([head_rev_id])
cur_db_id = self._rev_id_to_db_id[head_rev_id]
range_db_ids = []
More information about the bazaar-commits
mailing list