Rev 3865: (mbp, for jam) retry when pack operations fail in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Thu Nov 27 08:26:56 GMT 2008


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 3865
revision-id: pqm at pqm.ubuntu.com-20081127082650-adzra5ok5apue0gl
parent: pqm at pqm.ubuntu.com-20081127074015-oigcdk8r4oxi1qse
parent: mbp at sourcefrog.net-20081127072552-st8jnmahi0iy3lrt
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Thu 2008-11-27 08:26:50 +0000
message:
  (mbp, for jam) retry when pack operations fail
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/errors.py               errors.py-20050309040759-20512168c4e14fbd
  bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
  bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
  bzrlib/tests/test_pack_repository.py test_pack_repository-20080801043947-eaw0e6h2gu75kwmy-1
    ------------------------------------------------------------
    revno: 3860.1.1
    revision-id: mbp at sourcefrog.net-20081127072552-st8jnmahi0iy3lrt
    parent: pqm at pqm.ubuntu.com-20081127031840-o1i61l3vf2yin615
    parent: john at arbash-meinel.com-20081025024257-sbi10z5ddf6rc93r
    committer: Martin Pool <mbp at sourcefrog.net>
    branch nick: 153786-retry
    timestamp: Thu 2008-11-27 18:25:52 +1100
    message:
      merge fix to retry when packs have changed
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
      bzrlib/errors.py               errors.py-20050309040759-20512168c4e14fbd
      bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
      bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
      bzrlib/tests/test_pack_repository.py test_pack_repository-20080801043947-eaw0e6h2gu75kwmy-1
    ------------------------------------------------------------
    revno: 3789.2.15
    revision-id: john at arbash-meinel.com-20081025024257-sbi10z5ddf6rc93r
    parent: john at arbash-meinel.com-20081025024012-1vobzpomrefm43a7
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 21:42:57 -0500
    message:
      NEWS about fixing bug #153786
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
    ------------------------------------------------------------
    revno: 3789.2.14
    revision-id: john at arbash-meinel.com-20081025024012-1vobzpomrefm43a7
    parent: john at arbash-meinel.com-20081025023735-bsht1lfu6jye6z7o
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 21:40:12 -0500
    message:
      Update AggregateIndex to pass the reload_func into _DirectPackAccess
      
      And update the pack_repository tests so they now pass, yipee!
    modified:
      bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
      bzrlib/tests/test_pack_repository.py test_pack_repository-20080801043947-eaw0e6h2gu75kwmy-1
    ------------------------------------------------------------
    revno: 3789.2.13
    revision-id: john at arbash-meinel.com-20081025023735-bsht1lfu6jye6z7o
    parent: john at arbash-meinel.com-20081025022225-wbds6xl8t5ptod6p
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 21:37:35 -0500
    message:
      KnitVersionedFile.annotate() now retries when appropriate.
      
      As near as I can tell, this is the last code that works in terms of records,
      which is the last place that requires the indexes and packs to stay in perfect
      sync.
    modified:
      bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
    ------------------------------------------------------------
    revno: 3789.2.12
    revision-id: john at arbash-meinel.com-20081025022225-wbds6xl8t5ptod6p
    parent: john at arbash-meinel.com-20081025015845-7424d9znf6v36kuv
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 21:22:25 -0500
    message:
      iter_lines_added_or_present now retries.
      
      Change the test to use 3 packs/revisions, as it makes it more likely that we
      will have already processed some of the results, before we finally run into
      the missing pack file.
    modified:
      bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
    ------------------------------------------------------------
    revno: 3789.2.11
    revision-id: john at arbash-meinel.com-20081025015845-7424d9znf6v36kuv
    parent: john at arbash-meinel.com-20081025014248-zhy0bg5nf238vc29
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 20:58:45 -0500
    message:
      KnitVersionedFile.get_record_stream now retries *and* fails correctly.
    modified:
      bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
    ------------------------------------------------------------
    revno: 3789.2.10
    revision-id: john at arbash-meinel.com-20081025014248-zhy0bg5nf238vc29
    parent: john at arbash-meinel.com-20081025003853-3orjg3p78750qp4r
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 20:42:48 -0500
    message:
      The first function for KnitVersionedFiles can now retry on request.
      
      _get_record_map() now includes logic to retry operations if they fail due to a
      missing .pack file.
    modified:
      bzrlib/errors.py               errors.py-20050309040759-20512168c4e14fbd
      bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
    ------------------------------------------------------------
    revno: 3789.2.9
    revision-id: john at arbash-meinel.com-20081025003853-3orjg3p78750qp4r
    parent: john at arbash-meinel.com-20081025002729-m1a8c7gdfr1v2ezb
    parent: john at arbash-meinel.com-20081025003007-5xam89uv2d1b2pdb
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 19:38:53 -0500
    message:
      Merge in the readv_close patch, which brings in bzr.dev
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
      bzrlib/_walkdirs_win32.pyx     _walkdirs_win32.pyx-20080716220454-kweh3tgxez5dvw2l-2
      bzrlib/branch.py               branch.py-20050309040759-e4baf4e0d046576e
      bzrlib/btree_index.py          index.py-20080624222253-p0x5f92uyh5hw734-7
      bzrlib/chunk_writer.py         chunk_writer.py-20080630234519-6ggn4id17nipovny-1
      bzrlib/help_topics/en/hooks.txt hooks.txt-20070830033044-xxu2rced13f72dka-1
      bzrlib/index.py                index.py-20070712131115-lolkarso50vjr64s-1
      bzrlib/plugins/launchpad/account.py account.py-20071011033320-50y6vfftywf4yllw-1
      bzrlib/plugins/launchpad/test_account.py test_account.py-20071011033320-50y6vfftywf4yllw-2
      bzrlib/python-compat.h         pythoncompat.h-20080924041409-9kvi0fgtuuqp743j-1
      bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
      bzrlib/tests/branch_implementations/test_stacking.py test_stacking.py-20080214020755-msjlkb7urobwly0f-1
      bzrlib/tests/test_btree_index.py test_index.py-20080624222253-p0x5f92uyh5hw734-13
      bzrlib/tests/test_chunk_writer.py test_chunk_writer.py-20080630234519-6ggn4id17nipovny-2
      bzrlib/tests/test_index.py     test_index.py-20070712131115-lolkarso50vjr64s-2
      bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
      bzrlib/transport/__init__.py   transport.py-20050711165921-4978aa7ce1285ad5
      bzrlib/win32utils.py           win32console.py-20051021033308-123c6c929d04973d
    ------------------------------------------------------------
    revno: 3789.2.8
    revision-id: john at arbash-meinel.com-20081025002729-m1a8c7gdfr1v2ezb
    parent: john at arbash-meinel.com-20081024220902-ubncv0s37s7npwvp
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 19:27:29 -0500
    message:
      Add a test that KnitPackRepository.get_record_stream retries when appropriate.
    modified:
      bzrlib/tests/test_pack_repository.py test_pack_repository-20080801043947-eaw0e6h2gu75kwmy-1
    ------------------------------------------------------------
    revno: 3789.2.7
    revision-id: john at arbash-meinel.com-20081024220902-ubncv0s37s7npwvp
    parent: john at arbash-meinel.com-20081024220749-on46bi1n2ee170uh
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 17:09:02 -0500
    message:
      Document what to do next
    modified:
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
    ------------------------------------------------------------
    revno: 3789.2.6
    revision-id: john at arbash-meinel.com-20081024220749-on46bi1n2ee170uh
    parent: john at arbash-meinel.com-20081024215259-xh1qcfeu9bdy6350
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 17:07:49 -0500
    message:
      Make _DirectPackAccess.reload_or_raise maintain the logic.
      
      This way the exception that is raised by _DirectPackAccess is also properly
      handled by DPA. So we can keep the logic in sync.
    modified:
      bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
    ------------------------------------------------------------
    revno: 3789.2.5
    revision-id: john at arbash-meinel.com-20081024215259-xh1qcfeu9bdy6350
    parent: john at arbash-meinel.com-20081024205359-dqeht8yxzc6b6r89
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 16:52:59 -0500
    message:
      Change _DirectPackAccess to only raise Retry when _reload_func is defined.
    modified:
      bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
    ------------------------------------------------------------
    revno: 3789.2.4
    revision-id: john at arbash-meinel.com-20081024205359-dqeht8yxzc6b6r89
    parent: john at arbash-meinel.com-20081024203126-koei1a77u0qa4abq
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 15:53:59 -0500
    message:
      Add a multiple-record test, though it isn't quite what we want for the readv tests.
      Mark functions that we want to make retry.
    modified:
      bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
    ------------------------------------------------------------
    revno: 3789.2.3
    revision-id: john at arbash-meinel.com-20081024203126-koei1a77u0qa4abq
    parent: john at arbash-meinel.com-20081024202643-1n4w1g6mg2jgb9r0
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 15:31:26 -0500
    message:
      Change the mocking a bit, so we can be sure it is failing at the right time.
    modified:
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
    ------------------------------------------------------------
    revno: 3789.2.2
    revision-id: john at arbash-meinel.com-20081024202643-1n4w1g6mg2jgb9r0
    parent: john at arbash-meinel.com-20081024201311-4kteabxxs3istdan
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 15:26:43 -0500
    message:
      Test that a readv() failing after yielding data will still raise Retry
    modified:
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
    ------------------------------------------------------------
    revno: 3789.2.1
    revision-id: john at arbash-meinel.com-20081024201311-4kteabxxs3istdan
    parent: john at arbash-meinel.com-20081023210643-6pxsgdybl89n0tz9
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: pack_retry_153786
    timestamp: Fri 2008-10-24 15:13:11 -0500
    message:
      _DirectPackAccess can now raise RetryWithNewPacks when we think something has happened.
    modified:
      bzrlib/errors.py               errors.py-20050309040759-20512168c4e14fbd
      bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
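
Several of the log messages above describe streaming functions that must retry even after they have already yielded some results. The following is a minimal Python 3 sketch of that idea, not the bzrlib code (which is Python 2): it tracks which keys have been delivered so that a retry only asks for the rest. The names stream_records, read_records and reload_packs are hypothetical, and the RetryWithNewPacks class here is a local stand-in.

```python
class RetryWithNewPacks(Exception):
    """Hypothetical stand-in for the signaling exception in this patch."""


def stream_records(keys, read_records, reload_packs):
    """Yield (key, data) pairs, retrying after a reload if a read fails.

    read_records and reload_packs are hypothetical callables supplied by
    the caller; they are not bzrlib APIs.
    """
    remaining = set(keys)  # copy, so the caller's collection is untouched
    while True:
        try:
            for key, data in read_records(sorted(remaining)):
                # Mark the key as done before yielding, so a later retry
                # never produces it a second time.
                remaining.discard(key)
                yield key, data
            return
        except RetryWithNewPacks:
            if not reload_packs():
                raise  # nothing changed, so retrying cannot help


# Demo: the reader fails on its second record until a reload has happened.
state = {"reloaded": False}


def read_records(keys):
    for position, key in enumerate(keys):
        if position == 1 and not state["reloaded"]:
            raise RetryWithNewPacks(key)
        yield key, key.upper()


def reload_packs():
    state["reloaded"] = True
    return True


result = list(stream_records(["a", "b", "c"], read_records, reload_packs))
```

Copying the keys into a fresh set means the sketch works whether the caller passes a list or a set, and each key is yielded at most once even though the underlying read is restarted.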
=== modified file 'NEWS'
--- a/NEWS	2008-11-27 07:40:15 +0000
+++ b/NEWS	2008-11-27 08:26:50 +0000
@@ -56,6 +56,10 @@
       cause "Revision X not present in Y" when later accessing them.
       (Martin Pool, #288751)
 
+    * Pack repositories are now able to reload the pack listing and retry
+      the current operation if another action causes the data to be
+      repacked.  (John Arbash Meinel, #153786)
+
     * PermissionDenied errors from smart servers no longer cause
       "PermissionDenied: "None"" on the client.
       (Andrew Bennetts, #299254)

=== modified file 'bzrlib/errors.py'
--- a/bzrlib/errors.py	2008-11-03 21:52:46 +0000
+++ b/bzrlib/errors.py	2008-11-27 07:25:52 +0000
@@ -1486,6 +1486,38 @@
         self.options = options
 
 
+class RetryWithNewPacks(BzrError):
+    """Raised when we realize that the packs on disk have changed.
+
+    This is meant as more of a signaling exception, bridging the point where a
+    local error occurred and the code that can actually handle the error by
+    retrying appropriately.
+    """
+
+    internal_error = True
+
+    _fmt = ("Pack files have changed, reload and retry. %(orig_error)s")
+
+    def __init__(self, reload_occurred, exc_info):
+        """Create a new RetryWithNewPacks error.
+
+        :param reload_occurred: Set to True if we know that the packs have
+            already been reloaded, and we are failing because of an in-memory
+            cache miss. If set to True then we will ignore if a reload says
+            nothing has changed, because we assume it has already reloaded. If
+            False, then a reload with nothing changed will force an error.
+        :param exc_info: The original exception traceback, so if there is a
+            problem we can raise the original error (value from sys.exc_info())
+        """
+        BzrError.__init__(self)
+        self.reload_occurred = reload_occurred
+        self.exc_info = exc_info
+        self.orig_error = exc_info[1]
+        # TODO: The global error handler should probably treat this by
+        #       raising/printing the original exception with a bit about
+        #       RetryWithNewPacks also not being caught
+
+
 class NoSuchExportFormat(BzrError):
     
     _fmt = "Export format %(format)r not supported"

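The exception added above stores the value of sys.exc_info() so that the original error can be raised later if the retry turns out to be impossible. A minimal Python 3 sketch of that capture-and-re-raise pattern follows. It is an illustration only: the class is a local stand-in rather than bzrlib.errors.RetryWithNewPacks, and read_pack and reraise_original are hypothetical helpers.

```python
import os
import sys
import tempfile


class RetryWithNewPacks(Exception):
    """Local stand-in: remembers the error that triggered the retry."""

    def __init__(self, reload_occurred, exc_info):
        super().__init__(
            "Pack files have changed, reload and retry. %s" % (exc_info[1],))
        self.reload_occurred = reload_occurred
        self.exc_info = exc_info


def read_pack(path):
    """Read a file, converting a missing file into a retry signal."""
    try:
        with open(path, "rb") as pack_file:
            return pack_file.read()
    except FileNotFoundError:
        raise RetryWithNewPacks(reload_occurred=False,
                                exc_info=sys.exc_info())


def reraise_original(retry_exc):
    """Raise the stored exception with its original traceback."""
    _exc_class, exc_value, exc_traceback = retry_exc.exc_info
    raise exc_value.with_traceback(exc_traceback)


# Demo: a path inside a fresh temporary directory is certain not to exist.
missing_path = os.path.join(tempfile.mkdtemp(), "missing.pack")
recovered = None
try:
    read_pack(missing_path)
except RetryWithNewPacks as retry_exc:
    try:
        reraise_original(retry_exc)
    except FileNotFoundError as original:
        recovered = original
```

In Python 3 the three-argument raise used in the patch is spelled with with_traceback(); the effect is the same, in that the caller sees the original error rather than the signaling exception.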
=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py	2008-11-27 06:17:08 +0000
+++ b/bzrlib/knit.py	2008-11-27 08:26:50 +0000
@@ -64,6 +64,7 @@
 from itertools import izip, chain
 import operator
 import os
+import sys
 
 from bzrlib.lazy_import import lazy_import
 lazy_import(globals(), """
@@ -707,7 +708,7 @@
     """
 
     def __init__(self, index, data_access, max_delta_chain=200,
-        annotated=False):
+                 annotated=False, reload_func=None):
         """Create a KnitVersionedFiles with index and data_access.
 
         :param index: The index for the knit data.
@@ -717,6 +718,9 @@
             insertion. Set to 0 to prohibit the use of deltas.
         :param annotated: Set to True to cause annotations to be calculated and
             stored during insertion.
+        :param reload_func: A function that can be called if we think we need
+            to reload the pack listing and try again. See
+            'bzrlib.repofmt.pack_repo.AggregateIndex' for the signature.
         """
         self._index = index
         self._access = data_access
@@ -726,6 +730,7 @@
         else:
             self._factory = KnitPlainFactory()
         self._fallback_vfs = []
+        self._reload_func = reload_func
 
     def __repr__(self):
         return "%s(%r, %r)" % (
@@ -1109,17 +1114,28 @@
         :param allow_missing: If some records are missing, rather than 
             error, just return the data that could be generated.
         """
-        position_map = self._get_components_positions(keys,
-            allow_missing=allow_missing)
-        # key = component_id, r = record_details, i_m = index_memo, n = next
-        records = [(key, i_m) for key, (r, i_m, n)
-                             in position_map.iteritems()]
-        record_map = {}
-        for key, record, digest in \
-                self._read_records_iter(records):
-            (record_details, index_memo, next) = position_map[key]
-            record_map[key] = record, record_details, digest, next
-        return record_map
+        # This retries the whole request if anything fails. Potentially we
+        # could be a bit more selective. We could track the keys whose records
+        # we have successfully found, and then only request the new records
+        # from there. However, _get_components_positions grabs the whole build
+        # chain, which means we'll likely try to grab the same records again
+        # anyway. Also, can the build chains change as part of a pack
+        # operation? We wouldn't want to end up with a broken chain.
+        while True:
+            try:
+                position_map = self._get_components_positions(keys,
+                    allow_missing=allow_missing)
+                # key = component_id, r = record_details, i_m = index_memo,
+                # n = next
+                records = [(key, i_m) for key, (r, i_m, n)
+                                       in position_map.iteritems()]
+                record_map = {}
+                for key, record, digest in self._read_records_iter(records):
+                    (record_details, index_memo, next) = position_map[key]
+                    record_map[key] = record, record_details, digest, next
+                return record_map
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
 
     def _split_by_prefix(self, keys):
         """For the given keys, split them up based on their prefix.
@@ -1160,6 +1176,22 @@
         if not self._index.has_graph:
             # Cannot topological order when no graph has been stored.
             ordering = 'unordered'
+
+        remaining_keys = keys
+        while True:
+            try:
+                keys = set(remaining_keys)
+                for content_factory in self._get_remaining_record_stream(keys,
+                                            ordering, include_delta_closure):
+                    remaining_keys.discard(content_factory.key)
+                    yield content_factory
+                return
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
+
+    def _get_remaining_record_stream(self, keys, ordering,
+                                     include_delta_closure):
+        """This function is the 'retry' portion for get_record_stream."""
         if include_delta_closure:
             positions = self._get_components_positions(keys, allow_missing=True)
         else:
@@ -1457,30 +1489,39 @@
             pb = progress.DummyProgress()
         keys = set(keys)
         total = len(keys)
-        # we don't care about inclusions, the caller cares.
-        # but we need to setup a list of records to visit.
-        # we need key, position, length
-        key_records = []
-        build_details = self._index.get_build_details(keys)
-        for key, details in build_details.iteritems():
-            if key in keys:
-                key_records.append((key, details[0]))
-                keys.remove(key)
-        records_iter = enumerate(self._read_records_iter(key_records))
-        for (key_idx, (key, data, sha_value)) in records_iter:
-            pb.update('Walking content.', key_idx, total)
-            compression_parent = build_details[key][1]
-            if compression_parent is None:
-                # fulltext
-                line_iterator = self._factory.get_fulltext_content(data)
-            else:
-                # Delta 
-                line_iterator = self._factory.get_linedelta_content(data)
-            # XXX: It might be more efficient to yield (key,
-            # line_iterator) in the future. However for now, this is a simpler
-            # change to integrate into the rest of the codebase. RBC 20071110
-            for line in line_iterator:
-                yield line, key
+        done = False
+        while not done:
+            try:
+                # we don't care about inclusions, the caller cares.
+                # but we need to setup a list of records to visit.
+                # we need key, position, length
+                key_records = []
+                build_details = self._index.get_build_details(keys)
+                for key, details in build_details.iteritems():
+                    if key in keys:
+                        key_records.append((key, details[0]))
+                records_iter = enumerate(self._read_records_iter(key_records))
+                for (key_idx, (key, data, sha_value)) in records_iter:
+                    pb.update('Walking content.', key_idx, total)
+                    compression_parent = build_details[key][1]
+                    if compression_parent is None:
+                        # fulltext
+                        line_iterator = self._factory.get_fulltext_content(data)
+                    else:
+                        # Delta 
+                        line_iterator = self._factory.get_linedelta_content(data)
+                    # Now that we are yielding the data for this key, remove it
+                    # from the list
+                    keys.remove(key)
+                    # XXX: It might be more efficient to yield (key,
+                    # line_iterator) in the future. However for now, this is a
+                    # simpler change to integrate into the rest of the
+                    # codebase. RBC 20071110
+                    for line in line_iterator:
+                        yield line, key
+                done = True
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
         # If there are still keys we've not yet found, we look in the fallback
         # vfs, and hope to find them there.  Note that if the keys are found
         # but had no changes or no content, the fallback may not return
@@ -1893,7 +1934,6 @@
                 extra information about the content which needs to be passed to
                 Factory.parse_record
         """
-        prefixes = self._partition_keys(keys)
         parent_map = self.get_parent_map(keys)
         result = {}
         for key in keys:
@@ -2419,15 +2459,19 @@
 class _DirectPackAccess(object):
     """Access to data in one or more packs with less translation."""
 
-    def __init__(self, index_to_packs):
+    def __init__(self, index_to_packs, reload_func=None):
         """Create a _DirectPackAccess object.
 
         :param index_to_packs: A dict mapping index objects to the transport
             and file names for obtaining data.
+        :param reload_func: A function to call if we determine that the pack
+            files have moved and we need to reload our caches. See
+            bzrlib.repofmt.pack_repo.AggregateIndex for more details.
         """
         self._container_writer = None
         self._write_index = None
         self._indices = index_to_packs
+        self._reload_func = reload_func
 
     def add_raw_records(self, key_sizes, raw_data):
         """Add raw knit bytes to a storage area.
@@ -2479,10 +2523,29 @@
         if current_index is not None:
             request_lists.append((current_index, current_list))
         for index, offsets in request_lists:
-            transport, path = self._indices[index]
-            reader = pack.make_readv_reader(transport, path, offsets)
-            for names, read_func in reader.iter_records():
-                yield read_func(None)
+            try:
+                transport, path = self._indices[index]
+            except KeyError:
+                # A KeyError here indicates that someone has triggered an index
+                # reload, and this index has gone missing, so we need to start
+                # over.
+                if self._reload_func is None:
+                    # If we don't have a _reload_func there is nothing that can
+                    # be done
+                    raise
+                raise errors.RetryWithNewPacks(reload_occurred=True,
+                                               exc_info=sys.exc_info())
+            try:
+                reader = pack.make_readv_reader(transport, path, offsets)
+                for names, read_func in reader.iter_records():
+                    yield read_func(None)
+            except errors.NoSuchFile:
+                # A NoSuchFile error indicates that a pack file has gone
+                # missing on disk, we need to trigger a reload, and start over.
+                if self._reload_func is None:
+                    raise
+                raise errors.RetryWithNewPacks(reload_occurred=False,
+                                               exc_info=sys.exc_info())
 
     def set_writer(self, writer, index, transport_packname):
         """Set a writer to use for adding data."""
@@ -2491,6 +2554,35 @@
         self._container_writer = writer
         self._write_index = index
 
+    def reload_or_raise(self, retry_exc):
+        """Try calling the reload function, or re-raise the original exception.
+
+        This should be called after _DirectPackAccess raises a
+        RetryWithNewPacks exception. This function will handle the common logic
+        of determining when the error is fatal versus being temporary.
+        It will also make sure that the original exception is raised, rather
+        than the RetryWithNewPacks exception.
+
+        If this function returns, then the calling function should retry
+        whatever operation was being performed. Otherwise an exception will
+        be raised.
+
+        :param retry_exc: A RetryWithNewPacks exception.
+        """
+        is_error = False
+        if self._reload_func is None:
+            is_error = True
+        elif not self._reload_func():
+            # The reload claimed that nothing changed
+            if not retry_exc.reload_occurred:
+                # If there wasn't an earlier reload, then we really were
+                # expecting to find changes. We didn't find them, so this is a
+                # hard error
+                is_error = True
+        if is_error:
+            exc_class, exc_value, exc_traceback = retry_exc.exc_info
+            raise exc_class, exc_value, exc_traceback
+
 
 # Deprecated, use PatienceSequenceMatcher instead
 KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
@@ -2769,11 +2861,17 @@
         if len(self._knit._fallback_vfs) > 0:
             # stacked knits can't use the fast path at present.
             return self._simple_annotate(key)
-        records = self._get_build_graph(key)
-        if key in self._ghosts:
-            raise errors.RevisionNotPresent(key, self._knit)
-        self._annotate_records(records)
-        return self._annotated_lines[key]
+        while True:
+            try:
+                records = self._get_build_graph(key)
+                if key in self._ghosts:
+                    raise errors.RevisionNotPresent(key, self._knit)
+                self._annotate_records(records)
+                return self._annotated_lines[key]
+            except errors.RetryWithNewPacks, e:
+                self._knit._access.reload_or_raise(e)
+                # The cached build_details are no longer valid
+                self._all_build_details.clear()
 
     def _simple_annotate(self, key):
         """Return annotated fulltext, rediffing from the full texts.

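The decision made by reload_or_raise in the diff above can be summarised as: with no reload function, fail; if the reload reports no change and no earlier reload is known to have happened, fail; otherwise return so that the caller retries. A minimal Python 3 sketch of that logic and of the surrounding retry loop is shown here. It is a simplification under stated assumptions: the exception keeps the original error object in a hypothetical attribute named original instead of a sys.exc_info() tuple, and fetch_with_retry is a hypothetical helper.

```python
class RetryWithNewPacks(Exception):
    """Local stand-in that keeps the original error object."""

    def __init__(self, reload_occurred, original):
        super().__init__(str(original))
        self.reload_occurred = reload_occurred
        self.original = original


def reload_or_raise(reload_func, retry_exc):
    """Return if the caller should retry, else raise the original error."""
    if reload_func is None:
        # Nothing can refresh the pack listing, so the error is fatal.
        raise retry_exc.original
    changed = reload_func()
    if not changed and not retry_exc.reload_occurred:
        # A change was expected but none was found: a hard error.
        raise retry_exc.original


def fetch_with_retry(fetch, reload_func):
    """Call fetch() until it succeeds or the failure is judged fatal."""
    while True:
        try:
            return fetch()
        except RetryWithNewPacks as retry_exc:
            reload_or_raise(reload_func, retry_exc)


# Demo: the first fetch sees a stale pack name, the reload fixes it.
packs = {"current": "old.pack"}


def fetch():
    if packs["current"] == "old.pack":
        raise RetryWithNewPacks(False, KeyError("old.pack"))
    return "data from " + packs["current"]


def reload_func():
    changed = packs["current"] != "new.pack"
    packs["current"] = "new.pack"
    return changed


value = fetch_with_retry(fetch, reload_func)
```

Keeping the fatal-versus-temporary decision in one function means every retry loop shares the same rule, which is the motivation given in the log entry for revno 3789.2.6.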
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2008-11-21 20:24:15 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2008-11-27 07:25:52 +0000
@@ -474,7 +474,8 @@
         self._reload_func = reload_func
         self.index_to_pack = {}
         self.combined_index = CombinedGraphIndex([], reload_func=reload_func)
-        self.data_access = _DirectPackAccess(self.index_to_pack)
+        self.data_access = _DirectPackAccess(self.index_to_pack,
+                                             reload_func=reload_func)
         self.add_callback = None
 
     def replace_indices(self, index_to_pack, indices):

=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py	2008-11-14 07:52:16 +0000
+++ b/bzrlib/tests/test_knit.py	2008-11-27 07:25:52 +0000
@@ -48,6 +48,7 @@
     _KnitKeyAccess,
     make_file_factory,
     )
+from bzrlib.repofmt import pack_repo
 from bzrlib.tests import (
     Feature,
     KnownFailure,
@@ -270,6 +271,24 @@
         return queue_call
 
 
+class MockReadvFailingTransport(MockTransport):
+    """Fail in the middle of a readv() result.
+
+    This Transport will successfully yield the first two requested hunks, but
+    raise NoSuchFile for the rest.
+    """
+
+    def readv(self, relpath, offsets):
+        count = 0
+        for result in MockTransport.readv(self, relpath, offsets):
+            count += 1
+            # we use 2 because the first offset is the pack header, the second
+            # is the first actual content request
+            if count > 2:
+                raise errors.NoSuchFile(relpath)
+            yield result
+
+
 class KnitRecordAccessTestsMixin(object):
     """Tests for getting and putting knit records."""
 
@@ -304,7 +323,11 @@
         mapper = ConstantMapper("foo")
         access = _KnitKeyAccess(self.get_transport(), mapper)
         return access
-    
+
+
+class _TestException(Exception):
+    """Just an exception for local tests to use."""
+
 
 class TestPackKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
     """Tests for the pack based access."""
@@ -322,6 +345,88 @@
         access.set_writer(writer, index, (transport, packname))
         return access, writer
 
+    def make_pack_file(self):
+        """Create a pack file with 2 records."""
+        access, writer = self._get_access(packname='packname', index='foo')
+        memos = []
+        memos.extend(access.add_raw_records([('key1', 10)], '1234567890'))
+        memos.extend(access.add_raw_records([('key2', 5)], '12345'))
+        writer.end()
+        return memos
+
+    def make_vf_for_retrying(self):
+        """Create 3 packs and a reload function.
+
+        Originally, 2 pack files will have the data, but one will be missing.
+        And then the third will be used in place of the first two if reload()
+        is called.
+
+        :return: (versioned_file, reload_counter)
+            versioned_file  a KnitVersionedFiles using the packs for access
+        """
+        tree = self.make_branch_and_memory_tree('tree')
+        tree.lock_write()
+        try:
+            tree.add([''], ['root-id'])
+            tree.commit('one', rev_id='rev-1')
+            tree.commit('two', rev_id='rev-2')
+            tree.commit('three', rev_id='rev-3')
+            # Pack these two revisions into another pack file, but don't remove
+            # the originals
+            repo = tree.branch.repository
+            collection = repo._pack_collection
+            collection.ensure_loaded()
+            orig_packs = collection.packs
+            packer = pack_repo.Packer(collection, orig_packs, '.testpack')
+            new_pack = packer.pack()
+
+            vf = tree.branch.repository.revisions
+        finally:
+            tree.unlock()
+        tree.branch.repository.lock_read()
+        self.addCleanup(tree.branch.repository.unlock)
+        del tree
+        # Set up a reload() function that switches to using the new pack file
+        new_index = new_pack.revision_index
+        access_tuple = new_pack.access_tuple()
+        reload_counter = [0, 0, 0]
+        def reload():
+            reload_counter[0] += 1
+            if reload_counter[1] > 0:
+                # We already reloaded, nothing more to do
+                reload_counter[2] += 1
+                return False
+            reload_counter[1] += 1
+            vf._index._graph_index._indices[:] = [new_index]
+            vf._access._indices.clear()
+            vf._access._indices[new_index] = access_tuple
+            return True
+        # Delete one of the pack files so the data will need to be reloaded. We
+        # will delete the file with 'rev-2' in it
+        trans, name = orig_packs[1].access_tuple()
+        trans.delete(name)
+        # We don't have the index trigger reloading because we want to test
+        # that we reload when the .pack disappears
+        vf._access._reload_func = reload
+        return vf, reload_counter
+
+    def make_reload_func(self, return_val=True):
+        reload_called = [0]
+        def reload():
+            reload_called[0] += 1
+            return return_val
+        return reload_called, reload
+
+    def make_retry_exception(self):
+        # We raise a real exception so that sys.exc_info() is properly
+        # populated
+        try:
+            raise _TestException('foobar')
+        except _TestException, e:
+            retry_exc = errors.RetryWithNewPacks(reload_occurred=False,
+                                                 exc_info=sys.exc_info())
+        return retry_exc
+
     def test_read_from_several_packs(self):
         access, writer = self._get_access()
         memos = []
@@ -363,6 +468,197 @@
         writer.end()
         self.assertEqual(['1234567890'], list(access.get_raw_records(memos)))
 
+    def test_missing_index_raises_retry(self):
+        memos = self.make_pack_file()
+        transport = self.get_transport()
+        reload_called, reload_func = self.make_reload_func()
+        # Note that the index key has changed from 'foo' to 'bar'
+        access = _DirectPackAccess({'bar':(transport, 'packname')},
+                                   reload_func=reload_func)
+        e = self.assertListRaises(errors.RetryWithNewPacks,
+                                  access.get_raw_records, memos)
+        # Because a key was passed in which does not match our index list, we
+        # assume that the listing was already reloaded
+        self.assertTrue(e.reload_occurred)
+        self.assertIsInstance(e.exc_info, tuple)
+        self.assertIs(e.exc_info[0], KeyError)
+        self.assertIsInstance(e.exc_info[1], KeyError)
+
+    def test_missing_index_raises_key_error_with_no_reload(self):
+        memos = self.make_pack_file()
+        transport = self.get_transport()
+        # Note that the index key has changed from 'foo' to 'bar'
+        access = _DirectPackAccess({'bar':(transport, 'packname')})
+        e = self.assertListRaises(KeyError, access.get_raw_records, memos)
+
+    def test_missing_file_raises_retry(self):
+        memos = self.make_pack_file()
+        transport = self.get_transport()
+        reload_called, reload_func = self.make_reload_func()
+        # Note that the 'filename' has been changed to 'different-packname'
+        access = _DirectPackAccess({'foo':(transport, 'different-packname')},
+                                   reload_func=reload_func)
+        e = self.assertListRaises(errors.RetryWithNewPacks,
+                                  access.get_raw_records, memos)
+        # The file has gone missing, so we assume we need to reload
+        self.assertFalse(e.reload_occurred)
+        self.assertIsInstance(e.exc_info, tuple)
+        self.assertIs(e.exc_info[0], errors.NoSuchFile)
+        self.assertIsInstance(e.exc_info[1], errors.NoSuchFile)
+        self.assertEqual('different-packname', e.exc_info[1].path)
+
+    def test_missing_file_raises_no_such_file_with_no_reload(self):
+        memos = self.make_pack_file()
+        transport = self.get_transport()
+        # Note that the 'filename' has been changed to 'different-packname'
+        access = _DirectPackAccess({'foo':(transport, 'different-packname')})
+        e = self.assertListRaises(errors.NoSuchFile,
+                                  access.get_raw_records, memos)
+
+    def test_failing_readv_raises_retry(self):
+        memos = self.make_pack_file()
+        transport = self.get_transport()
+        failing_transport = MockReadvFailingTransport(
+                                [transport.get_bytes('packname')])
+        reload_called, reload_func = self.make_reload_func()
+        access = _DirectPackAccess({'foo':(failing_transport, 'packname')},
+                                   reload_func=reload_func)
+        # Asking for a single record will not trigger the Mock failure
+        self.assertEqual(['1234567890'],
+            list(access.get_raw_records(memos[:1])))
+        self.assertEqual(['12345'],
+            list(access.get_raw_records(memos[1:2])))
+        # A multiple offset readv() will fail mid-way through
+        e = self.assertListRaises(errors.RetryWithNewPacks,
+                                  access.get_raw_records, memos)
+        # The file has gone missing, so we assume we need to reload
+        self.assertFalse(e.reload_occurred)
+        self.assertIsInstance(e.exc_info, tuple)
+        self.assertIs(e.exc_info[0], errors.NoSuchFile)
+        self.assertIsInstance(e.exc_info[1], errors.NoSuchFile)
+        self.assertEqual('packname', e.exc_info[1].path)
+
+    def test_failing_readv_raises_no_such_file_with_no_reload(self):
+        memos = self.make_pack_file()
+        transport = self.get_transport()
+        failing_transport = MockReadvFailingTransport(
+                                [transport.get_bytes('packname')])
+        reload_called, reload_func = self.make_reload_func()
+        access = _DirectPackAccess({'foo':(failing_transport, 'packname')})
+        # Asking for a single record will not trigger the Mock failure
+        self.assertEqual(['1234567890'],
+            list(access.get_raw_records(memos[:1])))
+        self.assertEqual(['12345'],
+            list(access.get_raw_records(memos[1:2])))
+        # A multiple offset readv() will fail mid-way through
+        e = self.assertListRaises(errors.NoSuchFile,
+                                  access.get_raw_records, memos)
+
+    def test_reload_or_raise_no_reload(self):
+        access = _DirectPackAccess({}, reload_func=None)
+        retry_exc = self.make_retry_exception()
+        # Without a reload_func, we will just re-raise the original exception
+        self.assertRaises(_TestException, access.reload_or_raise, retry_exc)
+
+    def test_reload_or_raise_reload_changed(self):
+        reload_called, reload_func = self.make_reload_func(return_val=True)
+        access = _DirectPackAccess({}, reload_func=reload_func)
+        retry_exc = self.make_retry_exception()
+        access.reload_or_raise(retry_exc)
+        self.assertEqual([1], reload_called)
+        retry_exc.reload_occurred=True
+        access.reload_or_raise(retry_exc)
+        self.assertEqual([2], reload_called)
+
+    def test_reload_or_raise_reload_no_change(self):
+        reload_called, reload_func = self.make_reload_func(return_val=False)
+        access = _DirectPackAccess({}, reload_func=reload_func)
+        retry_exc = self.make_retry_exception()
+        # If reload_occurred is False, then we consider it an error to have
+        # reload_func() return False (no changes).
+        self.assertRaises(_TestException, access.reload_or_raise, retry_exc)
+        self.assertEqual([1], reload_called)
+        retry_exc.reload_occurred=True
+        # If reload_occurred is True, then we assume nothing changed because
+        # it had changed earlier, but didn't change again
+        access.reload_or_raise(retry_exc)
+        self.assertEqual([2], reload_called)
+
+    def test_annotate_retries(self):
+        vf, reload_counter = self.make_vf_for_retrying()
+        # It is a little bit bogus to annotate the Revision VF, but it works,
+        # as we have ancestry stored there
+        key = ('rev-3',)
+        reload_lines = vf.annotate(key)
+        self.assertEqual([1, 1, 0], reload_counter)
+        plain_lines = vf.annotate(key)
+        self.assertEqual([1, 1, 0], reload_counter) # No extra reloading
+        if reload_lines != plain_lines:
+            self.fail('Annotation was not identical with reloading.')
+        # Now delete the packs-in-use, which should trigger another reload, but
+        # this time we just raise an exception because we can't recover
+        for trans, name in vf._access._indices.itervalues():
+            trans.delete(name)
+        self.assertRaises(errors.NoSuchFile, vf.annotate, key)
+        self.assertEqual([2, 1, 1], reload_counter)
+
+    def test__get_record_map_retries(self):
+        vf, reload_counter = self.make_vf_for_retrying()
+        keys = [('rev-1',), ('rev-2',), ('rev-3',)]
+        records = vf._get_record_map(keys)
+        self.assertEqual(keys, sorted(records.keys()))
+        self.assertEqual([1, 1, 0], reload_counter)
+        # Now delete the packs-in-use, which should trigger another reload, but
+        # this time we just raise an exception because we can't recover
+        for trans, name in vf._access._indices.itervalues():
+            trans.delete(name)
+        self.assertRaises(errors.NoSuchFile, vf._get_record_map, keys)
+        self.assertEqual([2, 1, 1], reload_counter)
+
+    def test_get_record_stream_retries(self):
+        vf, reload_counter = self.make_vf_for_retrying()
+        keys = [('rev-1',), ('rev-2',), ('rev-3',)]
+        record_stream = vf.get_record_stream(keys, 'topological', False)
+        record = record_stream.next()
+        self.assertEqual(('rev-1',), record.key)
+        self.assertEqual([0, 0, 0], reload_counter)
+        record = record_stream.next()
+        self.assertEqual(('rev-2',), record.key)
+        self.assertEqual([1, 1, 0], reload_counter)
+        record = record_stream.next()
+        self.assertEqual(('rev-3',), record.key)
+        self.assertEqual([1, 1, 0], reload_counter)
+        # Now delete all pack files, and see that we raise the right error
+        for trans, name in vf._access._indices.itervalues():
+            trans.delete(name)
+        self.assertListRaises(errors.NoSuchFile,
+            vf.get_record_stream, keys, 'topological', False)
+
+    def test_iter_lines_added_or_present_in_keys_retries(self):
+        vf, reload_counter = self.make_vf_for_retrying()
+        keys = [('rev-1',), ('rev-2',), ('rev-3',)]
+        # Unfortunately, iter_lines_added_or_present_in_keys iterates the
+        # result in random order (determined by the iteration order from a
+        # set()), so we have no reliable way to control whether data is read
+        # before or after the reload. However, we delete the middle node to
+        # exercise the code well.
+        # What we care about is that all lines are always yielded, but not
+        # duplicated
+        count = 0
+        reload_lines = sorted(vf.iter_lines_added_or_present_in_keys(keys))
+        self.assertEqual([1, 1, 0], reload_counter)
+        # Now do it again, to make sure the result is equivalent
+        plain_lines = sorted(vf.iter_lines_added_or_present_in_keys(keys))
+        self.assertEqual([1, 1, 0], reload_counter) # No extra reloading
+        self.assertEqual(plain_lines, reload_lines)
+        self.assertEqual(21, len(plain_lines))
+        # Now delete all pack files, and see that we raise the right error
+        for trans, name in vf._access._indices.itervalues():
+            trans.delete(name)
+        self.assertListRaises(errors.NoSuchFile,
+            vf.iter_lines_added_or_present_in_keys, keys)
+        self.assertEqual([2, 1, 1], reload_counter)
+
 
 class LowLevelKnitDataTests(TestCase):
 
@@ -373,6 +669,26 @@
         gz_file.close()
         return sio.getvalue()
 
+    def make_multiple_records(self):
+        """Create the content for multiple records."""
+        sha1sum = osutils.sha('foo\nbar\n').hexdigest()
+        total_txt = []
+        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
+                                        'foo\n'
+                                        'bar\n'
+                                        'end rev-id-1\n'
+                                        % (sha1sum,))
+        record_1 = (0, len(gz_txt), sha1sum)
+        total_txt.append(gz_txt)
+        sha1sum = osutils.sha('baz\n').hexdigest()
+        gz_txt = self.create_gz_content('version rev-id-2 1 %s\n'
+                                        'baz\n'
+                                        'end rev-id-2\n'
+                                        % (sha1sum,))
+        record_2 = (record_1[1], len(gz_txt), sha1sum)
+        total_txt.append(gz_txt)
+        return total_txt, record_1, record_2
+
     def test_valid_knit_data(self):
         sha1sum = osutils.sha('foo\nbar\n').hexdigest()
         gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
@@ -392,6 +708,24 @@
         raw_contents = list(knit._read_records_iter_raw(records))
         self.assertEqual([(('rev-id-1',), gz_txt, sha1sum)], raw_contents)
 
+    def test_multiple_records_valid(self):
+        total_txt, record_1, record_2 = self.make_multiple_records()
+        transport = MockTransport([''.join(total_txt)])
+        access = _KnitKeyAccess(transport, ConstantMapper('filename'))
+        knit = KnitVersionedFiles(None, access)
+        records = [(('rev-id-1',), (('rev-id-1',), record_1[0], record_1[1])),
+                   (('rev-id-2',), (('rev-id-2',), record_2[0], record_2[1]))]
+
+        contents = list(knit._read_records_iter(records))
+        self.assertEqual([(('rev-id-1',), ['foo\n', 'bar\n'], record_1[2]),
+                          (('rev-id-2',), ['baz\n'], record_2[2])],
+                         contents)
+
+        raw_contents = list(knit._read_records_iter_raw(records))
+        self.assertEqual([(('rev-id-1',), total_txt[0], record_1[2]),
+                          (('rev-id-2',), total_txt[1], record_2[2])],
+                         raw_contents)
+
     def test_not_enough_lines(self):
         sha1sum = osutils.sha('foo\n').hexdigest()
         # record says 2 lines data says 1
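
Reviewer note on the test_reload_or_raise_* cases in test_knit.py: the decision table those tests pin down can be sketched as standalone Python 3. This is only an illustration under stated assumptions, not the bzrlib code. `RetryWithNewPacks` and `reload_or_raise` are simplified stand-ins written as a plain class and a free function, and `OriginalError` is an invented placeholder for whatever error triggered the retry.

```python
import sys


class OriginalError(Exception):
    """Hypothetical stand-in for the underlying failure (e.g. a missing file)."""


class RetryWithNewPacks(Exception):
    """Simplified stand-in: carries the original exc_info and a reload flag."""

    def __init__(self, reload_occurred, exc_info):
        Exception.__init__(self, "pack files changed; retry")
        self.reload_occurred = reload_occurred
        self.exc_info = exc_info


def reload_or_raise(retry_exc, reload_func=None):
    """Return normally if a retry is worthwhile, else re-raise the original."""
    exc_value, exc_tb = retry_exc.exc_info[1], retry_exc.exc_info[2]
    if reload_func is None:
        # No way to refresh the pack list: surface the original exception.
        raise exc_value.with_traceback(exc_tb)
    changed = reload_func()
    if not changed and not retry_exc.reload_occurred:
        # Nothing changed now and nothing changed earlier: retrying cannot help.
        raise exc_value.with_traceback(exc_tb)
    # Either the pack list changed now or it had already changed earlier,
    # so the caller may retry.


def make_retry_exception(reload_occurred=False):
    # Raise a real exception so that sys.exc_info() is properly populated.
    try:
        raise OriginalError("foobar")
    except OriginalError:
        return RetryWithNewPacks(reload_occurred, sys.exc_info())
```

The three outcomes match the tests: no reload function re-raises, a reload that reports a change returns, and a reload that reports no change returns only when `reload_occurred` was already true.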

=== modified file 'bzrlib/tests/test_pack_repository.py'
--- a/bzrlib/tests/test_pack_repository.py	2008-11-12 02:53:12 +0000
+++ b/bzrlib/tests/test_pack_repository.py	2008-11-27 07:25:52 +0000
@@ -420,6 +420,35 @@
         finally:
             tree.unlock()
 
+    def test_concurrent_pack_during_get_record_reloads(self):
+        tree = self.make_branch_and_tree('tree')
+        tree.lock_write()
+        try:
+            rev1 = tree.commit('one')
+            rev2 = tree.commit('two')
+            keys = [(rev1,), (rev2,)]
+            r2 = repository.Repository.open('tree')
+            r2.lock_read()
+            try:
+                # At this point, we will start grabbing a record stream, and
+                # trigger a repack mid-way
+                packed = False
+                result = {}
+                record_stream = r2.revisions.get_record_stream(keys,
+                                    'unordered', False)
+                for record in record_stream:
+                    result[record.key] = record
+                    if not packed:
+                        tree.branch.repository.pack()
+                        packed = True
+                # The first record will be found in the original location, but
+                # after the pack, we have to reload to find the next record
+                self.assertEqual(sorted(keys), sorted(result.keys()))
+            finally:
+                r2.unlock()
+        finally:
+            tree.unlock()
+
     def test_lock_write_does_not_physically_lock(self):
         repo = self.make_repository('.', format=self.get_format())
         repo.lock_write()
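
Reviewer note on test_concurrent_pack_during_get_record_reloads: the behaviour it checks (records already yielded are not repeated, and the remaining ones are found after the pack list is reloaded) can be sketched as standalone Python 3. Everything here is hypothetical and chosen for illustration: `PackGone`, `read_all`, `load_index`, `read_record` and the `max_reloads` bound are not bzrlib names, and the real code decides when to give up differently (via the reload function's return value).

```python
class PackGone(Exception):
    """Hypothetical error: the pack an index entry points at has vanished."""


def read_all(keys, load_index, read_record, max_reloads=1):
    """Yield (key, data) for every key exactly once, reloading on PackGone.

    load_index() returns a fresh {key: pack_name} mapping.
    read_record(pack_name, key) returns the data or raises PackGone.
    """
    index = load_index()
    pending = list(keys)
    reloads = 0
    while pending:
        key = pending[0]
        try:
            data = read_record(index[key], key)
        except PackGone:
            if reloads >= max_reloads:
                # Already reloaded and the data is still missing: give up.
                raise
            reloads += 1
            index = load_index()  # pick up the repacked layout, then retry
            continue
        # Only drop the key once it has been read, so nothing is skipped
        # or yielded twice across a reload.
        pending.pop(0)
        yield key, data


# Usage: two packs are combined into one while the stream is being consumed.
packs = {"a": {"k1": "one"}, "b": {"k2": "two"}}


def load_index():
    return {key: name for name, records in packs.items() for key in records}


def read_record(pack_name, key):
    if pack_name not in packs:
        raise PackGone(pack_name)
    return packs[pack_name][key]


result = []
for key, data in read_all(["k1", "k2"], load_index, read_record):
    result.append((key, data))
    if "c" not in packs:
        # Simulate a concurrent repack after the first record is read.
        packs["c"] = {"k1": "one", "k2": "two"}
        del packs["a"], packs["b"]
```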



