Rev 3865: (mbp, for jam) retry when pack operations fail in file:///home/pqm/archives/thelove/bzr/%2Btrunk/
Canonical.com Patch Queue Manager
pqm at pqm.ubuntu.com
Thu Nov 27 08:26:56 GMT 2008
At file:///home/pqm/archives/thelove/bzr/%2Btrunk/
------------------------------------------------------------
revno: 3865
revision-id: pqm at pqm.ubuntu.com-20081127082650-adzra5ok5apue0gl
parent: pqm at pqm.ubuntu.com-20081127074015-oigcdk8r4oxi1qse
parent: mbp at sourcefrog.net-20081127072552-st8jnmahi0iy3lrt
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Thu 2008-11-27 08:26:50 +0000
message:
(mbp, for jam) retry when pack operations fail
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/errors.py errors.py-20050309040759-20512168c4e14fbd
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
bzrlib/tests/test_pack_repository.py test_pack_repository-20080801043947-eaw0e6h2gu75kwmy-1
------------------------------------------------------------
revno: 3860.1.1
revision-id: mbp at sourcefrog.net-20081127072552-st8jnmahi0iy3lrt
parent: pqm at pqm.ubuntu.com-20081127031840-o1i61l3vf2yin615
parent: john at arbash-meinel.com-20081025024257-sbi10z5ddf6rc93r
committer: Martin Pool <mbp at sourcefrog.net>
branch nick: 153786-retry
timestamp: Thu 2008-11-27 18:25:52 +1100
message:
merge fix to retry when packs have changed
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/errors.py errors.py-20050309040759-20512168c4e14fbd
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
bzrlib/tests/test_pack_repository.py test_pack_repository-20080801043947-eaw0e6h2gu75kwmy-1
------------------------------------------------------------
revno: 3789.2.15
revision-id: john at arbash-meinel.com-20081025024257-sbi10z5ddf6rc93r
parent: john at arbash-meinel.com-20081025024012-1vobzpomrefm43a7
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 21:42:57 -0500
message:
NEWS about fixing bug #153786
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
------------------------------------------------------------
revno: 3789.2.14
revision-id: john at arbash-meinel.com-20081025024012-1vobzpomrefm43a7
parent: john at arbash-meinel.com-20081025023735-bsht1lfu6jye6z7o
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 21:40:12 -0500
message:
Update AggregateIndex to pass the reload_func into _DirectPackAccess
And update the pack_repository tests so they now pass, yippee!
modified:
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/tests/test_pack_repository.py test_pack_repository-20080801043947-eaw0e6h2gu75kwmy-1
------------------------------------------------------------
revno: 3789.2.13
revision-id: john at arbash-meinel.com-20081025023735-bsht1lfu6jye6z7o
parent: john at arbash-meinel.com-20081025022225-wbds6xl8t5ptod6p
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 21:37:35 -0500
message:
KnitVersionedFile.annotate() now retries when appropriate.
As near as I can tell, this is the last code that works in terms of records,
and thus the last place that requires the indexes and packs to stay in
perfect sync.
modified:
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
------------------------------------------------------------
revno: 3789.2.12
revision-id: john at arbash-meinel.com-20081025022225-wbds6xl8t5ptod6p
parent: john at arbash-meinel.com-20081025015845-7424d9znf6v36kuv
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 21:22:25 -0500
message:
iter_lines_added_or_present now retries.
Change the test to use 3 packs/revisions, as that makes it more likely that we
will have already processed some of the results before we finally run into
the missing pack file.
modified:
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
------------------------------------------------------------
revno: 3789.2.11
revision-id: john at arbash-meinel.com-20081025015845-7424d9znf6v36kuv
parent: john at arbash-meinel.com-20081025014248-zhy0bg5nf238vc29
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 20:58:45 -0500
message:
KnitVersionedFile.get_record_stream now retries *and* fails correctly.
modified:
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
------------------------------------------------------------
revno: 3789.2.10
revision-id: john at arbash-meinel.com-20081025014248-zhy0bg5nf238vc29
parent: john at arbash-meinel.com-20081025003853-3orjg3p78750qp4r
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 20:42:48 -0500
message:
The first function for KnitVersionedFiles can now retry on request.
_get_record_map() now includes logic to retry operations if they fail due to a
missing .pack file.
modified:
bzrlib/errors.py errors.py-20050309040759-20512168c4e14fbd
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
------------------------------------------------------------
revno: 3789.2.9
revision-id: john at arbash-meinel.com-20081025003853-3orjg3p78750qp4r
parent: john at arbash-meinel.com-20081025002729-m1a8c7gdfr1v2ezb
parent: john at arbash-meinel.com-20081025003007-5xam89uv2d1b2pdb
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 19:38:53 -0500
message:
Merge in the readv_close patch, which brings in bzr.dev
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/_walkdirs_win32.pyx _walkdirs_win32.pyx-20080716220454-kweh3tgxez5dvw2l-2
bzrlib/branch.py branch.py-20050309040759-e4baf4e0d046576e
bzrlib/btree_index.py index.py-20080624222253-p0x5f92uyh5hw734-7
bzrlib/chunk_writer.py chunk_writer.py-20080630234519-6ggn4id17nipovny-1
bzrlib/help_topics/en/hooks.txt hooks.txt-20070830033044-xxu2rced13f72dka-1
bzrlib/index.py index.py-20070712131115-lolkarso50vjr64s-1
bzrlib/plugins/launchpad/account.py account.py-20071011033320-50y6vfftywf4yllw-1
bzrlib/plugins/launchpad/test_account.py test_account.py-20071011033320-50y6vfftywf4yllw-2
bzrlib/python-compat.h pythoncompat.h-20080924041409-9kvi0fgtuuqp743j-1
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/tests/branch_implementations/test_stacking.py test_stacking.py-20080214020755-msjlkb7urobwly0f-1
bzrlib/tests/test_btree_index.py test_index.py-20080624222253-p0x5f92uyh5hw734-13
bzrlib/tests/test_chunk_writer.py test_chunk_writer.py-20080630234519-6ggn4id17nipovny-2
bzrlib/tests/test_index.py test_index.py-20070712131115-lolkarso50vjr64s-2
bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
bzrlib/win32utils.py win32console.py-20051021033308-123c6c929d04973d
------------------------------------------------------------
revno: 3789.2.8
revision-id: john at arbash-meinel.com-20081025002729-m1a8c7gdfr1v2ezb
parent: john at arbash-meinel.com-20081024220902-ubncv0s37s7npwvp
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 19:27:29 -0500
message:
Add a test that KnitPackRepository.get_record_stream retries when appropriate.
modified:
bzrlib/tests/test_pack_repository.py test_pack_repository-20080801043947-eaw0e6h2gu75kwmy-1
------------------------------------------------------------
revno: 3789.2.7
revision-id: john at arbash-meinel.com-20081024220902-ubncv0s37s7npwvp
parent: john at arbash-meinel.com-20081024220749-on46bi1n2ee170uh
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 17:09:02 -0500
message:
Document what to do next
modified:
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
------------------------------------------------------------
revno: 3789.2.6
revision-id: john at arbash-meinel.com-20081024220749-on46bi1n2ee170uh
parent: john at arbash-meinel.com-20081024215259-xh1qcfeu9bdy6350
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 17:07:49 -0500
message:
Make _DirectPackAccess.reload_or_raise own the retry logic.
This way the exception raised by _DirectPackAccess is also handled by
_DirectPackAccess itself, so the two sides of the logic stay in sync.
modified:
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
------------------------------------------------------------
revno: 3789.2.5
revision-id: john at arbash-meinel.com-20081024215259-xh1qcfeu9bdy6350
parent: john at arbash-meinel.com-20081024205359-dqeht8yxzc6b6r89
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 16:52:59 -0500
message:
Change _DirectPackAccess to only raise Retry when _reload_func is defined.
modified:
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
------------------------------------------------------------
revno: 3789.2.4
revision-id: john at arbash-meinel.com-20081024205359-dqeht8yxzc6b6r89
parent: john at arbash-meinel.com-20081024203126-koei1a77u0qa4abq
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 15:53:59 -0500
message:
Add a multiple-record test, though it isn't quite what we want for the readv tests.
Mark functions that we want to make retry.
modified:
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
------------------------------------------------------------
revno: 3789.2.3
revision-id: john at arbash-meinel.com-20081024203126-koei1a77u0qa4abq
parent: john at arbash-meinel.com-20081024202643-1n4w1g6mg2jgb9r0
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 15:31:26 -0500
message:
Change the mocking a bit, so we can be sure it is failing at the right time.
modified:
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
------------------------------------------------------------
revno: 3789.2.2
revision-id: john at arbash-meinel.com-20081024202643-1n4w1g6mg2jgb9r0
parent: john at arbash-meinel.com-20081024201311-4kteabxxs3istdan
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 15:26:43 -0500
message:
Test that a readv() failing after yielding data will still raise Retry
modified:
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
------------------------------------------------------------
revno: 3789.2.1
revision-id: john at arbash-meinel.com-20081024201311-4kteabxxs3istdan
parent: john at arbash-meinel.com-20081023210643-6pxsgdybl89n0tz9
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_retry_153786
timestamp: Fri 2008-10-24 15:13:11 -0500
message:
_DirectPackAccess can now raise RetryWithNewPacks when we think something has happened.
modified:
bzrlib/errors.py errors.py-20050309040759-20512168c4e14fbd
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
=== modified file 'NEWS'
--- a/NEWS 2008-11-27 07:40:15 +0000
+++ b/NEWS 2008-11-27 08:26:50 +0000
@@ -56,6 +56,10 @@
cause "Revision X not present in Y" when later accessing them.
(Martin Pool, #288751)
+ * Pack repositories are now able to reload the pack listing and retry
+ the current operation if another action causes the data to be
+ repacked. (John Arbash Meinel, #153786)
+
* PermissionDenied errors from smart servers no longer cause
"PermissionDenied: "None"" on the client.
(Andrew Bennetts, #299254)
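
The user-visible failure behind the new NEWS entry (bug #153786) is one
process streaming records while another repacks the repository. A condensed
sketch of the scenario, not part of the patch, assuming a pack-format
repository in 'tree' that contains the requested revisions:

    from bzrlib import repository

    keys = [('rev-1',), ('rev-2',)]             # revision keys to stream
    repo = repository.Repository.open('tree')   # the reader's handle
    repo.lock_read()
    try:
        stream = repo.revisions.get_record_stream(keys, 'unordered', False)
        for record in stream:
            # A concurrent 'bzr pack' in another process used to raise
            # NoSuchFile at this point; with this change the reader
            # reloads the pack listing and carries on.
            pass
    finally:
        repo.unlock()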
=== modified file 'bzrlib/errors.py'
--- a/bzrlib/errors.py 2008-11-03 21:52:46 +0000
+++ b/bzrlib/errors.py 2008-11-27 07:25:52 +0000
@@ -1486,6 +1486,38 @@
self.options = options
+class RetryWithNewPacks(BzrError):
+ """Raised when we realize that the packs on disk have changed.
+
+ This is meant as more of a signaling exception, bridging the point where
+ a local error occurred and the code that can actually handle the error
+ by retrying appropriately.
+ """
+
+ internal_error = True
+
+ _fmt = ("Pack files have changed, reload and retry. %(orig_error)s")
+
+ def __init__(self, reload_occurred, exc_info):
+ """create a new RestartWithNewPacks error.
+
+ :param reload_occurred: Set to True if we know that the packs have
+ already been reloaded, and we are failing because of an in-memory
+ cache miss. If True, a reload that reports nothing changed is
+ ignored, because a reload is assumed to have already happened. If
+ False, a reload with nothing changed will force an error.
+ :param exc_info: The original exception traceback, so if there is a
+ problem we can raise the original error (value from sys.exc_info())
+ """
+ BzrError.__init__(self)
+ self.reload_occurred = reload_occurred
+ self.exc_info = exc_info
+ self.orig_error = exc_info[1]
+ # TODO: The global error handler should probably treat this by
+ # raising/printing the original exception with a note that the
+ # RetryWithNewPacks also went uncaught
+
+
class NoSuchExportFormat(BzrError):
_fmt = "Export format %(format)r not supported"
=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py 2008-11-27 06:17:08 +0000
+++ b/bzrlib/knit.py 2008-11-27 08:26:50 +0000
@@ -64,6 +64,7 @@
from itertools import izip, chain
import operator
import os
+import sys
from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
@@ -707,7 +708,7 @@
"""
def __init__(self, index, data_access, max_delta_chain=200,
- annotated=False):
+ annotated=False, reload_func=None):
"""Create a KnitVersionedFiles with index and data_access.
:param index: The index for the knit data.
@@ -717,6 +718,9 @@
insertion. Set to 0 to prohibit the use of deltas.
:param annotated: Set to True to cause annotations to be calculated and
stored during insertion.
+ :param reload_func: A function that can be called if we think we need
+ to reload the pack listing and try again. See
+ 'bzrlib.repofmt.pack_repo.AggregateIndex' for the signature.
"""
self._index = index
self._access = data_access
@@ -726,6 +730,7 @@
else:
self._factory = KnitPlainFactory()
self._fallback_vfs = []
+ self._reload_func = reload_func
def __repr__(self):
return "%s(%r, %r)" % (
@@ -1109,17 +1114,28 @@
:param allow_missing: If some records are missing, rather than
error, just return the data that could be generated.
"""
- position_map = self._get_components_positions(keys,
- allow_missing=allow_missing)
- # key = component_id, r = record_details, i_m = index_memo, n = next
- records = [(key, i_m) for key, (r, i_m, n)
- in position_map.iteritems()]
- record_map = {}
- for key, record, digest in \
- self._read_records_iter(records):
- (record_details, index_memo, next) = position_map[key]
- record_map[key] = record, record_details, digest, next
- return record_map
+ # This retries the whole request if anything fails. Potentially we
+ # could be a bit more selective. We could track the keys whose records
+ # we have successfully found, and then only request the new records
+ # from there. However, _get_components_positions grabs the whole build
+ # chain, which means we'll likely try to grab the same records again
+ # anyway. Also, can the build chains change as part of a pack
+ # operation? We wouldn't want to end up with a broken chain.
+ while True:
+ try:
+ position_map = self._get_components_positions(keys,
+ allow_missing=allow_missing)
+ # key = component_id, r = record_details, i_m = index_memo,
+ # n = next
+ records = [(key, i_m) for key, (r, i_m, n)
+ in position_map.iteritems()]
+ record_map = {}
+ for key, record, digest in self._read_records_iter(records):
+ (record_details, index_memo, next) = position_map[key]
+ record_map[key] = record, record_details, digest, next
+ return record_map
+ except errors.RetryWithNewPacks, e:
+ self._access.reload_or_raise(e)
def _split_by_prefix(self, keys):
"""For the given keys, split them up based on their prefix.
@@ -1160,6 +1176,22 @@
if not self._index.has_graph:
# Cannot topological order when no graph has been stored.
ordering = 'unordered'
+
+ remaining_keys = keys
+ while True:
+ try:
+ keys = set(remaining_keys)
+ for content_factory in self._get_remaining_record_stream(keys,
+ ordering, include_delta_closure):
+ remaining_keys.discard(content_factory.key)
+ yield content_factory
+ return
+ except errors.RetryWithNewPacks, e:
+ self._access.reload_or_raise(e)
+
+ def _get_remaining_record_stream(self, keys, ordering,
+ include_delta_closure):
+ """This function is the 'retry' portion for get_record_stream."""
if include_delta_closure:
positions = self._get_components_positions(keys, allow_missing=True)
else:
@@ -1457,30 +1489,39 @@
pb = progress.DummyProgress()
keys = set(keys)
total = len(keys)
- # we don't care about inclusions, the caller cares.
- # but we need to setup a list of records to visit.
- # we need key, position, length
- key_records = []
- build_details = self._index.get_build_details(keys)
- for key, details in build_details.iteritems():
- if key in keys:
- key_records.append((key, details[0]))
- keys.remove(key)
- records_iter = enumerate(self._read_records_iter(key_records))
- for (key_idx, (key, data, sha_value)) in records_iter:
- pb.update('Walking content.', key_idx, total)
- compression_parent = build_details[key][1]
- if compression_parent is None:
- # fulltext
- line_iterator = self._factory.get_fulltext_content(data)
- else:
- # Delta
- line_iterator = self._factory.get_linedelta_content(data)
- # XXX: It might be more efficient to yield (key,
- # line_iterator) in the future. However for now, this is a simpler
- # change to integrate into the rest of the codebase. RBC 20071110
- for line in line_iterator:
- yield line, key
+ done = False
+ while not done:
+ try:
+ # we don't care about inclusions, the caller cares.
+ # but we need to setup a list of records to visit.
+ # we need key, position, length
+ key_records = []
+ build_details = self._index.get_build_details(keys)
+ for key, details in build_details.iteritems():
+ if key in keys:
+ key_records.append((key, details[0]))
+ records_iter = enumerate(self._read_records_iter(key_records))
+ for (key_idx, (key, data, sha_value)) in records_iter:
+ pb.update('Walking content.', key_idx, total)
+ compression_parent = build_details[key][1]
+ if compression_parent is None:
+ # fulltext
+ line_iterator = self._factory.get_fulltext_content(data)
+ else:
+ # Delta
+ line_iterator = self._factory.get_linedelta_content(data)
+ # Now that we are yielding the data for this key, remove it
+ # from the list
+ keys.remove(key)
+ # XXX: It might be more efficient to yield (key,
+ # line_iterator) in the future. However for now, this is a
+ # simpler change to integrate into the rest of the
+ # codebase. RBC 20071110
+ for line in line_iterator:
+ yield line, key
+ done = True
+ except errors.RetryWithNewPacks, e:
+ self._access.reload_or_raise(e)
# If there are still keys we've not yet found, we look in the fallback
# vfs, and hope to find them there. Note that if the keys are found
# but had no changes or no content, the fallback may not return
@@ -1893,7 +1934,6 @@
extra information about the content which needs to be passed to
Factory.parse_record
"""
- prefixes = self._partition_keys(keys)
parent_map = self.get_parent_map(keys)
result = {}
for key in keys:
@@ -2419,15 +2459,19 @@
class _DirectPackAccess(object):
"""Access to data in one or more packs with less translation."""
- def __init__(self, index_to_packs):
+ def __init__(self, index_to_packs, reload_func=None):
"""Create a _DirectPackAccess object.
:param index_to_packs: A dict mapping index objects to the transport
and file names for obtaining data.
+ :param reload_func: A function to call if we determine that the pack
+ files have moved and we need to reload our caches. See
+ bzrlib.repofmt.pack_repo.AggregateIndex for more details.
"""
self._container_writer = None
self._write_index = None
self._indices = index_to_packs
+ self._reload_func = reload_func
def add_raw_records(self, key_sizes, raw_data):
"""Add raw knit bytes to a storage area.
@@ -2479,10 +2523,29 @@
if current_index is not None:
request_lists.append((current_index, current_list))
for index, offsets in request_lists:
- transport, path = self._indices[index]
- reader = pack.make_readv_reader(transport, path, offsets)
- for names, read_func in reader.iter_records():
- yield read_func(None)
+ try:
+ transport, path = self._indices[index]
+ except KeyError:
+ # A KeyError here indicates that someone has triggered an index
+ # reload and this index has gone missing; we need to start
+ # over.
+ if self._reload_func is None:
+ # If we don't have a _reload_func there is nothing that can
+ # be done
+ raise
+ raise errors.RetryWithNewPacks(reload_occurred=True,
+ exc_info=sys.exc_info())
+ try:
+ reader = pack.make_readv_reader(transport, path, offsets)
+ for names, read_func in reader.iter_records():
+ yield read_func(None)
+ except errors.NoSuchFile:
+ # A NoSuchFile error indicates that a pack file has gone
+ # missing on disk; we need to trigger a reload and start over.
+ if self._reload_func is None:
+ raise
+ raise errors.RetryWithNewPacks(reload_occurred=False,
+ exc_info=sys.exc_info())
def set_writer(self, writer, index, transport_packname):
"""Set a writer to use for adding data."""
@@ -2491,6 +2554,35 @@
self._container_writer = writer
self._write_index = index
+ def reload_or_raise(self, retry_exc):
+ """Try calling the reload function, or re-raise the original exception.
+
+ This should be called after _DirectPackAccess raises a
+ RetryWithNewPacks exception. This function handles the common logic
+ of deciding whether the error is fatal or merely temporary.
+ It will also make sure that the original exception is raised, rather
+ than the RetryWithNewPacks exception.
+
+ If this function returns, then the calling function should retry
+ whatever operation was being performed. Otherwise an exception will
+ be raised.
+
+ :param retry_exc: A RetryWithNewPacks exception.
+ """
+ is_error = False
+ if self._reload_func is None:
+ is_error = True
+ elif not self._reload_func():
+ # The reload claimed that nothing changed
+ if not retry_exc.reload_occurred:
+ # If there wasn't an earlier reload, then we really were
+ # expecting to find changes. We didn't find them, so this is a
+ # hard error
+ is_error = True
+ if is_error:
+ exc_class, exc_value, exc_traceback = retry_exc.exc_info
+ raise exc_class, exc_value, exc_traceback
+
# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
@@ -2769,11 +2861,17 @@
if len(self._knit._fallback_vfs) > 0:
# stacked knits can't use the fast path at present.
return self._simple_annotate(key)
- records = self._get_build_graph(key)
- if key in self._ghosts:
- raise errors.RevisionNotPresent(key, self._knit)
- self._annotate_records(records)
- return self._annotated_lines[key]
+ while True:
+ try:
+ records = self._get_build_graph(key)
+ if key in self._ghosts:
+ raise errors.RevisionNotPresent(key, self._knit)
+ self._annotate_records(records)
+ return self._annotated_lines[key]
+ except errors.RetryWithNewPacks, e:
+ self._knit._access.reload_or_raise(e)
+ # The cached build_details are no longer valid
+ self._all_build_details.clear()
def _simple_annotate(self, key):
"""Return annotated fulltext, rediffing from the full texts.
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py 2008-11-21 20:24:15 +0000
+++ b/bzrlib/repofmt/pack_repo.py 2008-11-27 07:25:52 +0000
@@ -474,7 +474,8 @@
self._reload_func = reload_func
self.index_to_pack = {}
self.combined_index = CombinedGraphIndex([], reload_func=reload_func)
- self.data_access = _DirectPackAccess(self.index_to_pack)
+ self.data_access = _DirectPackAccess(self.index_to_pack,
+ reload_func=reload_func)
self.add_callback = None
def replace_indices(self, index_to_pack, indices):
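
The reload_func threaded through here takes no arguments and returns True
only when the pack listing actually changed; that boolean is exactly what
reload_or_raise() inspects. A sketch of a conforming callable, assuming a
pack collection that exposes reload_pack_names() (treat that method name as
an assumption from the surrounding series):

    def make_reload_func(pack_collection):
        # reload_pack_names() is assumed to re-read the pack-names file
        # and return True only if the in-memory listing was altered.
        def reload_func():
            return pack_collection.reload_pack_names()
        return reload_func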
=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py 2008-11-14 07:52:16 +0000
+++ b/bzrlib/tests/test_knit.py 2008-11-27 07:25:52 +0000
@@ -48,6 +48,7 @@
_KnitKeyAccess,
make_file_factory,
)
+from bzrlib.repofmt import pack_repo
from bzrlib.tests import (
Feature,
KnownFailure,
@@ -270,6 +271,24 @@
return queue_call
+class MockReadvFailingTransport(MockTransport):
+ """Fail in the middle of a readv() result.
+
+ This Transport will successfully yield the first two requested hunks, but
+ raise NoSuchFile for the rest.
+ """
+
+ def readv(self, relpath, offsets):
+ count = 0
+ for result in MockTransport.readv(self, relpath, offsets):
+ count += 1
+ # we use 2 because the first offset is the pack header, the second
+ # is the first actual content request
+ if count > 2:
+ raise errors.NoSuchFile(relpath)
+ yield result
+
+
class KnitRecordAccessTestsMixin(object):
"""Tests for getting and putting knit records."""
@@ -304,7 +323,11 @@
mapper = ConstantMapper("foo")
access = _KnitKeyAccess(self.get_transport(), mapper)
return access
-
+
+
+class _TestException(Exception):
+ """Just an exception for local tests to use."""
+
class TestPackKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
"""Tests for the pack based access."""
@@ -322,6 +345,88 @@
access.set_writer(writer, index, (transport, packname))
return access, writer
+ def make_pack_file(self):
+ """Create a pack file with 2 records."""
+ access, writer = self._get_access(packname='packname', index='foo')
+ memos = []
+ memos.extend(access.add_raw_records([('key1', 10)], '1234567890'))
+ memos.extend(access.add_raw_records([('key2', 5)], '12345'))
+ writer.end()
+ return memos
+
+ def make_vf_for_retrying(self):
+ """Create 3 packs and a reload function.
+
+ Originally, 2 pack files will have the data, but one will be missing.
+ The third will then be used in place of the first two once reload()
+ is called.
+
+ :return: (versioned_file, reload_counter)
+ versioned_file a KnitVersionedFiles using the packs for access
+ """
+ tree = self.make_branch_and_memory_tree('tree')
+ tree.lock_write()
+ try:
+ tree.add([''], ['root-id'])
+ tree.commit('one', rev_id='rev-1')
+ tree.commit('two', rev_id='rev-2')
+ tree.commit('three', rev_id='rev-3')
+ # Pack these three revisions into another pack file, but don't remove
+ # the originals
+ repo = tree.branch.repository
+ collection = repo._pack_collection
+ collection.ensure_loaded()
+ orig_packs = collection.packs
+ packer = pack_repo.Packer(collection, orig_packs, '.testpack')
+ new_pack = packer.pack()
+
+ vf = tree.branch.repository.revisions
+ finally:
+ tree.unlock()
+ tree.branch.repository.lock_read()
+ self.addCleanup(tree.branch.repository.unlock)
+ del tree
+ # Set up a reload() function that switches to using the new pack file
+ new_index = new_pack.revision_index
+ access_tuple = new_pack.access_tuple()
+ reload_counter = [0, 0, 0]
+ def reload():
+ reload_counter[0] += 1
+ if reload_counter[1] > 0:
+ # We already reloaded, nothing more to do
+ reload_counter[2] += 1
+ return False
+ reload_counter[1] += 1
+ vf._index._graph_index._indices[:] = [new_index]
+ vf._access._indices.clear()
+ vf._access._indices[new_index] = access_tuple
+ return True
+ # Delete one of the pack files so the data will need to be reloaded. We
+ # will delete the file with 'rev-2' in it
+ trans, name = orig_packs[1].access_tuple()
+ trans.delete(name)
+ # We don't let the index trigger reloading because we want to test
+ # that we reload when the .pack file disappears
+ vf._access._reload_func = reload
+ return vf, reload_counter
+
+ def make_reload_func(self, return_val=True):
+ reload_called = [0]
+ def reload():
+ reload_called[0] += 1
+ return return_val
+ return reload_called, reload
+
+ def make_retry_exception(self):
+ # We raise a real exception so that sys.exc_info() is properly
+ # populated
+ try:
+ raise _TestException('foobar')
+ except _TestException, e:
+ retry_exc = errors.RetryWithNewPacks(reload_occurred=False,
+ exc_info=sys.exc_info())
+ return retry_exc
+
def test_read_from_several_packs(self):
access, writer = self._get_access()
memos = []
@@ -363,6 +468,197 @@
writer.end()
self.assertEqual(['1234567890'], list(access.get_raw_records(memos)))
+ def test_missing_index_raises_retry(self):
+ memos = self.make_pack_file()
+ transport = self.get_transport()
+ reload_called, reload_func = self.make_reload_func()
+ # Note that the index key has changed from 'foo' to 'bar'
+ access = _DirectPackAccess({'bar':(transport, 'packname')},
+ reload_func=reload_func)
+ e = self.assertListRaises(errors.RetryWithNewPacks,
+ access.get_raw_records, memos)
+ # Because a key was passed in which does not match our index list, we
+ # assume that the listing was already reloaded
+ self.assertTrue(e.reload_occurred)
+ self.assertIsInstance(e.exc_info, tuple)
+ self.assertIs(e.exc_info[0], KeyError)
+ self.assertIsInstance(e.exc_info[1], KeyError)
+
+ def test_missing_index_raises_key_error_with_no_reload(self):
+ memos = self.make_pack_file()
+ transport = self.get_transport()
+ # Note that the index key has changed from 'foo' to 'bar'
+ access = _DirectPackAccess({'bar':(transport, 'packname')})
+ e = self.assertListRaises(KeyError, access.get_raw_records, memos)
+
+ def test_missing_file_raises_retry(self):
+ memos = self.make_pack_file()
+ transport = self.get_transport()
+ reload_called, reload_func = self.make_reload_func()
+ # Note that the 'filename' has been changed to 'different-packname'
+ access = _DirectPackAccess({'foo':(transport, 'different-packname')},
+ reload_func=reload_func)
+ e = self.assertListRaises(errors.RetryWithNewPacks,
+ access.get_raw_records, memos)
+ # The file has gone missing, so we assume we need to reload
+ self.assertFalse(e.reload_occurred)
+ self.assertIsInstance(e.exc_info, tuple)
+ self.assertIs(e.exc_info[0], errors.NoSuchFile)
+ self.assertIsInstance(e.exc_info[1], errors.NoSuchFile)
+ self.assertEqual('different-packname', e.exc_info[1].path)
+
+ def test_missing_file_raises_no_such_file_with_no_reload(self):
+ memos = self.make_pack_file()
+ transport = self.get_transport()
+ # Note that the 'filename' has been changed to 'different-packname'
+ access = _DirectPackAccess({'foo':(transport, 'different-packname')})
+ e = self.assertListRaises(errors.NoSuchFile,
+ access.get_raw_records, memos)
+
+ def test_failing_readv_raises_retry(self):
+ memos = self.make_pack_file()
+ transport = self.get_transport()
+ failing_transport = MockReadvFailingTransport(
+ [transport.get_bytes('packname')])
+ reload_called, reload_func = self.make_reload_func()
+ access = _DirectPackAccess({'foo':(failing_transport, 'packname')},
+ reload_func=reload_func)
+ # Asking for a single record will not trigger the Mock failure
+ self.assertEqual(['1234567890'],
+ list(access.get_raw_records(memos[:1])))
+ self.assertEqual(['12345'],
+ list(access.get_raw_records(memos[1:2])))
+ # A multiple offset readv() will fail mid-way through
+ e = self.assertListRaises(errors.RetryWithNewPacks,
+ access.get_raw_records, memos)
+ # The file has gone missing, so we assume we need to reload
+ self.assertFalse(e.reload_occurred)
+ self.assertIsInstance(e.exc_info, tuple)
+ self.assertIs(e.exc_info[0], errors.NoSuchFile)
+ self.assertIsInstance(e.exc_info[1], errors.NoSuchFile)
+ self.assertEqual('packname', e.exc_info[1].path)
+
+ def test_failing_readv_raises_no_such_file_with_no_reload(self):
+ memos = self.make_pack_file()
+ transport = self.get_transport()
+ failing_transport = MockReadvFailingTransport(
+ [transport.get_bytes('packname')])
+ reload_called, reload_func = self.make_reload_func()
+ access = _DirectPackAccess({'foo':(failing_transport, 'packname')})
+ # Asking for a single record will not trigger the Mock failure
+ self.assertEqual(['1234567890'],
+ list(access.get_raw_records(memos[:1])))
+ self.assertEqual(['12345'],
+ list(access.get_raw_records(memos[1:2])))
+ # A multiple offset readv() will fail mid-way through
+ e = self.assertListRaises(errors.NoSuchFile,
+ access.get_raw_records, memos)
+
+ def test_reload_or_raise_no_reload(self):
+ access = _DirectPackAccess({}, reload_func=None)
+ retry_exc = self.make_retry_exception()
+ # Without a reload_func, we will just re-raise the original exception
+ self.assertRaises(_TestException, access.reload_or_raise, retry_exc)
+
+ def test_reload_or_raise_reload_changed(self):
+ reload_called, reload_func = self.make_reload_func(return_val=True)
+ access = _DirectPackAccess({}, reload_func=reload_func)
+ retry_exc = self.make_retry_exception()
+ access.reload_or_raise(retry_exc)
+ self.assertEqual([1], reload_called)
+ retry_exc.reload_occurred=True
+ access.reload_or_raise(retry_exc)
+ self.assertEqual([2], reload_called)
+
+ def test_reload_or_raise_reload_no_change(self):
+ reload_called, reload_func = self.make_reload_func(return_val=False)
+ access = _DirectPackAccess({}, reload_func=reload_func)
+ retry_exc = self.make_retry_exception()
+ # If reload_occurred is False, then we consider it an error to have
+ # reload_func() return False (no changes).
+ self.assertRaises(_TestException, access.reload_or_raise, retry_exc)
+ self.assertEqual([1], reload_called)
+ retry_exc.reload_occurred=True
+ # If reload_occurred is True, then we assume nothing changed because
+ # it had changed earlier, but didn't change again
+ access.reload_or_raise(retry_exc)
+ self.assertEqual([2], reload_called)
+
+ def test_annotate_retries(self):
+ vf, reload_counter = self.make_vf_for_retrying()
+ # It is a little bit bogus to annotate the Revision VF, but it works,
+ # as we have ancestry stored there
+ key = ('rev-3',)
+ reload_lines = vf.annotate(key)
+ self.assertEqual([1, 1, 0], reload_counter)
+ plain_lines = vf.annotate(key)
+ self.assertEqual([1, 1, 0], reload_counter) # No extra reloading
+ if reload_lines != plain_lines:
+ self.fail('Annotation was not identical with reloading.')
+ # Now delete the packs-in-use, which should trigger another reload, but
+ # this time we just raise an exception because we can't recover
+ for trans, name in vf._access._indices.itervalues():
+ trans.delete(name)
+ self.assertRaises(errors.NoSuchFile, vf.annotate, key)
+ self.assertEqual([2, 1, 1], reload_counter)
+
+ def test__get_record_map_retries(self):
+ vf, reload_counter = self.make_vf_for_retrying()
+ keys = [('rev-1',), ('rev-2',), ('rev-3',)]
+ records = vf._get_record_map(keys)
+ self.assertEqual(keys, sorted(records.keys()))
+ self.assertEqual([1, 1, 0], reload_counter)
+ # Now delete the packs-in-use, which should trigger another reload, but
+ # this time we just raise an exception because we can't recover
+ for trans, name in vf._access._indices.itervalues():
+ trans.delete(name)
+ self.assertRaises(errors.NoSuchFile, vf._get_record_map, keys)
+ self.assertEqual([2, 1, 1], reload_counter)
+
+ def test_get_record_stream_retries(self):
+ vf, reload_counter = self.make_vf_for_retrying()
+ keys = [('rev-1',), ('rev-2',), ('rev-3',)]
+ record_stream = vf.get_record_stream(keys, 'topological', False)
+ record = record_stream.next()
+ self.assertEqual(('rev-1',), record.key)
+ self.assertEqual([0, 0, 0], reload_counter)
+ record = record_stream.next()
+ self.assertEqual(('rev-2',), record.key)
+ self.assertEqual([1, 1, 0], reload_counter)
+ record = record_stream.next()
+ self.assertEqual(('rev-3',), record.key)
+ self.assertEqual([1, 1, 0], reload_counter)
+ # Now delete all pack files, and see that we raise the right error
+ for trans, name in vf._access._indices.itervalues():
+ trans.delete(name)
+ self.assertListRaises(errors.NoSuchFile,
+ vf.get_record_stream, keys, 'topological', False)
+
+ def test_iter_lines_added_or_present_in_keys_retries(self):
+ vf, reload_counter = self.make_vf_for_retrying()
+ keys = [('rev-1',), ('rev-2',), ('rev-3',)]
+ # Unfortunately, iter_lines_added_or_present_in_keys iterates the
+ # result in random order (determined by the iteration order from a
+ # set()), so we don't have any solid way to control whether data is
+ # read before or after the failure. However, we delete the middle node
+ # to exercise the code well.
+ # What we care about is that all lines are always yielded, but not
+ # duplicated
+ count = 0
+ reload_lines = sorted(vf.iter_lines_added_or_present_in_keys(keys))
+ self.assertEqual([1, 1, 0], reload_counter)
+ # Now do it again, to make sure the result is equivalent
+ plain_lines = sorted(vf.iter_lines_added_or_present_in_keys(keys))
+ self.assertEqual([1, 1, 0], reload_counter) # No extra reloading
+ self.assertEqual(plain_lines, reload_lines)
+ self.assertEqual(21, len(plain_lines))
+ # Now delete all pack files, and see that we raise the right error
+ for trans, name in vf._access._indices.itervalues():
+ trans.delete(name)
+ self.assertListRaises(errors.NoSuchFile,
+ vf.iter_lines_added_or_present_in_keys, keys)
+ self.assertEqual([2, 1, 1], reload_counter)
+
class LowLevelKnitDataTests(TestCase):
@@ -373,6 +669,26 @@
gz_file.close()
return sio.getvalue()
+ def make_multiple_records(self):
+ """Create the content for multiple records."""
+ sha1sum = osutils.sha('foo\nbar\n').hexdigest()
+ total_txt = []
+ gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
+ 'foo\n'
+ 'bar\n'
+ 'end rev-id-1\n'
+ % (sha1sum,))
+ record_1 = (0, len(gz_txt), sha1sum)
+ total_txt.append(gz_txt)
+ sha1sum = osutils.sha('baz\n').hexdigest()
+ gz_txt = self.create_gz_content('version rev-id-2 1 %s\n'
+ 'baz\n'
+ 'end rev-id-2\n'
+ % (sha1sum,))
+ record_2 = (record_1[1], len(gz_txt), sha1sum)
+ total_txt.append(gz_txt)
+ return total_txt, record_1, record_2
+
def test_valid_knit_data(self):
sha1sum = osutils.sha('foo\nbar\n').hexdigest()
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
@@ -392,6 +708,24 @@
raw_contents = list(knit._read_records_iter_raw(records))
self.assertEqual([(('rev-id-1',), gz_txt, sha1sum)], raw_contents)
+ def test_multiple_records_valid(self):
+ total_txt, record_1, record_2 = self.make_multiple_records()
+ transport = MockTransport([''.join(total_txt)])
+ access = _KnitKeyAccess(transport, ConstantMapper('filename'))
+ knit = KnitVersionedFiles(None, access)
+ records = [(('rev-id-1',), (('rev-id-1',), record_1[0], record_1[1])),
+ (('rev-id-2',), (('rev-id-2',), record_2[0], record_2[1]))]
+
+ contents = list(knit._read_records_iter(records))
+ self.assertEqual([(('rev-id-1',), ['foo\n', 'bar\n'], record_1[2]),
+ (('rev-id-2',), ['baz\n'], record_2[2])],
+ contents)
+
+ raw_contents = list(knit._read_records_iter_raw(records))
+ self.assertEqual([(('rev-id-1',), total_txt[0], record_1[2]),
+ (('rev-id-2',), total_txt[1], record_2[2])],
+ raw_contents)
+
def test_not_enough_lines(self):
sha1sum = osutils.sha('foo\n').hexdigest()
# record says 2 lines data says 1
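
A compact restatement of the reload_counter convention the retry tests above
assert on (a sketch; the closure mirrors make_vf_for_retrying):

    reload_counter = [0, 0, 0]  # [total calls, real reloads, no-op calls]

    def reload():
        reload_counter[0] += 1
        if reload_counter[1] > 0:
            reload_counter[2] += 1  # already reloaded; report no change
            return False
        reload_counter[1] += 1      # first call: swap in the new pack
        return True

    # Hence [1, 1, 0] after a single recovered failure, and [2, 1, 1] once
    # a second failure finds nothing new to load, at which point
    # reload_or_raise() re-raises the original exception.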
=== modified file 'bzrlib/tests/test_pack_repository.py'
--- a/bzrlib/tests/test_pack_repository.py 2008-11-12 02:53:12 +0000
+++ b/bzrlib/tests/test_pack_repository.py 2008-11-27 07:25:52 +0000
@@ -420,6 +420,35 @@
finally:
tree.unlock()
+ def test_concurrent_pack_during_get_record_reloads(self):
+ tree = self.make_branch_and_tree('tree')
+ tree.lock_write()
+ try:
+ rev1 = tree.commit('one')
+ rev2 = tree.commit('two')
+ keys = [(rev1,), (rev2,)]
+ r2 = repository.Repository.open('tree')
+ r2.lock_read()
+ try:
+ # At this point, we will start grabbing a record stream, and
+ # trigger a repack mid-way
+ packed = False
+ result = {}
+ record_stream = r2.revisions.get_record_stream(keys,
+ 'unordered', False)
+ for record in record_stream:
+ result[record.key] = record
+ if not packed:
+ tree.branch.repository.pack()
+ packed = True
+ # The first record will be found in the original location, but
+ # after the pack, we have to reload to find the next record
+ self.assertEqual(sorted(keys), sorted(result.keys()))
+ finally:
+ r2.unlock()
+ finally:
+ tree.unlock()
+
def test_lock_write_does_not_physically_lock(self):
repo = self.make_repository('.', format=self.get_format())
repo.lock_write()