Rev 4028: (robertc) Add RemoteSink to support streaming push to non-stacked in file:///home/pqm/archives/thelove/bzr/%2Btrunk/
Canonical.com Patch Queue Manager
pqm at pqm.ubuntu.com
Fri Feb 20 22:53:41 GMT 2009
At file:///home/pqm/archives/thelove/bzr/%2Btrunk/
------------------------------------------------------------
revno: 4028
revision-id: pqm at pqm.ubuntu.com-20090220225337-14hb4kaptjpvwr0l
parent: pqm at pqm.ubuntu.com-20090220100854-p9g7snhipls2cj0z
parent: robertc at robertcollins.net-20090220221045-bmuh2t0wf09szn3z
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Fri 2009-02-20 22:53:37 +0000
message:
(robertc) Add RemoteSink to support streaming push to non-stacked
repositories over bzr+ssh:// (Andrew Bennetts, Robert Collins)
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/remote.py remote.py-20060720103555-yeeg2x51vn0rbtdp-1
bzrlib/repository.py rev_storage.py-20051111201905-119e9401e46257e3
bzrlib/smart/protocol.py protocol.py-20061108035435-ot0lstk2590yqhzr-1
bzrlib/smart/repository.py repository.py-20061128022038-vr5wy5bubyb8xttk-1
bzrlib/smart/request.py request.py-20061108095550-gunadhxmzkdjfeek-1
bzrlib/tests/blackbox/test_push.py test_push.py-20060329002750-929af230d5d22663
bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
bzrlib/versionedfile.py versionedfile.py-20060222045106-5039c71ee3b65490
------------------------------------------------------------
revno: 4022.1.8
revision-id: robertc at robertcollins.net-20090220221045-bmuh2t0wf09szn3z
parent: robertc at robertcollins.net-20090220104851-m0s7qwe6jzqkgj1f
committer: Robert Collins <robertc at robertcollins.net>
branch nick: integration
timestamp: Sat 2009-02-21 09:10:45 +1100
message:
Fix test_call_with_body_stream_error, which was broken by a debugging change, so that the tests pass again.
modified:
bzrlib/smart/protocol.py protocol.py-20061108035435-ot0lstk2590yqhzr-1
bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
------------------------------------------------------------
revno: 4022.1.7
revision-id: robertc at robertcollins.net-20090220104851-m0s7qwe6jzqkgj1f
parent: robertc at robertcollins.net-20090220082650-wmzch4en338bymkm
parent: pqm at pqm.ubuntu.com-20090220100854-p9g7snhipls2cj0z
committer: Robert Collins <robertc at robertcollins.net>
branch nick: integration
timestamp: Fri 2009-02-20 21:48:51 +1100
message:
Merge bzr.dev (avoids criss-cross for PQM).
modified:
bzrlib/errors.py errors.py-20050309040759-20512168c4e14fbd
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/remote.py remote.py-20060720103555-yeeg2x51vn0rbtdp-1
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/repository.py rev_storage.py-20051111201905-119e9401e46257e3
bzrlib/tests/per_repository/test_write_group.py test_write_group.py-20070716105516-89n34xtogq5frn0m-1
bzrlib/tests/test_errors.py test_errors.py-20060210110251-41aba2deddf936a8
bzrlib/tests/test_pack_repository.py test_pack_repository-20080801043947-eaw0e6h2gu75kwmy-1
bzrlib/tests/test_versionedfile.py test_versionedfile.py-20060222045249-db45c9ed14a1c2e5
bzrlib/versionedfile.py versionedfile.py-20060222045106-5039c71ee3b65490
------------------------------------------------------------
revno: 4022.1.6
revision-id: robertc at robertcollins.net-20090220082650-wmzch4en338bymkm
parent: robertc at robertcollins.net-20090220075016-18yco3ld54qc1188
committer: Robert Collins <robertc at robertcollins.net>
branch nick: fetch.RemoteSink
timestamp: Fri 2009-02-20 19:26:50 +1100
message:
Cherrypick and polish the RemoteSink for streaming push.
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/remote.py remote.py-20060720103555-yeeg2x51vn0rbtdp-1
bzrlib/repository.py rev_storage.py-20051111201905-119e9401e46257e3
bzrlib/smart/protocol.py protocol.py-20061108035435-ot0lstk2590yqhzr-1
bzrlib/smart/repository.py repository.py-20061128022038-vr5wy5bubyb8xttk-1
bzrlib/smart/request.py request.py-20061108095550-gunadhxmzkdjfeek-1
bzrlib/tests/blackbox/test_push.py test_push.py-20060329002750-929af230d5d22663
bzrlib/versionedfile.py versionedfile.py-20060222045106-5039c71ee3b65490
------------------------------------------------------------
revno: 4022.1.5
revision-id: robertc at robertcollins.net-20090220075016-18yco3ld54qc1188
parent: robertc at robertcollins.net-20090220065237-vpfwvp6f3w01teu5
parent: pqm at pqm.ubuntu.com-20090220071304-mb95xwtanwl2bqa4
committer: Robert Collins <robertc at robertcollins.net>
branch nick: fetch.RemoteSink
timestamp: Fri 2009-02-20 18:50:16 +1100
message:
Merge bzr.dev.
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/bzrdir.py bzrdir.py-20060131065624-156dfea39c4387cb
bzrlib/remote.py remote.py-20060720103555-yeeg2x51vn0rbtdp-1
bzrlib/smart/bzrdir.py bzrdir.py-20061122024551-ol0l0o0oofsu9b3t-1
bzrlib/smart/medium.py medium.py-20061103051856-rgu2huy59fkz902q-1
bzrlib/smart/repository.py repository.py-20061128022038-vr5wy5bubyb8xttk-1
bzrlib/smart/request.py request.py-20061108095550-gunadhxmzkdjfeek-1
bzrlib/tests/blackbox/test_push.py test_push.py-20060329002750-929af230d5d22663
bzrlib/tests/blackbox/test_shared_repository.py test_shared_repository.py-20060317053531-ed30c0d79325e483
bzrlib/tests/test_remote.py test_remote.py-20060720103555-yeeg2x51vn0rbtdp-2
bzrlib/tests/test_smart.py test_smart.py-20061122024551-ol0l0o0oofsu9b3t-2
------------------------------------------------------------
revno: 4022.1.4
revision-id: robertc at robertcollins.net-20090220065237-vpfwvp6f3w01teu5
parent: robertc at robertcollins.net-20090220062731-peh7eb800gvgb08h
parent: robertc at robertcollins.net-20090220064830-wyj52q6rl2gygqsa
committer: Robert Collins <robertc at robertcollins.net>
branch nick: fetch.RemoteSink
timestamp: Fri 2009-02-20 17:52:37 +1100
message:
Merge network serialisation of record streams.
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
bzrlib/tests/test_versionedfile.py test_versionedfile.py-20060222045249-db45c9ed14a1c2e5
bzrlib/versionedfile.py versionedfile.py-20060222045106-5039c71ee3b65490
bzrlib/weave.py knit.py-20050627021749-759c29984154256b
=== modified file 'NEWS'
--- a/NEWS 2009-02-20 08:40:02 +0000
+++ b/NEWS 2009-02-20 10:48:51 +0000
@@ -66,6 +66,9 @@
allowing for decoupling over a network connection. (Andrew Bennetts,
Robert Collins)
+ * There is a RemoteSink object which handles pushing to smart servers.
+ (Andrew Bennetts, Robert Collins)
+
* ``bzrlib.tests.run_suite`` accepts a runner_class parameter
supporting the use of different runners. (Robert Collins)
=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py 2009-02-20 09:28:11 +0000
+++ b/bzrlib/remote.py 2009-02-20 10:48:51 +0000
@@ -18,7 +18,6 @@
# across to run on the server.
import bz2
-import struct
from bzrlib import (
branch,
@@ -27,6 +26,7 @@
errors,
graph,
lockdir,
+ pack,
repository,
revision,
symbol_versioning,
@@ -43,6 +43,7 @@
from bzrlib.smart import client, vfs
from bzrlib.revision import ensure_null, NULL_REVISION
from bzrlib.trace import mutter, note, warning
+from bzrlib.versionedfile import record_to_fulltext_bytes
class _RpcHelper(object):
@@ -543,8 +544,7 @@
def _get_sink(self):
"""See Repository._get_sink()."""
- self._ensure_real()
- return self._real_repository._get_sink()
+ return RemoteStreamSink(self)
def has_revision(self, revision_id):
"""See Repository.has_revision()."""
@@ -1343,8 +1343,55 @@
raise errors.UnexpectedSmartServerResponse(response)
-def _length_prefix(bytes):
- return struct.pack('!L', len(bytes))
+class RemoteStreamSink(repository.StreamSink):
+
+ def _insert_real(self, stream, src_format):
+ self.target_repo._ensure_real()
+ sink = self.target_repo._real_repository._get_sink()
+ return sink.insert_stream(stream, src_format)
+
+ def insert_stream(self, stream, src_format):
+ repo = self.target_repo
+ # Until we can handle deltas in stack repositories we can't hand all
+ # the processing off to a remote server.
+ if self.target_repo._fallback_repositories:
+ return self._insert_real(stream, src_format)
+ client = repo._client
+ path = repo.bzrdir._path_for_remote_call(client)
+ byte_stream = self._stream_to_byte_stream(stream, src_format)
+ try:
+ response = client.call_with_body_stream(
+ ('Repository.insert_stream', path), byte_stream)
+ except errors.UnknownSmartMethod:
+ return self._insert_real(stream, src_format)
+
+ def _stream_to_byte_stream(self, stream, src_format):
+ bytes = []
+ pack_writer = pack.ContainerWriter(bytes.append)
+ pack_writer.begin()
+ pack_writer.add_bytes_record(src_format.network_name(), '')
+ adapters = {}
+ def get_adapter(adapter_key):
+ try:
+ return adapters[adapter_key]
+ except KeyError:
+ adapter_factory = adapter_registry.get(adapter_key)
+ adapter = adapter_factory(self)
+ adapters[adapter_key] = adapter
+ return adapter
+ for substream_type, substream in stream:
+ for record in substream:
+ if record.storage_kind in ('chunked', 'fulltext'):
+ serialised = record_to_fulltext_bytes(record)
+ else:
+ serialised = record.get_bytes_as(record.storage_kind)
+ pack_writer.add_bytes_record(serialised, [(substream_type,)])
+ for b in bytes:
+ yield b
+ del bytes[:]
+ pack_writer.end()
+ for b in bytes:
+ yield b
class RemoteBranchLockableFiles(LockableFiles):
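The `_stream_to_byte_stream` generator in the remote.py hunk writes each record through a callback into a list, then yields and clears that list after every record. The request body is therefore sent incrementally instead of being built in memory. On the server, `ContainerPushParser` reassembles records from whatever chunk boundaries happen to arrive. The sketch below illustrates both halves with a toy framing: a 4-byte big-endian length and the name, then a 4-byte length and the body. This framing is an assumption for illustration and is not bzrlib's pack container format. `ToyWriter`, `stream_records` and `ToyPushParser` are hypothetical names.

```python
import struct
from typing import Callable, Iterable, Iterator, List, Tuple

Record = Tuple[bytes, bytes]  # (name, body)


class ToyWriter:
    """Hypothetical writer: frames each record and hands the bytes to write_func."""

    def __init__(self, write_func: Callable[[bytes], None]) -> None:
        self._write = write_func

    def add_record(self, name: bytes, body: bytes) -> None:
        # Length-prefix both the name and the body (toy framing, not bzr's).
        self._write(struct.pack('!L', len(name)) + name)
        self._write(struct.pack('!L', len(body)) + body)


def stream_records(records: Iterable[Record]) -> Iterator[bytes]:
    """Yield framed bytes record by record instead of buffering everything."""
    buffered: List[bytes] = []
    writer = ToyWriter(buffered.append)
    for name, body in records:
        writer.add_record(name, body)
        # Flush what this record produced, then empty the list in place so
        # the writer's bound append still targets the same list.
        yield from buffered
        del buffered[:]


class ToyPushParser:
    """Hypothetical incremental parser: accepts arbitrary chunks, emits whole records."""

    def __init__(self) -> None:
        self._buf = b''

    def accept_bytes(self, chunk: bytes) -> None:
        self._buf += chunk

    def read_pending_records(self) -> List[Record]:
        records = []
        while len(self._buf) >= 4:
            name_len, = struct.unpack('!L', self._buf[:4])
            body_len_at = 4 + name_len
            if len(self._buf) < body_len_at + 4:
                break  # body length not fully received yet
            body_len, = struct.unpack('!L', self._buf[body_len_at:body_len_at + 4])
            end = body_len_at + 4 + body_len
            if len(self._buf) < end:
                break  # body not fully received yet
            records.append((self._buf[4:body_len_at], self._buf[body_len_at + 4:end]))
            self._buf = self._buf[end:]
        return records


# Usage: deliver the framed stream in awkward 3-byte chunks.
records = [(b'texts', b'hello'), (b'inventories', b''), (b'revisions', b'x' * 10)]
wire = b''.join(stream_records(records))
parser = ToyPushParser()
received: List[Record] = []
for i in range(0, len(wire), 3):
    parser.accept_bytes(wire[i:i + 3])
    received.extend(parser.read_pending_records())
print(received == records)  # True
```

Because the generator yields after every record, a source that fails part-way has still let earlier records go out, which is why the protocol needs a clean way to abort the body.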
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py 2009-02-20 09:28:11 +0000
+++ b/bzrlib/repository.py 2009-02-20 10:48:51 +0000
@@ -3487,6 +3487,17 @@
def _autopack(self):
self.target.autopack()
+
+ @needs_write_lock
+ def fetch(self, revision_id=None, pb=None, find_ghosts=False):
+ """See InterRepository.fetch()."""
+ # Always fetch using the generic streaming fetch code, to allow
+ # streaming fetching into remote servers.
+ from bzrlib.fetch import RepoFetcher
+ fetcher = RepoFetcher(self.target, self.source, revision_id,
+ pb, find_ghosts)
+ self.target.autopack()
+ return fetcher.count_copied, fetcher.failed_revisions
def _get_target_pack_collection(self):
return self.target._real_repository._pack_collection
=== modified file 'bzrlib/smart/protocol.py'
--- a/bzrlib/smart/protocol.py 2009-02-02 05:56:34 +0000
+++ b/bzrlib/smart/protocol.py 2009-02-20 22:10:45 +0000
@@ -1247,11 +1247,15 @@
self._write_prefixed_body(part)
self.flush()
except Exception:
+ exc_info = sys.exc_info()
# Iterating the stream failed. Cleanly abort the request.
self._write_error_status()
# Currently the client unconditionally sends ('error',) as the
# error args.
self._write_structure(('error',))
+ self._write_end()
+ self._medium_request.finished_writing()
+ raise exc_info[0], exc_info[1], exc_info[2]
self._write_end()
self._medium_request.finished_writing()
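The protocol.py change makes a failing body stream do two things. First, it finishes the request on the wire (error status, `('error',)` args, end marker) so that the server is not left waiting for more bytes. Then it propagates the original exception to the caller, which is why the test now wraps the call in `assertRaises`. The original captures `sys.exc_info()` up front and uses Python 2's three-argument `raise`, so the original exception and traceback are what surface after the extra writes. Below is a minimal Python 3 sketch of the same pattern. `send_body_stream` and the list standing in for the medium are hypothetical.

```python
from typing import Iterable, List


def send_body_stream(parts: Iterable[bytes], wire: List[bytes]) -> None:
    """Hypothetical sender: frame each body part, aborting cleanly on failure."""
    try:
        for part in parts:
            wire.append(b'chunk:' + part)
    except Exception:
        # Tell the other end the body failed, then close the request so it
        # is not left waiting for more bytes.
        wire.append(b'error')
        wire.append(b'end')
        # A bare raise re-raises the active exception with its traceback.
        raise
    wire.append(b'end')


def stream_that_fails():
    yield b'aaa'
    yield b'bbb'
    raise ValueError('Boom!')


wire: List[bytes] = []
try:
    send_body_stream(stream_that_fails(), wire)
except ValueError as e:
    print('caller saw:', e)  # caller saw: Boom!
```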
=== modified file 'bzrlib/smart/repository.py'
--- a/bzrlib/smart/repository.py 2009-02-20 03:28:07 +0000
+++ b/bzrlib/smart/repository.py 2009-02-20 08:26:50 +0000
@@ -18,13 +18,17 @@
import bz2
import os
+import struct
import sys
+import tarfile
import tempfile
-import tarfile
+import threading
+import Queue
from bzrlib import (
errors,
osutils,
+ pack,
)
from bzrlib.bzrdir import BzrDir
from bzrlib.smart.request import (
@@ -32,8 +36,10 @@
SmartServerRequest,
SuccessfulSmartServerResponse,
)
-from bzrlib.repository import _strip_NULL_ghosts
+from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
from bzrlib import revision as _mod_revision
+from bzrlib.util import bencode
+from bzrlib.versionedfile import NetworkRecordStream
class SmartServerRepositoryRequest(SmartServerRequest):
@@ -403,3 +409,55 @@
tarball.add(dirname, '.bzr') # recursive by default
finally:
tarball.close()
+
+
+class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
+
+ def do_repository_request(self, repository):
+ """StreamSink.insert_stream for a remote repository."""
+ repository.lock_write()
+ repository.start_write_group()
+ self.repository = repository
+ self.stream_decoder = pack.ContainerPushParser()
+ self.src_format = None
+ self.queue = Queue.Queue()
+ self.insert_thread = None
+
+ def do_chunk(self, body_stream_chunk):
+ self.stream_decoder.accept_bytes(body_stream_chunk)
+ for record in self.stream_decoder.read_pending_records():
+ record_names, record_bytes = record
+ if self.src_format is None:
+ src_format_name = record_bytes
+ src_format = network_format_registry.get(src_format_name)
+ self.src_format = src_format
+ self.insert_thread = threading.Thread(target=self._inserter_thread)
+ self.insert_thread.start()
+ else:
+ record_name, = record_names
+ substream_type = record_name[0]
+ stream = NetworkRecordStream([record_bytes])
+ for record in stream.read():
+ self.queue.put((substream_type, [record]))
+
+ def _inserter_thread(self):
+ self.repository._get_sink().insert_stream(self.blocking_read_stream(),
+ self.src_format)
+
+ def blocking_read_stream(self):
+ while True:
+ item = self.queue.get()
+ if item is StopIteration:
+ return
+ else:
+ yield item
+
+ def do_end(self):
+ self.queue.put(StopIteration)
+ if self.insert_thread is not None:
+ self.insert_thread.join()
+ self.repository.commit_write_group()
+ self.repository.unlock()
+ return SuccessfulSmartServerResponse(('ok', ))
+
+
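`SmartServerRepositoryInsertStream` joins two styles of code. The request handler has body chunks pushed into `do_chunk`, while `StreamSink.insert_stream` wants to pull from an iterator. A queue connects them. `do_chunk` puts decoded records on the queue, a worker thread drains it through the blocking `blocking_read_stream` generator, and `do_end` enqueues a sentinel and joins the thread. The original starts the thread only once the first record has named the source format. The sketch below isolates the pattern with Python 3's `queue` and `threading`. `PushToPullBridge` and the private `object()` sentinel are illustrative choices (the original enqueues the `StopIteration` class) and are not bzrlib code.

```python
import queue
import threading
from typing import Callable, Iterator, List

_END = object()  # private sentinel marking the end of the stream


class PushToPullBridge:
    """Hypothetical adapter: accept pushed items, feed them to a pull-style consumer."""

    def __init__(self, consume: Callable[[Iterator[object]], None]) -> None:
        self._queue: "queue.Queue[object]" = queue.Queue()
        # The generator body only runs once the worker thread iterates it.
        self._thread = threading.Thread(target=consume, args=(self._blocking_read(),))
        self._thread.start()

    def _blocking_read(self) -> Iterator[object]:
        # Runs in the worker thread: block until the next item arrives.
        while True:
            item = self._queue.get()
            if item is _END:
                return
            yield item

    def push(self, item: object) -> None:
        self._queue.put(item)

    def finish(self) -> None:
        # Signal end of input and wait for the consumer to drain the queue.
        self._queue.put(_END)
        self._thread.join()


received: List[object] = []


def consume(items: Iterator[object]) -> None:
    for item in items:
        received.append(item)


bridge = PushToPullBridge(consume)
for substream in ('texts', 'inventories', 'revisions'):
    bridge.push(substream)
bridge.finish()
print(received)  # ['texts', 'inventories', 'revisions']
```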
=== modified file 'bzrlib/smart/request.py'
--- a/bzrlib/smart/request.py 2009-02-20 03:28:07 +0000
+++ b/bzrlib/smart/request.py 2009-02-20 08:26:50 +0000
@@ -450,6 +450,8 @@
request_handlers.register_lazy(
'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')
request_handlers.register_lazy(
+ 'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
+request_handlers.register_lazy(
'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
request_handlers.register_lazy(
'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')
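Because the verb is registered with `register_lazy`, the handler is named only by module path and class name, and `bzrlib.smart.repository` does not need to be imported when `request.py` is loaded. The sketch below shows the general idea with `importlib`. `LazyRegistry` is a hypothetical class, not bzrlib's `Registry`, whose implementation may differ.

```python
import importlib
from typing import Any, Dict, Tuple


class LazyRegistry:
    """Hypothetical registry that defers importing a handler until first lookup."""

    def __init__(self) -> None:
        self._lazy: Dict[str, Tuple[str, str]] = {}
        self._loaded: Dict[str, Any] = {}

    def register_lazy(self, key: str, module_name: str, member_name: str) -> None:
        # Only the dotted location is stored; nothing is imported yet.
        self._lazy[key] = (module_name, member_name)

    def get(self, key: str) -> Any:
        if key not in self._loaded:
            module_name, member_name = self._lazy[key]  # KeyError for unknown keys
            module = importlib.import_module(module_name)
            self._loaded[key] = getattr(module, member_name)
        return self._loaded[key]


handlers = LazyRegistry()
handlers.register_lazy('demo.dumps', 'json', 'dumps')
print(handlers.get('demo.dumps')({'ok': 1}))  # {"ok": 1}
```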
=== modified file 'bzrlib/tests/blackbox/test_push.py'
--- a/bzrlib/tests/blackbox/test_push.py 2009-02-20 03:29:37 +0000
+++ b/bzrlib/tests/blackbox/test_push.py 2009-02-20 08:26:50 +0000
@@ -202,7 +202,7 @@
# being too low. If rpc_count increases, more network roundtrips have
# become necessary for this use case. Please do not adjust this number
# upwards without agreement from bzr's network support maintainers.
- self.assertEqual(74, rpc_count)
+ self.assertEqual(60, rpc_count)
def test_push_smart_stacked_streaming_acceptance(self):
self.setup_smart_server_with_call_log()
@@ -219,7 +219,7 @@
# being too low. If rpc_count increases, more network roundtrips have
# become necessary for this use case. Please do not adjust this number
# upwards without agreement from bzr's network support maintainers.
- self.assertEqual(99, rpc_count)
+ self.assertEqual(101, rpc_count)
remote = Branch.open('public')
self.assertEndsWith(remote.get_stacked_on_url(), '/parent')
=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py 2009-02-11 09:45:27 +0000
+++ b/bzrlib/tests/test_smart_transport.py 2009-02-20 22:10:45 +0000
@@ -2766,7 +2766,8 @@
yield 'aaa'
yield 'bbb'
raise Exception('Boom!')
- requester.call_with_body_stream(('one arg',), stream_that_fails())
+ self.assertRaises(Exception, requester.call_with_body_stream,
+ ('one arg',), stream_that_fails())
self.assertEquals(
'bzr message 3 (bzr 1.6)\n' # protocol version
'\x00\x00\x00\x02de' # headers
=== modified file 'bzrlib/versionedfile.py'
--- a/bzrlib/versionedfile.py 2009-02-20 08:40:02 +0000
+++ b/bzrlib/versionedfile.py 2009-02-20 10:48:51 +0000
@@ -22,6 +22,7 @@
from copy import copy
from cStringIO import StringIO
import os
+import struct
from zlib import adler32
from bzrlib.lazy_import import lazy_import
@@ -45,6 +46,7 @@
from bzrlib.registry import Registry
from bzrlib.symbol_versioning import *
from bzrlib.textmerge import TextMerge
+from bzrlib.util import bencode
adapter_registry = Registry()
@@ -1514,6 +1516,7 @@
'knit-annotated-ft-gz':knit.knit_network_to_record,
'knit-annotated-delta-gz':knit.knit_network_to_record,
'knit-delta-closure':knit.knit_delta_closure_to_records,
+ 'fulltext':fulltext_network_to_record,
}
def read(self):
@@ -1526,3 +1529,29 @@
for record in self._kind_factory[storage_kind](
storage_kind, bytes, line_end):
yield record
+
+
+def fulltext_network_to_record(kind, bytes, line_end):
+ """Convert a network fulltext record to record."""
+ meta_len, = struct.unpack('!L', bytes[line_end:line_end+4])
+ record_meta = record_bytes[line_end+4:line_end+4+meta_len]
+ key, parents = bencode.bdecode_as_tuple(record_meta)
+ if parents == 'nil':
+ parents = None
+ fulltext = record_bytes[line_end+4+meta_len:]
+ return FulltextContentFactory(key, parents, None, fulltext)
+
+
+def _length_prefix(bytes):
+ return struct.pack('!L', len(bytes))
+
+
+def record_to_fulltext_bytes(self, record):
+ if record.parents is None:
+ parents = 'nil'
+ else:
+ parents = record.parents
+ record_meta = bencode.bencode((record.key, parents))
+ record_content = record.get_bytes_as('fulltext')
+ return "fulltext\n%s%s%s" % (
+ _length_prefix(record_meta), record_meta, record_content)
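`record_to_fulltext_bytes` and `fulltext_network_to_record` define the wire form of a `'fulltext'` record. It consists of the storage-kind line `fulltext\n`, a 4-byte big-endian length (`struct.pack('!L', ...)`), the bencoded `(key, parents)` metadata with `'nil'` standing in for unknown parents, and then the raw text. The Python 3 round-trip sketch below assumes a minimal bencode subset (byte strings and lists only), written here for illustration. It is not `bzrlib.util.bencode`. The helpers `toy_bencode`, `toy_bdecode`, `encode_fulltext` and `decode_fulltext` are hypothetical. Lists decode as tuples, which is what the name `bdecode_as_tuple` suggests.

```python
import struct
from typing import Optional, Tuple, Union

Bencodable = Union[bytes, tuple, list]


def toy_bencode(obj: Bencodable) -> bytes:
    """Minimal bencode subset: byte strings and lists/tuples only."""
    if isinstance(obj, bytes):
        return b'%d:%s' % (len(obj), obj)
    if isinstance(obj, (list, tuple)):
        return b'l' + b''.join(toy_bencode(item) for item in obj) + b'e'
    raise TypeError('unsupported type: %r' % (type(obj),))


def toy_bdecode(data: bytes) -> Bencodable:
    """Decode the subset above, returning lists as tuples."""
    def parse(pos: int) -> Tuple[Bencodable, int]:
        if data[pos:pos + 1] == b'l':
            items = []
            pos += 1
            while data[pos:pos + 1] != b'e':
                item, pos = parse(pos)
                items.append(item)
            return tuple(items), pos + 1
        colon = data.index(b':', pos)  # ValueError on truncated input
        start = colon + 1
        length = int(data[pos:colon])
        return data[start:start + length], start + length

    value, end = parse(0)
    if end != len(data):
        raise ValueError('trailing bytes after bencoded value')
    return value


def encode_fulltext(key: tuple, parents: Optional[tuple], text: bytes) -> bytes:
    # None (parents unknown) travels as the byte string b'nil'.
    meta = toy_bencode((key, b'nil' if parents is None else parents))
    return b'fulltext\n' + struct.pack('!L', len(meta)) + meta + text


def decode_fulltext(data: bytes):
    kind_end = data.index(b'\n') + 1  # offset just past the storage-kind line
    if data[:kind_end] != b'fulltext\n':
        raise ValueError('not a fulltext record')
    meta_len, = struct.unpack('!L', data[kind_end:kind_end + 4])
    meta_end = kind_end + 4 + meta_len
    key, parents = toy_bdecode(data[kind_end + 4:meta_end])
    if parents == b'nil':
        parents = None
    return key, parents, data[meta_end:]


wire = encode_fulltext((b'file-id', b'rev-2'), ((b'file-id', b'rev-1'),), b'hello\n')
print(decode_fulltext(wire))
# ((b'file-id', b'rev-2'), ((b'file-id', b'rev-1'),), b'hello\n')
```

The length prefix lets the decoder find where the metadata ends without scanning the text, so the text may contain any bytes, including newlines.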