Rev 4044: (robertc) Move write locking and write group responsibilities into the Sink objects themselves in file:///home/pqm/archives/thelove/bzr/%2Btrunk/
Canonical.com Patch Queue Manager
pqm at pqm.ubuntu.com
Wed Feb 25 01:12:00 GMT 2009
At file:///home/pqm/archives/thelove/bzr/%2Btrunk/
------------------------------------------------------------
revno: 4044
revision-id: pqm at pqm.ubuntu.com-20090225011157-xn0w5zux5mrges6j
parent: pqm at pqm.ubuntu.com-20090225000405-09p33ue22l4h19yk
parent: robertc at robertcollins.net-20090225003109-9ngqolksoevjw5ay
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Wed 2009-02-25 01:11:57 +0000
message:
(robertc) Move write locking and write group responsibilities into
the Sink objects themselves,
allowing complete avoidance of unnecessary calls when the sink is a
RemoteSink. (Robert Collins)
modified:
bzrlib/fetch.py fetch.py-20050818234941-26fea6105696365d
bzrlib/remote.py remote.py-20060720103555-yeeg2x51vn0rbtdp-1
bzrlib/repository.py rev_storage.py-20051111201905-119e9401e46257e3
bzrlib/smart/repository.py repository.py-20061128022038-vr5wy5bubyb8xttk-1
bzrlib/tests/blackbox/test_push.py test_push.py-20060329002750-929af230d5d22663
bzrlib/tests/branch_implementations/test_push.py test_push.py-20070130153159-fhfap8uoifevg30j-1
------------------------------------------------------------
revno: 4032.3.8
revision-id: robertc at robertcollins.net-20090225003109-9ngqolksoevjw5ay
parent: robertc at robertcollins.net-20090224212243-wpleuzp5uin2h89p
parent: pqm at pqm.ubuntu.com-20090225000405-09p33ue22l4h19yk
committer: Robert Collins <robertc at robertcollins.net>
branch nick: integration
timestamp: Wed 2009-02-25 11:31:09 +1100
message:
Merge bzr.dev, adjust effort tests reducing the effort in line with the write_group avoidance.
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/remote.py remote.py-20060720103555-yeeg2x51vn0rbtdp-1
bzrlib/repository.py rev_storage.py-20051111201905-119e9401e46257e3
bzrlib/smart/repository.py repository.py-20061128022038-vr5wy5bubyb8xttk-1
bzrlib/tests/branch_implementations/test_push.py test_push.py-20070130153159-fhfap8uoifevg30j-1
bzrlib/tests/per_repository/test_repository.py test_repository.py-20060131092128-ad07f494f5c9d26c
bzrlib/tests/test_read_bundle.py test_read_bundle.py-20060615211421-ud8cwr1ulgd914zf-1
setup.py setup.py-20050314065409-02f8a0a6e3f9bc70
tools/win32/build_release.py build_release.py-20081105204355-2ghh5cv01v1x4rzz-1
------------------------------------------------------------
revno: 4032.3.7
revision-id: robertc at robertcollins.net-20090224212243-wpleuzp5uin2h89p
parent: robertc at robertcollins.net-20090224091304-k97x0yqk5yjy8jbl
committer: Robert Collins <robertc at robertcollins.net>
branch nick: push.roundtrips
timestamp: Wed 2009-02-25 08:22:43 +1100
message:
Move write locking and write group responsibilities into the Sink objects themselves, allowing complete avoidance of unnecessary calls when the sink is a RemoteSink.
modified:
bzrlib/fetch.py fetch.py-20050818234941-26fea6105696365d
bzrlib/remote.py remote.py-20060720103555-yeeg2x51vn0rbtdp-1
bzrlib/repository.py rev_storage.py-20051111201905-119e9401e46257e3
bzrlib/smart/repository.py repository.py-20061128022038-vr5wy5bubyb8xttk-1
bzrlib/tests/blackbox/test_push.py test_push.py-20060329002750-929af230d5d22663
=== modified file 'bzrlib/fetch.py'
--- a/bzrlib/fetch.py 2009-02-23 15:29:35 +0000
+++ b/bzrlib/fetch.py 2009-02-24 21:22:43 +0000
@@ -104,22 +104,11 @@
self.nested_pb = None
self.from_repository.lock_read()
try:
- self.to_repository.lock_write()
try:
- self.to_repository.start_write_group()
- try:
- self.__fetch()
- except:
- self.to_repository.abort_write_group(suppress_errors=True)
- raise
- else:
- self.to_repository.commit_write_group()
+ self.__fetch()
finally:
- try:
- if self.nested_pb is not None:
- self.nested_pb.finished()
- finally:
- self.to_repository.unlock()
+ if self.nested_pb is not None:
+ self.nested_pb.finished()
finally:
self.from_repository.unlock()
@@ -161,14 +150,20 @@
try:
from_format = self.from_repository._format
stream = self.get_stream(search, pp)
- missing_keys = self.sink.insert_stream(stream, from_format)
+ resume_tokens, missing_keys = self.sink.insert_stream(
+ stream, from_format, [])
if missing_keys:
stream = self.get_stream_for_missing_keys(missing_keys)
- missing_keys = self.sink.insert_stream(stream, from_format)
+ resume_tokens, missing_keys = self.sink.insert_stream(
+ stream, from_format, resume_tokens)
if missing_keys:
raise AssertionError(
"second push failed to complete a fetch %r." % (
missing_keys,))
+ if resume_tokens:
+ raise AssertionError(
+ "second push failed to commit the fetch %r." % (
+ resume_tokens,))
self.sink.finished()
finally:
if self.pb is not None:
@@ -195,7 +190,6 @@
revisions])
elif knit_kind == "inventory":
# Now copy the file texts.
- to_texts = self.to_repository.texts
from_texts = self.from_repository.texts
yield ('texts', from_texts.get_record_stream(
text_keys, self.to_repository._fetch_order,
=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py 2009-02-24 23:19:53 +0000
+++ b/bzrlib/remote.py 2009-02-25 00:31:09 +0000
@@ -1372,25 +1372,24 @@
def __init__(self, target_repo):
repository.StreamSink.__init__(self, target_repo)
- self._resume_tokens = []
- def _insert_real(self, stream, src_format):
+ def _insert_real(self, stream, src_format, resume_tokens):
self.target_repo._ensure_real()
sink = self.target_repo._real_repository._get_sink()
- result = sink.insert_stream(stream, src_format)
+ result = sink.insert_stream(stream, src_format, resume_tokens)
if not result:
self.target_repo.autopack()
return result
- def insert_stream(self, stream, src_format):
+ def insert_stream(self, stream, src_format, resume_tokens):
repo = self.target_repo
client = repo._client
medium = client._medium
if medium._is_remote_before((1, 13)):
# No possible way this can work.
- return self._insert_real(stream, src_format)
+ return self._insert_real(stream, src_format, resume_tokens)
path = repo.bzrdir._path_for_remote_call(client)
- if not self._resume_tokens:
+ if not resume_tokens:
# XXX: Ugly but important for correctness, *will* be fixed during
# 1.13 cycle. Pushing a stream that is interrupted results in a
# fallback to the _real_repositories sink *with a partial stream*.
@@ -1401,29 +1400,28 @@
# buffering etc.
byte_stream = self._stream_to_byte_stream([], src_format)
try:
- resume_tokens = ''
response = client.call_with_body_stream(
- ('Repository.insert_stream', path, resume_tokens), byte_stream)
+ ('Repository.insert_stream', path, ''), byte_stream)
except errors.UnknownSmartMethod:
medium._remember_remote_is_before((1,13))
- return self._insert_real(stream, src_format)
+ return self._insert_real(stream, src_format, resume_tokens)
byte_stream = self._stream_to_byte_stream(stream, src_format)
- resume_tokens = ' '.join(self._resume_tokens)
+ resume_tokens = ' '.join(resume_tokens)
response = client.call_with_body_stream(
('Repository.insert_stream', path, resume_tokens), byte_stream)
if response[0][0] not in ('ok', 'missing-basis'):
raise errors.UnexpectedSmartServerResponse(response)
if response[0][0] == 'missing-basis':
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
- self._resume_tokens = tokens
- return missing_keys
+ resume_tokens = tokens
+ return resume_tokens, missing_keys
else:
if self.target_repo._real_repository is not None:
collection = getattr(self.target_repo._real_repository,
'_pack_collection', None)
if collection is not None:
collection.reload_pack_names()
- return []
+ return [], set()
def _stream_to_byte_stream(self, stream, src_format):
bytes = []
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py 2009-02-24 22:19:10 +0000
+++ b/bzrlib/repository.py 2009-02-25 00:31:09 +0000
@@ -3685,15 +3685,30 @@
def __init__(self, target_repo):
self.target_repo = target_repo
- def insert_stream(self, stream, src_format):
+ def insert_stream(self, stream, src_format, resume_tokens):
"""Insert a stream's content into the target repository.
:param src_format: a bzr repository format.
- :return: an iterable of keys additional items required before the
- insertion can be completed.
+ :return: a list of resume tokens and an iterable of keys additional
+ items required before the insertion can be completed.
"""
- result = []
+ self.target_repo.lock_write()
+ try:
+ if resume_tokens:
+ self.target_repo.resume_write_group(resume_tokens)
+ else:
+ self.target_repo.start_write_group()
+ try:
+ # locked_insert_stream performs a commit|suspend.
+ return self._locked_insert_stream(stream, src_format)
+ except:
+ self.target_repo.abort_write_group(suppress_errors=True)
+ raise
+ finally:
+ self.target_repo.unlock()
+
+ def _locked_insert_stream(self, stream, src_format):
to_serializer = self.target_repo._format._serializer
src_serializer = src_format._serializer
for substream_type, substream in stream:
@@ -3720,7 +3735,30 @@
self.target_repo.signatures.insert_record_stream(substream)
else:
raise AssertionError('kaboom! %s' % (substream_type,))
- return result
+ try:
+ missing_keys = set()
+ for prefix, versioned_file in (
+ ('texts', self.target_repo.texts),
+ ('inventories', self.target_repo.inventories),
+ ('revisions', self.target_repo.revisions),
+ ('signatures', self.target_repo.signatures),
+ ):
+ missing_keys.update((prefix,) + key for key in
+ versioned_file.get_missing_compression_parent_keys())
+ except NotImplementedError:
+ # cannot even attempt suspending, and missing would have failed
+ # during stream insertion.
+ missing_keys = set()
+ else:
+ if missing_keys:
+ # suspend the write group and tell the caller what we is
+ # missing. We know we can suspend or else we would not have
+ # entered this code path. (All repositories that can handle
+ # missing keys can handle suspending a write group).
+ write_group_tokens = self.target_repo.suspend_write_group()
+ return write_group_tokens, missing_keys
+ self.target_repo.commit_write_group()
+ return [], set()
def _extract_and_insert_inventories(self, substream, serializer):
"""Generate a new inventory versionedfile in target, converting data.
=== modified file 'bzrlib/smart/repository.py'
--- a/bzrlib/smart/repository.py 2009-02-24 06:02:51 +0000
+++ b/bzrlib/smart/repository.py 2009-02-25 00:31:09 +0000
@@ -18,12 +18,12 @@
import bz2
import os
+import Queue
import struct
import sys
import tarfile
import tempfile
import threading
-import Queue
from bzrlib import (
errors,
@@ -420,10 +420,7 @@
"""StreamSink.insert_stream for a remote repository."""
repository.lock_write()
tokens = [token for token in resume_tokens.split(' ') if token]
- if tokens:
- repository.resume_write_group(tokens)
- else:
- repository.start_write_group()
+ self.tokens = tokens
self.repository = repository
self.stream_decoder = pack.ContainerPushParser()
self.src_format = None
@@ -448,8 +445,13 @@
self.queue.put((substream_type, [record]))
def _inserter_thread(self):
- self.repository._get_sink().insert_stream(self.blocking_read_stream(),
- self.src_format)
+ try:
+ self.insert_result = self.repository._get_sink().insert_stream(
+ self.blocking_read_stream(), self.src_format, self.tokens)
+ self.insert_ok = True
+ except:
+ self.insert_exception = sys.exc_info()
+ self.insert_ok = False
def blocking_read_stream(self):
while True:
@@ -463,32 +465,17 @@
self.queue.put(StopIteration)
if self.insert_thread is not None:
self.insert_thread.join()
- try:
- missing_keys = set()
- for prefix, versioned_file in (
- ('texts', self.repository.texts),
- ('inventories', self.repository.inventories),
- ('revisions', self.repository.revisions),
- ('signatures', self.repository.signatures),
- ):
- missing_keys.update((prefix,) + key for key in
- versioned_file.get_missing_compression_parent_keys())
- except NotImplementedError:
- # cannot even attempt suspending.
- pass
+ if not self.insert_ok:
+ exc_info = self.insert_exception
+ raise exc_info[0], exc_info[1], exc_info[2]
+ write_group_tokens, missing_keys = self.insert_result
+ if write_group_tokens or missing_keys:
+ # bzip needed? missing keys should typically be a small set.
+ # Should this be a streaming body response ?
+ missing_keys = sorted(missing_keys)
+ bytes = bencode.bencode((write_group_tokens, missing_keys))
+ self.repository.unlock()
+ return SuccessfulSmartServerResponse(('missing-basis', bytes))
else:
- if missing_keys:
- # suspend the write group and tell the caller what we is
- # missing. We know we can suspend or else we would not have
- # entered this code path. (All repositories that can handle
- # missing keys can handle suspending a write group).
- write_group_tokens = self.repository.suspend_write_group()
- # bzip needed? missing keys should typically be a small set.
- # Should this be a streaming body response ?
- missing_keys = sorted(missing_keys)
- bytes = bencode.bencode((write_group_tokens, missing_keys))
- return SuccessfulSmartServerResponse(('missing-basis', bytes))
- # All finished.
- self.repository.commit_write_group()
- self.repository.unlock()
- return SuccessfulSmartServerResponse(('ok', ))
+ self.repository.unlock()
+ return SuccessfulSmartServerResponse(('ok', ))
=== modified file 'bzrlib/tests/blackbox/test_push.py'
--- a/bzrlib/tests/blackbox/test_push.py 2009-02-24 08:09:17 +0000
+++ b/bzrlib/tests/blackbox/test_push.py 2009-02-24 21:22:43 +0000
@@ -202,7 +202,7 @@
# being too low. If rpc_count increases, more network roundtrips have
# become necessary for this use case. Please do not adjust this number
# upwards without agreement from bzr's network support maintainers.
- self.assertEqual(36, rpc_count)
+ self.assertEqual(32, rpc_count)
def test_push_smart_stacked_streaming_acceptance(self):
self.setup_smart_server_with_call_log()
@@ -219,7 +219,7 @@
# being too low. If rpc_count increases, more network roundtrips have
# become necessary for this use case. Please do not adjust this number
# upwards without agreement from bzr's network support maintainers.
- self.assertEqual(65, rpc_count)
+ self.assertEqual(61, rpc_count)
remote = Branch.open('public')
self.assertEndsWith(remote.get_stacked_on_url(), '/parent')
=== modified file 'bzrlib/tests/branch_implementations/test_push.py'
--- a/bzrlib/tests/branch_implementations/test_push.py 2009-02-24 22:39:38 +0000
+++ b/bzrlib/tests/branch_implementations/test_push.py 2009-02-25 00:31:09 +0000
@@ -267,7 +267,7 @@
# remote graph any further.
self.assertEqual(
['Repository.insert_stream', 'Repository.insert_stream', 'get',
- 'delete', 'Branch.set_last_revision_info', 'Branch.unlock'],
+ 'Branch.set_last_revision_info', 'Branch.unlock'],
calls_after_insert_stream)
def disableOptimisticGetParentMap(self):
More information about the bazaar-commits
mailing list