Rev 4044: (robertc) Move write locking and write group responsibilities into in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Wed Feb 25 01:12:00 GMT 2009


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 4044
revision-id: pqm at pqm.ubuntu.com-20090225011157-xn0w5zux5mrges6j
parent: pqm at pqm.ubuntu.com-20090225000405-09p33ue22l4h19yk
parent: robertc at robertcollins.net-20090225003109-9ngqolksoevjw5ay
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Wed 2009-02-25 01:11:57 +0000
message:
  (robertc) Move write locking and write group responsibilities into
  	the Sink objects themselves,
  	allowing complete avoidance of unnecessary calls when the sink is a
  	RemoteSink. (Robert Collins)
modified:
  bzrlib/fetch.py                fetch.py-20050818234941-26fea6105696365d
  bzrlib/remote.py               remote.py-20060720103555-yeeg2x51vn0rbtdp-1
  bzrlib/repository.py           rev_storage.py-20051111201905-119e9401e46257e3
  bzrlib/smart/repository.py     repository.py-20061128022038-vr5wy5bubyb8xttk-1
  bzrlib/tests/blackbox/test_push.py test_push.py-20060329002750-929af230d5d22663
  bzrlib/tests/branch_implementations/test_push.py test_push.py-20070130153159-fhfap8uoifevg30j-1
    ------------------------------------------------------------
    revno: 4032.3.8
    revision-id: robertc at robertcollins.net-20090225003109-9ngqolksoevjw5ay
    parent: robertc at robertcollins.net-20090224212243-wpleuzp5uin2h89p
    parent: pqm at pqm.ubuntu.com-20090225000405-09p33ue22l4h19yk
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: integration
    timestamp: Wed 2009-02-25 11:31:09 +1100
    message:
      Merge bzr.dev; adjust effort tests, reducing the expected effort in line with the write_group avoidance.
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
      bzrlib/remote.py               remote.py-20060720103555-yeeg2x51vn0rbtdp-1
      bzrlib/repository.py           rev_storage.py-20051111201905-119e9401e46257e3
      bzrlib/smart/repository.py     repository.py-20061128022038-vr5wy5bubyb8xttk-1
      bzrlib/tests/branch_implementations/test_push.py test_push.py-20070130153159-fhfap8uoifevg30j-1
      bzrlib/tests/per_repository/test_repository.py test_repository.py-20060131092128-ad07f494f5c9d26c
      bzrlib/tests/test_read_bundle.py test_read_bundle.py-20060615211421-ud8cwr1ulgd914zf-1
      setup.py                       setup.py-20050314065409-02f8a0a6e3f9bc70
      tools/win32/build_release.py   build_release.py-20081105204355-2ghh5cv01v1x4rzz-1
    ------------------------------------------------------------
    revno: 4032.3.7
    revision-id: robertc at robertcollins.net-20090224212243-wpleuzp5uin2h89p
    parent: robertc at robertcollins.net-20090224091304-k97x0yqk5yjy8jbl
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: push.roundtrips
    timestamp: Wed 2009-02-25 08:22:43 +1100
    message:
      Move write locking and write group responsibilities into the Sink objects themselves, allowing complete avoidance of unnecessary calls when the sink is a RemoteSink.
    modified:
      bzrlib/fetch.py                fetch.py-20050818234941-26fea6105696365d
      bzrlib/remote.py               remote.py-20060720103555-yeeg2x51vn0rbtdp-1
      bzrlib/repository.py           rev_storage.py-20051111201905-119e9401e46257e3
      bzrlib/smart/repository.py     repository.py-20061128022038-vr5wy5bubyb8xttk-1
      bzrlib/tests/blackbox/test_push.py test_push.py-20060329002750-929af230d5d22663
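The diffs below rework the StreamSink.insert_stream contract: callers now
pass in a list of resume tokens and get back a (resume_tokens,
missing_keys) pair, and the sink itself takes the write lock and owns the
write group. A minimal self-contained sketch of the resulting two-pass
protocol, using a hypothetical toy sink rather than real bzrlib objects:

    # Toy illustration of the reworked insert_stream contract; ToySink
    # and its token/key values are hypothetical, not bzrlib code.
    class ToySink(object):
        def __init__(self):
            self._missing = set([('texts', 'key-1')])
        def insert_stream(self, stream, src_format, resume_tokens):
            if not resume_tokens and self._missing:
                # First pass: suspend the (conceptual) write group and
                # report which compression parents are still needed.
                return ['token-1'], self._missing
            # Second pass: resume, complete and commit.
            return [], set()

    sink = ToySink()
    tokens, missing = sink.insert_stream(iter([]), None, [])
    if missing:
        # Feed a second stream carrying the missing records.
        tokens, missing = sink.insert_stream(iter([]), None, tokens)
    assert tokens == [] and missing == set()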
=== modified file 'bzrlib/fetch.py'
--- a/bzrlib/fetch.py	2009-02-23 15:29:35 +0000
+++ b/bzrlib/fetch.py	2009-02-24 21:22:43 +0000
@@ -104,22 +104,11 @@
             self.nested_pb = None
         self.from_repository.lock_read()
         try:
-            self.to_repository.lock_write()
             try:
-                self.to_repository.start_write_group()
-                try:
-                    self.__fetch()
-                except:
-                    self.to_repository.abort_write_group(suppress_errors=True)
-                    raise
-                else:
-                    self.to_repository.commit_write_group()
+                self.__fetch()
             finally:
-                try:
-                    if self.nested_pb is not None:
-                        self.nested_pb.finished()
-                finally:
-                    self.to_repository.unlock()
+                if self.nested_pb is not None:
+                    self.nested_pb.finished()
         finally:
             self.from_repository.unlock()
 
@@ -161,14 +150,20 @@
         try:
             from_format = self.from_repository._format
             stream = self.get_stream(search, pp)
-            missing_keys = self.sink.insert_stream(stream, from_format)
+            resume_tokens, missing_keys = self.sink.insert_stream(
+                stream, from_format, [])
             if missing_keys:
                 stream = self.get_stream_for_missing_keys(missing_keys)
-                missing_keys = self.sink.insert_stream(stream, from_format)
+                resume_tokens, missing_keys = self.sink.insert_stream(
+                    stream, from_format, resume_tokens)
             if missing_keys:
                 raise AssertionError(
                     "second push failed to complete a fetch %r." % (
                         missing_keys,))
+            if resume_tokens:
+                raise AssertionError(
+                    "second push failed to commit the fetch %r." % (
+                        resume_tokens,))
             self.sink.finished()
         finally:
             if self.pb is not None:
@@ -195,7 +190,6 @@
                     revisions])
             elif knit_kind == "inventory":
                 # Now copy the file texts.
-                to_texts = self.to_repository.texts
                 from_texts = self.from_repository.texts
                 yield ('texts', from_texts.get_record_stream(
                     text_keys, self.to_repository._fetch_order,
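
With the sink owning the target's write lock and write group, the fetch
path reduces to read-locking the source and streaming into the sink. A
hedged sketch of the resulting shape (names are illustrative, condensed
from the hunks above rather than verbatim bzrlib code):

    # The fetcher now only holds the source read lock; locking and
    # write-group handling for the target happen inside the sink.
    def fetch_into_sink(from_repo, sink, stream, from_format):
        from_repo.lock_read()
        try:
            return sink.insert_stream(stream, from_format, [])
        finally:
            from_repo.unlock()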

=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py	2009-02-24 23:19:53 +0000
+++ b/bzrlib/remote.py	2009-02-25 00:31:09 +0000
@@ -1372,25 +1372,24 @@
 
     def __init__(self, target_repo):
         repository.StreamSink.__init__(self, target_repo)
-        self._resume_tokens = []
 
-    def _insert_real(self, stream, src_format):
+    def _insert_real(self, stream, src_format, resume_tokens):
         self.target_repo._ensure_real()
         sink = self.target_repo._real_repository._get_sink()
-        result = sink.insert_stream(stream, src_format)
+        result = sink.insert_stream(stream, src_format, resume_tokens)
         if not result:
             self.target_repo.autopack()
         return result
 
-    def insert_stream(self, stream, src_format):
+    def insert_stream(self, stream, src_format, resume_tokens):
         repo = self.target_repo
         client = repo._client
         medium = client._medium
         if medium._is_remote_before((1, 13)):
             # No possible way this can work.
-            return self._insert_real(stream, src_format)
+            return self._insert_real(stream, src_format, resume_tokens)
         path = repo.bzrdir._path_for_remote_call(client)
-        if not self._resume_tokens:
+        if not resume_tokens:
             # XXX: Ugly but important for correctness, *will* be fixed during
             # 1.13 cycle. Pushing a stream that is interrupted results in a
             # fallback to the _real_repositories sink *with a partial stream*.
@@ -1401,29 +1400,28 @@
             # buffering etc.
             byte_stream = self._stream_to_byte_stream([], src_format)
             try:
-                resume_tokens = ''
                 response = client.call_with_body_stream(
-                    ('Repository.insert_stream', path, resume_tokens), byte_stream)
+                    ('Repository.insert_stream', path, ''), byte_stream)
             except errors.UnknownSmartMethod:
                 medium._remember_remote_is_before((1,13))
-                return self._insert_real(stream, src_format)
+                return self._insert_real(stream, src_format, resume_tokens)
         byte_stream = self._stream_to_byte_stream(stream, src_format)
-        resume_tokens = ' '.join(self._resume_tokens)
+        resume_tokens = ' '.join(resume_tokens)
         response = client.call_with_body_stream(
             ('Repository.insert_stream', path, resume_tokens), byte_stream)
         if response[0][0] not in ('ok', 'missing-basis'):
             raise errors.UnexpectedSmartServerResponse(response)
         if response[0][0] == 'missing-basis':
             tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
-            self._resume_tokens = tokens
-            return missing_keys
+            resume_tokens = tokens
+            return resume_tokens, missing_keys
         else:
             if self.target_repo._real_repository is not None:
                 collection = getattr(self.target_repo._real_repository,
                     '_pack_collection', None)
                 if collection is not None:
                     collection.reload_pack_names()
-            return []
+            return [], set()
 
     def _stream_to_byte_stream(self, stream, src_format):
         bytes = []
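
On the wire, resume tokens travel as a single space-separated argument to
the Repository.insert_stream verb: the client joins the list (the
' '.join above) and the server splits it back out (the split in
bzrlib/smart/repository.py below). A self-contained round-trip sketch
with hypothetical token values:

    # Client side: encode the token list for the verb arguments.
    tokens = ['pack-123', 'pack-456']   # hypothetical values
    wire_arg = ' '.join(tokens)
    # Server side: decode, dropping the empty string a blank argument
    # would otherwise produce.
    parsed = [t for t in wire_arg.split(' ') if t]
    assert parsed == tokens
    assert [t for t in ''.split(' ') if t] == []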

=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py	2009-02-24 22:19:10 +0000
+++ b/bzrlib/repository.py	2009-02-25 00:31:09 +0000
@@ -3685,15 +3685,30 @@
     def __init__(self, target_repo):
         self.target_repo = target_repo
 
-    def insert_stream(self, stream, src_format):
+    def insert_stream(self, stream, src_format, resume_tokens):
         """Insert a stream's content into the target repository.
 
         :param src_format: a bzr repository format.
 
-        :return: an iterable of keys additional items required before the
-        insertion can be completed.
+        :return: a list of resume tokens and an iterable of the keys of
+            additional items required before the insertion can be completed.
         """
-        result = []
+        self.target_repo.lock_write()
+        try:
+            if resume_tokens:
+                self.target_repo.resume_write_group(resume_tokens)
+            else:
+                self.target_repo.start_write_group()
+            try:
+                # _locked_insert_stream performs a commit or suspend.
+                return self._locked_insert_stream(stream, src_format)
+            except:
+                self.target_repo.abort_write_group(suppress_errors=True)
+                raise
+        finally:
+            self.target_repo.unlock()
+
+    def _locked_insert_stream(self, stream, src_format):
         to_serializer = self.target_repo._format._serializer
         src_serializer = src_format._serializer
         for substream_type, substream in stream:
@@ -3720,7 +3735,30 @@
                 self.target_repo.signatures.insert_record_stream(substream)
             else:
                 raise AssertionError('kaboom! %s' % (substream_type,))
-        return result
+        try:
+            missing_keys = set()
+            for prefix, versioned_file in (
+                ('texts', self.target_repo.texts),
+                ('inventories', self.target_repo.inventories),
+                ('revisions', self.target_repo.revisions),
+                ('signatures', self.target_repo.signatures),
+                ):
+                missing_keys.update((prefix,) + key for key in
+                    versioned_file.get_missing_compression_parent_keys())
+        except NotImplementedError:
+            # cannot even attempt suspending, and any missing keys would
+            # already have caused a failure during stream insertion.
+            missing_keys = set()
+        else:
+            if missing_keys:
+                # suspend the write group and tell the caller what is
+                # missing. We know we can suspend or else we would not have
+                # entered this code path. (All repositories that can handle
+                # missing keys can handle suspending a write group).
+                write_group_tokens = self.target_repo.suspend_write_group()
+                return write_group_tokens, missing_keys
+        self.target_repo.commit_write_group()
+        return [], set()
 
     def _extract_and_insert_inventories(self, substream, serializer):
         """Generate a new inventory versionedfile in target, converting data.

=== modified file 'bzrlib/smart/repository.py'
--- a/bzrlib/smart/repository.py	2009-02-24 06:02:51 +0000
+++ b/bzrlib/smart/repository.py	2009-02-25 00:31:09 +0000
@@ -18,12 +18,12 @@
 
 import bz2
 import os
+import Queue
 import struct
 import sys
 import tarfile
 import tempfile
 import threading
-import Queue
 
 from bzrlib import (
     errors,
@@ -420,10 +420,7 @@
         """StreamSink.insert_stream for a remote repository."""
         repository.lock_write()
         tokens = [token for token in resume_tokens.split(' ') if token]
-        if tokens:
-            repository.resume_write_group(tokens)
-        else:
-            repository.start_write_group()
+        self.tokens = tokens
         self.repository = repository
         self.stream_decoder = pack.ContainerPushParser()
         self.src_format = None
@@ -448,8 +445,13 @@
                     self.queue.put((substream_type, [record]))
 
     def _inserter_thread(self):
-        self.repository._get_sink().insert_stream(self.blocking_read_stream(),
-                self.src_format)
+        try:
+            self.insert_result = self.repository._get_sink().insert_stream(
+                self.blocking_read_stream(), self.src_format, self.tokens)
+            self.insert_ok = True
+        except:
+            self.insert_exception = sys.exc_info()
+            self.insert_ok = False
 
     def blocking_read_stream(self):
         while True:
@@ -463,32 +465,17 @@
         self.queue.put(StopIteration)
         if self.insert_thread is not None:
             self.insert_thread.join()
-        try:
-            missing_keys = set()
-            for prefix, versioned_file in (
-                ('texts', self.repository.texts),
-                ('inventories', self.repository.inventories),
-                ('revisions', self.repository.revisions),
-                ('signatures', self.repository.signatures),
-                ):
-                missing_keys.update((prefix,) + key for key in
-                    versioned_file.get_missing_compression_parent_keys())
-        except NotImplementedError:
-            # cannot even attempt suspending.
-            pass
+        if not self.insert_ok:
+            exc_info = self.insert_exception
+            raise exc_info[0], exc_info[1], exc_info[2]
+        write_group_tokens, missing_keys = self.insert_result
+        if write_group_tokens or missing_keys:
+            # bzip needed? missing keys should typically be a small set.
+            # Should this be a streaming body response?
+            missing_keys = sorted(missing_keys)
+            bytes = bencode.bencode((write_group_tokens, missing_keys))
+            self.repository.unlock()
+            return SuccessfulSmartServerResponse(('missing-basis', bytes))
         else:
-            if missing_keys:
-                # suspend the write group and tell the caller what we is
-                # missing. We know we can suspend or else we would not have
-                # entered this code path. (All repositories that can handle
-                # missing keys can handle suspending a write group).
-                write_group_tokens = self.repository.suspend_write_group()
-                # bzip needed? missing keys should typically be a small set.
-                # Should this be a streaming body response ?
-                missing_keys = sorted(missing_keys)
-                bytes = bencode.bencode((write_group_tokens, missing_keys))
-                return SuccessfulSmartServerResponse(('missing-basis', bytes))
-        # All finished.
-        self.repository.commit_write_group()
-        self.repository.unlock()
-        return SuccessfulSmartServerResponse(('ok', ))
+            self.repository.unlock()
+            return SuccessfulSmartServerResponse(('ok', ))
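
The handler above also starts propagating exceptions out of the inserter
thread: the worker records sys.exc_info() and the joining thread
re-raises it. A self-contained sketch of that pattern (Python 2 re-raise
syntax, matching the diff; the failing division stands in for
insert_stream):

    import sys
    import threading

    class Worker(object):
        def run(self):
            try:
                self.result = 1 / 0  # stand-in for insert_stream
                self.ok = True
            except:
                self.exc_info = sys.exc_info()
                self.ok = False

    w = Worker()
    t = threading.Thread(target=w.run)
    t.start()
    t.join()
    if not w.ok:
        # Re-raise with the original traceback (Python 2 syntax).
        raise w.exc_info[0], w.exc_info[1], w.exc_info[2]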

=== modified file 'bzrlib/tests/blackbox/test_push.py'
--- a/bzrlib/tests/blackbox/test_push.py	2009-02-24 08:09:17 +0000
+++ b/bzrlib/tests/blackbox/test_push.py	2009-02-24 21:22:43 +0000
@@ -202,7 +202,7 @@
         # being too low. If rpc_count increases, more network roundtrips have
         # become necessary for this use case. Please do not adjust this number
         # upwards without agreement from bzr's network support maintainers.
-        self.assertEqual(36, rpc_count)
+        self.assertEqual(32, rpc_count)
 
     def test_push_smart_stacked_streaming_acceptance(self):
         self.setup_smart_server_with_call_log()
@@ -219,7 +219,7 @@
         # being too low. If rpc_count increases, more network roundtrips have
         # become necessary for this use case. Please do not adjust this number
         # upwards without agreement from bzr's network support maintainers.
-        self.assertEqual(65, rpc_count)
+        self.assertEqual(61, rpc_count)
         remote = Branch.open('public')
         self.assertEndsWith(remote.get_stacked_on_url(), '/parent')
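
Both pinned totals drop by four RPCs per push, consistent with the
write-group and locking calls the sink no longer issues against the
remote target (a plausible reading; the diff itself only pins the new
totals):

    # Bookkeeping for the adjusted acceptance numbers above.
    assert 36 - 32 == 4
    assert 65 - 61 == 4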
 

=== modified file 'bzrlib/tests/branch_implementations/test_push.py'
--- a/bzrlib/tests/branch_implementations/test_push.py	2009-02-24 22:39:38 +0000
+++ b/bzrlib/tests/branch_implementations/test_push.py	2009-02-25 00:31:09 +0000
@@ -267,7 +267,7 @@
         # remote graph any further.
         self.assertEqual(
             ['Repository.insert_stream', 'Repository.insert_stream', 'get',
-             'delete', 'Branch.set_last_revision_info', 'Branch.unlock'],
+             'Branch.set_last_revision_info', 'Branch.unlock'],
             calls_after_insert_stream)
 
     def disableOptimisticGetParentMap(self):



