Rev 5774: Resolve all the conflicts for merging move-pack-access in http://bazaar.launchpad.net/~jameinel/bzr/integration
John Arbash Meinel
john at arbash-meinel.com
Fri Apr 8 14:08:27 UTC 2011
At http://bazaar.launchpad.net/~jameinel/bzr/integration
------------------------------------------------------------
revno: 5774 [merge]
revision-id: john at arbash-meinel.com-20110408140811-9vxbb47etgwfp4z2
parent: john at arbash-meinel.com-20110408140350-fg45nkkx71l4sjvt
parent: jelmer at samba.org-20110405234314-78o98lmtky0lu8nv
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: integration
timestamp: Fri 2011-04-08 16:08:11 +0200
message:
Resolve all the conflicts for merging move-pack-access
modified:
bzrlib/groupcompress.py groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
bzrlib/repofmt/knitpack_repo.py knitpack_repo.py-20110405143430-6p75yrk99v6pb770-1
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py 2011-04-05 14:48:54 +0000
+++ b/bzrlib/groupcompress.py 2011-04-08 14:08:11 +0000
@@ -36,6 +36,7 @@
)
from bzrlib.btree_index import BTreeBuilder
from bzrlib.lru_cache import LRUSizeCache
+from bzrlib.repofmt import pack_repo
from bzrlib.tsort import topo_sort
from bzrlib.versionedfile import (
adapter_registry,
@@ -1046,7 +1047,7 @@
index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
add_callback=graph_index.add_nodes,
inconsistency_fatal=inconsistency_fatal)
- access = knit._DirectPackAccess({})
+ access = pack_repo._DirectPackAccess({})
access.set_writer(writer, graph_index, (transport, 'newpack'))
result = GroupCompressVersionedFiles(index, access, delta)
result.stream = stream
=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py 2011-04-05 14:48:54 +0000
+++ b/bzrlib/knit.py 2011-04-08 14:08:11 +0000
@@ -56,7 +56,6 @@
from itertools import izip
import operator
import os
-import sys
from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
@@ -799,7 +798,7 @@
writer.begin()
index = _KnitGraphIndex(graph_index, lambda:True, parents=parents,
deltas=delta, add_callback=graph_index.add_nodes)
- access = _DirectPackAccess({})
+ access = pack_repo._DirectPackAccess({})
access.set_writer(writer, graph_index, (transport, 'newpack'))
result = KnitVersionedFiles(index, access,
max_delta_chain=max_delta_chain)
=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py 2011-04-05 23:47:06 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py 2011-04-08 14:08:11 +0000
@@ -44,6 +44,7 @@
GroupCompressVersionedFiles,
)
from bzrlib.repofmt.pack_repo import (
+ _DirectPackAccess,
Pack,
NewPack,
PackRepository,
@@ -354,8 +355,8 @@
"""Build a VersionedFiles instance on top of this group of packs."""
index_name = index_name + '_index'
index_to_pack = {}
- access = knit._DirectPackAccess(index_to_pack,
- reload_func=self._reload_func)
+ access = _DirectPackAccess(index_to_pack,
+ reload_func=self._reload_func)
if for_write:
# Use new_pack
if self.new_pack is None:
=== modified file 'bzrlib/repofmt/knitpack_repo.py'
--- a/bzrlib/repofmt/knitpack_repo.py 2011-04-05 23:53:18 +0000
+++ b/bzrlib/repofmt/knitpack_repo.py 2011-04-08 14:08:11 +0000
@@ -56,6 +56,7 @@
KnitRepository,
)
from bzrlib.repofmt.pack_repo import (
+ _DirectPackAccess,
NewPack,
RepositoryFormatPack,
ResumedPack,
@@ -1074,7 +1075,7 @@
self.new_pack.text_index,
('blank', ), 1,
add_nodes_callback=self.new_pack.text_index.add_nodes)
- data_access = knit._DirectPackAccess(
+ data_access = _DirectPackAccess(
{self.new_pack.text_index:self.new_pack.access_tuple()})
data_access.set_writer(self.new_pack._writer, self.new_pack.text_index,
self.new_pack.access_tuple())
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py 2011-04-08 14:03:50 +0000
+++ b/bzrlib/repofmt/pack_repo.py 2011-04-08 14:08:11 +0000
@@ -1895,3 +1895,143 @@
" context: %(context)s %(orig_error)s")
+class _DirectPackAccess(object):
+ """Access to data in one or more packs with less translation."""
+
+ def __init__(self, index_to_packs, reload_func=None, flush_func=None):
+ """Create a _DirectPackAccess object.
+
+ :param index_to_packs: A dict mapping index objects to the transport
+ and file names for obtaining data.
+ :param reload_func: A function to call if we determine that the pack
+ files have moved and we need to reload our caches. See
+ bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
+ """
+ self._container_writer = None
+ self._write_index = None
+ self._indices = index_to_packs
+ self._reload_func = reload_func
+ self._flush_func = flush_func
+
+ def add_raw_records(self, key_sizes, raw_data):
+ """Add raw knit bytes to a storage area.
+
+ The data is spooled to the container writer in one bytes-record per
+ raw data item.
+
+ :param sizes: An iterable of tuples containing the key and size of each
+ raw data segment.
+ :param raw_data: A bytestring containing the data.
+ :return: A list of memos to retrieve the record later. Each memo is an
+ opaque index memo. For _DirectPackAccess the memo is (index, pos,
+ length), where the index field is the write_index object supplied
+ to the PackAccess object.
+ """
+ if type(raw_data) is not str:
+ raise AssertionError(
+ 'data must be plain bytes was %s' % type(raw_data))
+ result = []
+ offset = 0
+ for key, size in key_sizes:
+ p_offset, p_length = self._container_writer.add_bytes_record(
+ raw_data[offset:offset+size], [])
+ offset += size
+ result.append((self._write_index, p_offset, p_length))
+ return result
+
+ def flush(self):
+ """Flush pending writes on this access object.
+
+ This will flush any buffered writes to a NewPack.
+ """
+ if self._flush_func is not None:
+ self._flush_func()
+
+ def get_raw_records(self, memos_for_retrieval):
+ """Get the raw bytes for a records.
+
+ :param memos_for_retrieval: An iterable containing the (index, pos,
+ length) memo for retrieving the bytes. The Pack access method
+ looks up the pack to use for a given record in its index_to_pack
+ map.
+ :return: An iterator over the bytes of the records.
+ """
+ # first pass, group into same-index requests
+ request_lists = []
+ current_index = None
+ for (index, offset, length) in memos_for_retrieval:
+ if current_index == index:
+ current_list.append((offset, length))
+ else:
+ if current_index is not None:
+ request_lists.append((current_index, current_list))
+ current_index = index
+ current_list = [(offset, length)]
+ # handle the last entry
+ if current_index is not None:
+ request_lists.append((current_index, current_list))
+ for index, offsets in request_lists:
+ try:
+ transport, path = self._indices[index]
+ except KeyError:
+ # A KeyError here indicates that someone has triggered an index
+ # reload, and this index has gone missing, we need to start
+ # over.
+ if self._reload_func is None:
+ # If we don't have a _reload_func there is nothing that can
+ # be done
+ raise
+ raise errors.RetryWithNewPacks(index,
+ reload_occurred=True,
+ exc_info=sys.exc_info())
+ try:
+ reader = pack.make_readv_reader(transport, path, offsets)
+ for names, read_func in reader.iter_records():
+ yield read_func(None)
+ except errors.NoSuchFile:
+ # A NoSuchFile error indicates that a pack file has gone
+ # missing on disk, we need to trigger a reload, and start over.
+ if self._reload_func is None:
+ raise
+ raise errors.RetryWithNewPacks(transport.abspath(path),
+ reload_occurred=False,
+ exc_info=sys.exc_info())
+
+ def set_writer(self, writer, index, transport_packname):
+ """Set a writer to use for adding data."""
+ if index is not None:
+ self._indices[index] = transport_packname
+ self._container_writer = writer
+ self._write_index = index
+
+ def reload_or_raise(self, retry_exc):
+ """Try calling the reload function, or re-raise the original exception.
+
+ This should be called after _DirectPackAccess raises a
+ RetryWithNewPacks exception. This function will handle the common logic
+ of determining when the error is fatal versus being temporary.
+ It will also make sure that the original exception is raised, rather
+ than the RetryWithNewPacks exception.
+
+ If this function returns, then the calling function should retry
+ whatever operation was being performed. Otherwise an exception will
+ be raised.
+
+ :param retry_exc: A RetryWithNewPacks exception.
+ """
+ is_error = False
+ if self._reload_func is None:
+ is_error = True
+ elif not self._reload_func():
+ # The reload claimed that nothing changed
+ if not retry_exc.reload_occurred:
+ # If there wasn't an earlier reload, then we really were
+ # expecting to find changes. We didn't find them, so this is a
+ # hard error
+ is_error = True
+ if is_error:
+ exc_class, exc_value, exc_traceback = retry_exc.exc_info
+ raise exc_class, exc_value, exc_traceback
+
+
+
=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py 2011-04-05 23:47:06 +0000
+++ b/bzrlib/tests/test_knit.py 2011-04-08 14:08:11 +0000
@@ -325,7 +325,7 @@
transport.append_bytes(packname, bytes)
writer = pack.ContainerWriter(write_data)
writer.begin()
- access = _DirectPackAccess({})
+ access = pack_repo._DirectPackAccess({})
access.set_writer(writer, index, (transport, packname))
return access, writer
@@ -456,7 +456,7 @@
memos.extend(access.add_raw_records([('key', 5)], 'alpha'))
writer.end()
transport = self.get_transport()
- access = _DirectPackAccess({"FOO":(transport, 'packfile'),
+ access = pack_repo._DirectPackAccess({"FOO":(transport, 'packfile'),
"FOOBAR":(transport, 'pack2'),
"BAZ":(transport, 'pack3')})
self.assertEqual(['1234567890', '12345', 'alpha'],
@@ -472,7 +472,7 @@
def test_set_writer(self):
"""The writer should be settable post construction."""
- access = _DirectPackAccess({})
+ access = pack_repo._DirectPackAccess({})
transport = self.get_transport()
packname = 'packfile'
index = 'foo'
@@ -490,7 +490,7 @@
transport = self.get_transport()
reload_called, reload_func = self.make_reload_func()
# Note that the index key has changed from 'foo' to 'bar'
- access = _DirectPackAccess({'bar':(transport, 'packname')},
+ access = pack_repo._DirectPackAccess({'bar':(transport, 'packname')},
reload_func=reload_func)
e = self.assertListRaises(errors.RetryWithNewPacks,
access.get_raw_records, memos)
@@ -505,7 +505,7 @@
memos = self.make_pack_file()
transport = self.get_transport()
# Note that the index key has changed from 'foo' to 'bar'
- access = _DirectPackAccess({'bar':(transport, 'packname')})
+ access = pack_repo._DirectPackAccess({'bar':(transport, 'packname')})
e = self.assertListRaises(KeyError, access.get_raw_records, memos)
def test_missing_file_raises_retry(self):
@@ -513,8 +513,9 @@
transport = self.get_transport()
reload_called, reload_func = self.make_reload_func()
# Note that the 'filename' has been changed to 'different-packname'
- access = _DirectPackAccess({'foo':(transport, 'different-packname')},
- reload_func=reload_func)
+ access = pack_repo._DirectPackAccess(
+ {'foo':(transport, 'different-packname')},
+ reload_func=reload_func)
e = self.assertListRaises(errors.RetryWithNewPacks,
access.get_raw_records, memos)
# The file has gone missing, so we assume we need to reload
@@ -528,7 +529,8 @@
memos = self.make_pack_file()
transport = self.get_transport()
# Note that the 'filename' has been changed to 'different-packname'
- access = _DirectPackAccess({'foo':(transport, 'different-packname')})
+ access = pack_repo._DirectPackAccess(
+ {'foo':(transport, 'different-packname')})
e = self.assertListRaises(errors.NoSuchFile,
access.get_raw_records, memos)
@@ -538,8 +540,9 @@
failing_transport = MockReadvFailingTransport(
[transport.get_bytes('packname')])
reload_called, reload_func = self.make_reload_func()
- access = _DirectPackAccess({'foo':(failing_transport, 'packname')},
- reload_func=reload_func)
+ access = pack_repo._DirectPackAccess(
+ {'foo':(failing_transport, 'packname')},
+ reload_func=reload_func)
# Asking for a single record will not trigger the Mock failure
self.assertEqual(['1234567890'],
list(access.get_raw_records(memos[:1])))
@@ -561,7 +564,8 @@
failing_transport = MockReadvFailingTransport(
[transport.get_bytes('packname')])
reload_called, reload_func = self.make_reload_func()
- access = _DirectPackAccess({'foo':(failing_transport, 'packname')})
+ access = pack_repo._DirectPackAccess(
+ {'foo':(failing_transport, 'packname')})
# Asking for a single record will not trigger the Mock failure
self.assertEqual(['1234567890'],
list(access.get_raw_records(memos[:1])))
@@ -572,14 +576,14 @@
access.get_raw_records, memos)
def test_reload_or_raise_no_reload(self):
- access = _DirectPackAccess({}, reload_func=None)
+ access = pack_repo._DirectPackAccess({}, reload_func=None)
retry_exc = self.make_retry_exception()
# Without a reload_func, we will just re-raise the original exception
self.assertRaises(_TestException, access.reload_or_raise, retry_exc)
def test_reload_or_raise_reload_changed(self):
reload_called, reload_func = self.make_reload_func(return_val=True)
- access = _DirectPackAccess({}, reload_func=reload_func)
+ access = pack_repo._DirectPackAccess({}, reload_func=reload_func)
retry_exc = self.make_retry_exception()
access.reload_or_raise(retry_exc)
self.assertEqual([1], reload_called)
@@ -589,7 +593,7 @@
def test_reload_or_raise_reload_no_change(self):
reload_called, reload_func = self.make_reload_func(return_val=False)
- access = _DirectPackAccess({}, reload_func=reload_func)
+ access = pack_repo._DirectPackAccess({}, reload_func=reload_func)
retry_exc = self.make_retry_exception()
# If reload_occurred is False, then we consider it an error to have
# reload_func() return False (no changes).
More information about the bazaar-commits
mailing list