Rev 5774: Resolve all the conflicts for merging move-pack-access in http://bazaar.launchpad.net/~jameinel/bzr/integration

John Arbash Meinel john at arbash-meinel.com
Fri Apr 8 14:08:27 UTC 2011


At http://bazaar.launchpad.net/~jameinel/bzr/integration

------------------------------------------------------------
revno: 5774 [merge]
revision-id: john at arbash-meinel.com-20110408140811-9vxbb47etgwfp4z2
parent: john at arbash-meinel.com-20110408140350-fg45nkkx71l4sjvt
parent: jelmer at samba.org-20110405234314-78o98lmtky0lu8nv
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: integration
timestamp: Fri 2011-04-08 16:08:11 +0200
message:
  Resolve all the conflicts for merging move-pack-access
modified:
  bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
  bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
  bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
  bzrlib/repofmt/knitpack_repo.py knitpack_repo.py-20110405143430-6p75yrk99v6pb770-1
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
  bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2011-04-05 14:48:54 +0000
+++ b/bzrlib/groupcompress.py	2011-04-08 14:08:11 +0000
@@ -36,6 +36,7 @@
     )
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
+from bzrlib.repofmt import pack_repo
 from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
     adapter_registry,
@@ -1046,7 +1047,7 @@
         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream

=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py	2011-04-05 14:48:54 +0000
+++ b/bzrlib/knit.py	2011-04-08 14:08:11 +0000
@@ -56,7 +56,6 @@
 from itertools import izip
 import operator
 import os
-import sys
 
 from bzrlib.lazy_import import lazy_import
 lazy_import(globals(), """
@@ -799,7 +798,7 @@
         writer.begin()
         index = _KnitGraphIndex(graph_index, lambda:True, parents=parents,
             deltas=delta, add_callback=graph_index.add_nodes)
-        access = _DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = KnitVersionedFiles(index, access,
             max_delta_chain=max_delta_chain)

=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py	2011-04-05 23:47:06 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py	2011-04-08 14:08:11 +0000
@@ -44,6 +44,7 @@
     GroupCompressVersionedFiles,
     )
 from bzrlib.repofmt.pack_repo import (
+    _DirectPackAccess,
     Pack,
     NewPack,
     PackRepository,
@@ -354,8 +355,8 @@
         """Build a VersionedFiles instance on top of this group of packs."""
         index_name = index_name + '_index'
         index_to_pack = {}
-        access = knit._DirectPackAccess(index_to_pack,
-                                        reload_func=self._reload_func)
+        access = _DirectPackAccess(index_to_pack,
+                                   reload_func=self._reload_func)
         if for_write:
             # Use new_pack
             if self.new_pack is None:

=== modified file 'bzrlib/repofmt/knitpack_repo.py'
--- a/bzrlib/repofmt/knitpack_repo.py	2011-04-05 23:53:18 +0000
+++ b/bzrlib/repofmt/knitpack_repo.py	2011-04-08 14:08:11 +0000
@@ -56,6 +56,7 @@
     KnitRepository,
     )
 from bzrlib.repofmt.pack_repo import (
+    _DirectPackAccess,
     NewPack,
     RepositoryFormatPack,
     ResumedPack,
@@ -1074,7 +1075,7 @@
             self.new_pack.text_index,
             ('blank', ), 1,
             add_nodes_callback=self.new_pack.text_index.add_nodes)
-        data_access = knit._DirectPackAccess(
+        data_access = _DirectPackAccess(
                 {self.new_pack.text_index:self.new_pack.access_tuple()})
         data_access.set_writer(self.new_pack._writer, self.new_pack.text_index,
             self.new_pack.access_tuple())

=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2011-04-08 14:03:50 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2011-04-08 14:08:11 +0000
@@ -1895,3 +1895,143 @@
             " context: %(context)s %(orig_error)s")
 
 
+class _DirectPackAccess(object):
+    """Access to data in one or more packs with less translation."""
+
+    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
+        """Create a _DirectPackAccess object.
+
+        :param index_to_packs: A dict mapping index objects to the transport
+            and file names for obtaining data.
+        :param reload_func: A function to call if we determine that the pack
+            files have moved and we need to reload our caches. See
+            bzrlib.repofmt.pack_repo.AggregateIndex for more details.
+        """
+        self._container_writer = None
+        self._write_index = None
+        self._indices = index_to_packs
+        self._reload_func = reload_func
+        self._flush_func = flush_func
+
+    def add_raw_records(self, key_sizes, raw_data):
+        """Add raw knit bytes to a storage area.
+
+        The data is spooled to the container writer in one bytes-record per
+        raw data item.
+
+        :param key_sizes: An iterable of tuples containing the key and size
+            of each raw data segment.
+        :param raw_data: A bytestring containing the data.
+        :return: A list of memos to retrieve the record later. Each memo is an
+            opaque index memo. For _DirectPackAccess the memo is (index, pos,
+            length), where the index field is the write_index object supplied
+            to the PackAccess object.
+        """
+        if type(raw_data) is not str:
+            raise AssertionError(
+                'data must be plain bytes, was %s' % type(raw_data))
+        result = []
+        offset = 0
+        for key, size in key_sizes:
+            p_offset, p_length = self._container_writer.add_bytes_record(
+                raw_data[offset:offset+size], [])
+            offset += size
+            result.append((self._write_index, p_offset, p_length))
+        return result
+
+    def flush(self):
+        """Flush pending writes on this access object.
+
+        This will flush any buffered writes to a NewPack.
+        """
+        if self._flush_func is not None:
+            self._flush_func()
+
+    def get_raw_records(self, memos_for_retrieval):
+        """Get the raw bytes for a records.
+
+        :param memos_for_retrieval: An iterable containing the (index, pos,
+            length) memo for retrieving the bytes. The Pack access method
+            looks up the pack to use for a given record in its index_to_pack
+            map.
+        :return: An iterator over the bytes of the records.
+        """
+        # first pass, group into same-index requests
+        request_lists = []
+        current_index = None
+        for (index, offset, length) in memos_for_retrieval:
+            if current_index == index:
+                current_list.append((offset, length))
+            else:
+                if current_index is not None:
+                    request_lists.append((current_index, current_list))
+                current_index = index
+                current_list = [(offset, length)]
+        # handle the last entry
+        if current_index is not None:
+            request_lists.append((current_index, current_list))
+        for index, offsets in request_lists:
+            try:
+                transport, path = self._indices[index]
+            except KeyError:
+                # A KeyError here indicates that someone has triggered an index
+                # reload, and this index has gone missing; we need to start
+                # over.
+                if self._reload_func is None:
+                    # If we don't have a _reload_func there is nothing that can
+                    # be done
+                    raise
+                raise errors.RetryWithNewPacks(index,
+                                               reload_occurred=True,
+                                               exc_info=sys.exc_info())
+            try:
+                reader = pack.make_readv_reader(transport, path, offsets)
+                for names, read_func in reader.iter_records():
+                    yield read_func(None)
+            except errors.NoSuchFile:
+                # A NoSuchFile error indicates that a pack file has gone
+                # missing on disk; we need to trigger a reload and start over.
+                if self._reload_func is None:
+                    raise
+                raise errors.RetryWithNewPacks(transport.abspath(path),
+                                               reload_occurred=False,
+                                               exc_info=sys.exc_info())
+
+    def set_writer(self, writer, index, transport_packname):
+        """Set a writer to use for adding data."""
+        if index is not None:
+            self._indices[index] = transport_packname
+        self._container_writer = writer
+        self._write_index = index
+
+    def reload_or_raise(self, retry_exc):
+        """Try calling the reload function, or re-raise the original exception.
+
+        This should be called after _DirectPackAccess raises a
+        RetryWithNewPacks exception. This function will handle the common logic
+        of determining when the error is fatal versus being temporary.
+        It will also make sure that the original exception is raised, rather
+        than the RetryWithNewPacks exception.
+
+        If this function returns, then the calling function should retry
+        whatever operation was being performed. Otherwise an exception will
+        be raised.
+
+        :param retry_exc: A RetryWithNewPacks exception.
+        """
+        is_error = False
+        if self._reload_func is None:
+            is_error = True
+        elif not self._reload_func():
+            # The reload claimed that nothing changed
+            if not retry_exc.reload_occurred:
+                # If there wasn't an earlier reload, then we really were
+                # expecting to find changes. We didn't find them, so this is a
+                # hard error
+                is_error = True
+        if is_error:
+            exc_class, exc_value, exc_traceback = retry_exc.exc_info
+            raise exc_class, exc_value, exc_traceback
+
+
+

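For readers skimming the patch: _DirectPackAccess now lives in bzrlib/repofmt/pack_repo.py, and every caller constructs it as pack_repo._DirectPackAccess rather than knit._DirectPackAccess. As a rough illustration only (not part of this commit), the write/read round trip the class provides follows the setup used by the test_knit.py changes below; 'transport' and 'index' are placeholders for a writable transport and the graph index the records are attributed to:

    from bzrlib import pack
    from bzrlib.repofmt import pack_repo

    def write_data(bytes):
        # spool the container bytes straight into the pack file
        transport.append_bytes('newpack', bytes)

    writer = pack.ContainerWriter(write_data)
    writer.begin()
    access = pack_repo._DirectPackAccess({})
    access.set_writer(writer, index, (transport, 'newpack'))
    # add_raw_records returns opaque (index, offset, length) memos
    memos = access.add_raw_records([('key', 10)], '1234567890')
    writer.end()
    print list(access.get_raw_records(memos))   # ['1234567890']
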
=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py	2011-04-05 23:47:06 +0000
+++ b/bzrlib/tests/test_knit.py	2011-04-08 14:08:11 +0000
@@ -325,7 +325,7 @@
             transport.append_bytes(packname, bytes)
         writer = pack.ContainerWriter(write_data)
         writer.begin()
-        access = _DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, index, (transport, packname))
         return access, writer
 
@@ -456,7 +456,7 @@
         memos.extend(access.add_raw_records([('key', 5)], 'alpha'))
         writer.end()
         transport = self.get_transport()
-        access = _DirectPackAccess({"FOO":(transport, 'packfile'),
+        access = pack_repo._DirectPackAccess({"FOO":(transport, 'packfile'),
             "FOOBAR":(transport, 'pack2'),
             "BAZ":(transport, 'pack3')})
         self.assertEqual(['1234567890', '12345', 'alpha'],
@@ -472,7 +472,7 @@
 
     def test_set_writer(self):
         """The writer should be settable post construction."""
-        access = _DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         transport = self.get_transport()
         packname = 'packfile'
         index = 'foo'
@@ -490,7 +490,7 @@
         transport = self.get_transport()
         reload_called, reload_func = self.make_reload_func()
         # Note that the index key has changed from 'foo' to 'bar'
-        access = _DirectPackAccess({'bar':(transport, 'packname')},
+        access = pack_repo._DirectPackAccess({'bar':(transport, 'packname')},
                                    reload_func=reload_func)
         e = self.assertListRaises(errors.RetryWithNewPacks,
                                   access.get_raw_records, memos)
@@ -505,7 +505,7 @@
         memos = self.make_pack_file()
         transport = self.get_transport()
         # Note that the index key has changed from 'foo' to 'bar'
-        access = _DirectPackAccess({'bar':(transport, 'packname')})
+        access = pack_repo._DirectPackAccess({'bar':(transport, 'packname')})
         e = self.assertListRaises(KeyError, access.get_raw_records, memos)
 
     def test_missing_file_raises_retry(self):
@@ -513,8 +513,9 @@
         transport = self.get_transport()
         reload_called, reload_func = self.make_reload_func()
         # Note that the 'filename' has been changed to 'different-packname'
-        access = _DirectPackAccess({'foo':(transport, 'different-packname')},
-                                   reload_func=reload_func)
+        access = pack_repo._DirectPackAccess(
+            {'foo':(transport, 'different-packname')},
+            reload_func=reload_func)
         e = self.assertListRaises(errors.RetryWithNewPacks,
                                   access.get_raw_records, memos)
         # The file has gone missing, so we assume we need to reload
@@ -528,7 +529,8 @@
         memos = self.make_pack_file()
         transport = self.get_transport()
         # Note that the 'filename' has been changed to 'different-packname'
-        access = _DirectPackAccess({'foo':(transport, 'different-packname')})
+        access = pack_repo._DirectPackAccess(
+            {'foo':(transport, 'different-packname')})
         e = self.assertListRaises(errors.NoSuchFile,
                                   access.get_raw_records, memos)
 
@@ -538,8 +540,9 @@
         failing_transport = MockReadvFailingTransport(
                                 [transport.get_bytes('packname')])
         reload_called, reload_func = self.make_reload_func()
-        access = _DirectPackAccess({'foo':(failing_transport, 'packname')},
-                                   reload_func=reload_func)
+        access = pack_repo._DirectPackAccess(
+            {'foo':(failing_transport, 'packname')},
+            reload_func=reload_func)
         # Asking for a single record will not trigger the Mock failure
         self.assertEqual(['1234567890'],
             list(access.get_raw_records(memos[:1])))
@@ -561,7 +564,8 @@
         failing_transport = MockReadvFailingTransport(
                                 [transport.get_bytes('packname')])
         reload_called, reload_func = self.make_reload_func()
-        access = _DirectPackAccess({'foo':(failing_transport, 'packname')})
+        access = pack_repo._DirectPackAccess(
+            {'foo':(failing_transport, 'packname')})
         # Asking for a single record will not trigger the Mock failure
         self.assertEqual(['1234567890'],
             list(access.get_raw_records(memos[:1])))
@@ -572,14 +576,14 @@
                                   access.get_raw_records, memos)
 
     def test_reload_or_raise_no_reload(self):
-        access = _DirectPackAccess({}, reload_func=None)
+        access = pack_repo._DirectPackAccess({}, reload_func=None)
         retry_exc = self.make_retry_exception()
         # Without a reload_func, we will just re-raise the original exception
         self.assertRaises(_TestException, access.reload_or_raise, retry_exc)
 
     def test_reload_or_raise_reload_changed(self):
         reload_called, reload_func = self.make_reload_func(return_val=True)
-        access = _DirectPackAccess({}, reload_func=reload_func)
+        access = pack_repo._DirectPackAccess({}, reload_func=reload_func)
         retry_exc = self.make_retry_exception()
         access.reload_or_raise(retry_exc)
         self.assertEqual([1], reload_called)
@@ -589,7 +593,7 @@
 
     def test_reload_or_raise_reload_no_change(self):
         reload_called, reload_func = self.make_reload_func(return_val=False)
-        access = _DirectPackAccess({}, reload_func=reload_func)
+        access = pack_repo._DirectPackAccess({}, reload_func=reload_func)
         retry_exc = self.make_retry_exception()
         # If reload_occurred is False, then we consider it an error to have
         # reload_func() return False (no changes).

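The reload_or_raise tests above exercise the retry contract described in the new docstring: when get_raw_records raises RetryWithNewPacks, the caller asks the access object to reload and, if reload_or_raise returns instead of re-raising, retries the read. A sketch of that caller-side loop, assuming an 'access' object and 'memos' as in the example above (not taken from this patch):

    from bzrlib import errors

    while True:
        try:
            raw = list(access.get_raw_records(memos))
            break
        except errors.RetryWithNewPacks, retry_exc:
            # Re-raises the original exception when reloading cannot help;
            # if it returns, the pack-name cache was refreshed, so retry.
            access.reload_or_raise(retry_exc)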

