Rev 4028: Cherrypick and polish the RemoteSink for streaming push. in http://people.ubuntu.com/~robertc/baz2.0/fetch.RemoteSink
Robert Collins
robertc at robertcollins.net
Fri Feb 20 08:26:53 GMT 2009
At http://people.ubuntu.com/~robertc/baz2.0/fetch.RemoteSink
------------------------------------------------------------
revno: 4028
revision-id: robertc at robertcollins.net-20090220082650-wmzch4en338bymkm
parent: robertc at robertcollins.net-20090220075016-18yco3ld54qc1188
committer: Robert Collins <robertc at robertcollins.net>
branch nick: fetch.RemoteSink
timestamp: Fri 2009-02-20 19:26:50 +1100
message:
Cherrypick and polish the RemoteSink for streaming push.
=== modified file 'NEWS'
--- a/NEWS 2009-02-20 07:50:16 +0000
+++ b/NEWS 2009-02-20 08:26:50 +0000
@@ -66,6 +66,9 @@
allowing for decoupling over a network connection. (Andrew Bennetts,
Robert Collins)
+ * There is a RemoteStreamSink object which handles pushing to smart servers.
+ (Andrew Bennetts, Robert Collins)
+
* ``bzrlib.tests.run_suite`` accepts a runner_class parameter
supporting the use of different runners. (Robert Collins)
=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py 2009-02-20 07:13:04 +0000
+++ b/bzrlib/remote.py 2009-02-20 08:26:50 +0000
@@ -18,7 +18,6 @@
# across to run on the server.
import bz2
-import struct
from bzrlib import (
branch,
@@ -27,6 +26,7 @@
errors,
graph,
lockdir,
+ pack,
repository,
revision,
symbol_versioning,
@@ -43,6 +43,7 @@
from bzrlib.smart import client, vfs
from bzrlib.revision import ensure_null, NULL_REVISION
from bzrlib.trace import mutter, note, warning
+from bzrlib.versionedfile import record_to_fulltext_bytes
class _RpcHelper(object):
@@ -535,8 +536,7 @@
def _get_sink(self):
"""See Repository._get_sink()."""
- self._ensure_real()
- return self._real_repository._get_sink()
+ return RemoteStreamSink(self)
def has_revision(self, revision_id):
"""See Repository.has_revision()."""
@@ -1335,8 +1335,55 @@
raise errors.UnexpectedSmartServerResponse(response)
-def _length_prefix(bytes):
- return struct.pack('!L', len(bytes))
+class RemoteStreamSink(repository.StreamSink):
+    """Stream sink that pushes a record stream to a smart server.
+
+    The stream is serialised into a pack container and sent as the body of
+    a ``Repository.insert_stream`` request.  When that cannot be done the
+    work is handed to the sink of the underlying real repository.
+    """
+
+    def _insert_real(self, stream, src_format):
+        """Insert stream using the sink of the real repository.
+
+        :param stream: The record stream to insert.
+        :param src_format: The format of the repository the stream came from.
+        :return: Whatever the real sink's insert_stream returns.
+        """
+        repo = self.target_repo
+        repo._ensure_real()
+        sink = repo._real_repository._get_sink()
+        return sink.insert_stream(stream, src_format)
+
+    def insert_stream(self, stream, src_format):
+        """Insert stream into the target repository.
+
+        :param stream: An iterable of (substream_type, substream) pairs.
+        :param src_format: The format of the source repository; its network
+            name is sent ahead of the records.
+        :return: The result of the real sink when falling back to it,
+            otherwise None.
+        """
+        repo = self.target_repo
+        # Until we can handle deltas in stacked repositories we can't hand all
+        # the processing off to a remote server.
+        if repo._fallback_repositories:
+            return self._insert_real(stream, src_format)
+        client = repo._client
+        path = repo.bzrdir._path_for_remote_call(client)
+        byte_stream = self._stream_to_byte_stream(stream, src_format)
+        try:
+            # NOTE(review): the server's response is not inspected here.
+            client.call_with_body_stream(
+                ('Repository.insert_stream', path), byte_stream)
+        except errors.UnknownSmartMethod:
+            # NOTE(review): if any of byte_stream was consumed before the
+            # server rejected the verb, stream may already be partly used
+            # up when it is replayed here - confirm against the client.
+            return self._insert_real(stream, src_format)
+
+    def _stream_to_byte_stream(self, stream, src_format):
+        """Serialise stream as pack container bytes, yielded incrementally.
+
+        The first record in the container holds src_format's network name;
+        every following record holds one serialised stream record and is
+        named after the substream it belongs to.
+
+        :param stream: An iterable of (substream_type, substream) pairs.
+        :param src_format: The format of the source repository.
+        :return: A generator of byte strings.
+        """
+        # The writer appends its output here; it is drained after each
+        # record so the whole stream is never buffered at once.
+        pending = []
+        pack_writer = pack.ContainerWriter(pending.append)
+        pack_writer.begin()
+        pack_writer.add_bytes_record(src_format.network_name(), '')
+        for substream_type, substream in stream:
+            for record in substream:
+                if record.storage_kind in ('chunked', 'fulltext'):
+                    serialised = record_to_fulltext_bytes(record)
+                else:
+                    serialised = record.get_bytes_as(record.storage_kind)
+                pack_writer.add_bytes_record(serialised, [(substream_type,)])
+                for b in pending:
+                    yield b
+                del pending[:]
+        pack_writer.end()
+        for b in pending:
+            yield b
class RemoteBranchLockableFiles(LockableFiles):
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py 2009-02-20 05:05:25 +0000
+++ b/bzrlib/repository.py 2009-02-20 08:26:50 +0000
@@ -3472,6 +3472,17 @@
def _autopack(self):
self.target.autopack()
+
+    @needs_write_lock
+    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
+        """See InterRepository.fetch().
+
+        :return: A (count_copied, failed_revisions) tuple taken from the
+            fetcher.
+        """
+        # The generic streaming fetch code is always used, so that fetching
+        # into remote servers can be streamed.
+        from bzrlib.fetch import RepoFetcher
+        repo_fetcher = RepoFetcher(
+            self.target, self.source, revision_id, pb, find_ghosts)
+        self.target.autopack()
+        result = (repo_fetcher.count_copied, repo_fetcher.failed_revisions)
+        return result
def _get_target_pack_collection(self):
return self.target._real_repository._pack_collection
=== modified file 'bzrlib/smart/protocol.py'
--- a/bzrlib/smart/protocol.py 2009-02-02 05:56:34 +0000
+++ b/bzrlib/smart/protocol.py 2009-02-20 08:26:50 +0000
@@ -1247,11 +1247,13 @@
self._write_prefixed_body(part)
self.flush()
except Exception:
+ exc_info = sys.exc_info()
# Iterating the stream failed. Cleanly abort the request.
self._write_error_status()
# Currently the client unconditionally sends ('error',) as the
# error args.
self._write_structure(('error',))
+ raise exc_info[0], exc_info[1], exc_info[2]
self._write_end()
self._medium_request.finished_writing()
=== modified file 'bzrlib/smart/repository.py'
--- a/bzrlib/smart/repository.py 2009-02-20 03:28:07 +0000
+++ b/bzrlib/smart/repository.py 2009-02-20 08:26:50 +0000
@@ -18,13 +18,17 @@
import bz2
import os
+import struct
import sys
+import tarfile
import tempfile
-import tarfile
+import threading
+import Queue
from bzrlib import (
errors,
osutils,
+ pack,
)
from bzrlib.bzrdir import BzrDir
from bzrlib.smart.request import (
@@ -32,8 +36,10 @@
SmartServerRequest,
SuccessfulSmartServerResponse,
)
-from bzrlib.repository import _strip_NULL_ghosts
+from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
from bzrlib import revision as _mod_revision
+from bzrlib.util import bencode
+from bzrlib.versionedfile import NetworkRecordStream
class SmartServerRepositoryRequest(SmartServerRequest):
@@ -403,3 +409,55 @@
tarball.add(dirname, '.bzr') # recursive by default
finally:
tarball.close()
+
+
+class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
+    """Insert a record stream, sent as the request body, into a repository.
+
+    Body chunks arrive through do_chunk(); the records decoded from them
+    are queued and consumed by a separate thread that feeds them to the
+    repository's stream sink.
+    """
+
+    def do_repository_request(self, repository):
+        """StreamSink.insert_stream for a remote repository."""
+        repository.lock_write()
+        repository.start_write_group()
+        self.repository = repository
+        self.stream_decoder = pack.ContainerPushParser()
+        self.src_format = None
+        self.queue = Queue.Queue()
+        self.insert_thread = None
+        # sys.exc_info() from the inserter thread if the insert failed.
+        self.insert_exception = None
+
+    def do_chunk(self, body_stream_chunk):
+        """Decode one chunk of the body and queue any completed records.
+
+        :param body_stream_chunk: The next bytes of the pack container.
+        """
+        self.stream_decoder.accept_bytes(body_stream_chunk)
+        for record_names, record_bytes in \
+                self.stream_decoder.read_pending_records():
+            if self.src_format is None:
+                # The first record is the network name of the source
+                # format; once it is known the inserter can be started.
+                self.src_format = network_format_registry.get(record_bytes)
+                self.insert_thread = threading.Thread(
+                    target=self._inserter_thread)
+                self.insert_thread.start()
+            else:
+                record_name, = record_names
+                substream_type = record_name[0]
+                stream = NetworkRecordStream([record_bytes])
+                for stream_record in stream.read():
+                    self.queue.put((substream_type, [stream_record]))
+
+    def _inserter_thread(self):
+        """Thread body: feed the queued records to the repository's sink."""
+        try:
+            self.repository._get_sink().insert_stream(
+                self.blocking_read_stream(), self.src_format)
+        except:
+            # Deliberately bare: whatever went wrong is kept so that
+            # do_end() can re-raise it in the request thread instead of
+            # it being lost with this thread.
+            self.insert_exception = sys.exc_info()
+
+    def blocking_read_stream(self):
+        """Yield queued (substream_type, records) items until the end marker.
+
+        Blocks while the queue is empty; StopIteration placed on the queue
+        marks the end of the stream.
+        """
+        while True:
+            item = self.queue.get()
+            if item is StopIteration:
+                return
+            else:
+                yield item
+
+    def do_end(self):
+        """Finish the insert: wait for the inserter, then commit and unlock.
+
+        :return: A successful ('ok',) response.
+        :raises: Whatever the inserter thread raised; in that case the write
+            group is aborted rather than committed.
+        """
+        self.queue.put(StopIteration)
+        if self.insert_thread is not None:
+            self.insert_thread.join()
+        try:
+            if self.insert_exception is not None:
+                # Never commit a partially inserted stream.
+                self.repository.abort_write_group()
+                exc_info = self.insert_exception
+                raise exc_info[0], exc_info[1], exc_info[2]
+            self.repository.commit_write_group()
+        finally:
+            # Release the write lock whether or not the insert succeeded.
+            self.repository.unlock()
+        return SuccessfulSmartServerResponse(('ok', ))
+
+
=== modified file 'bzrlib/smart/request.py'
--- a/bzrlib/smart/request.py 2009-02-20 03:28:07 +0000
+++ b/bzrlib/smart/request.py 2009-02-20 08:26:50 +0000
@@ -450,6 +450,8 @@
request_handlers.register_lazy(
'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')
request_handlers.register_lazy(
+ 'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
+request_handlers.register_lazy(
'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
request_handlers.register_lazy(
'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')
=== modified file 'bzrlib/tests/blackbox/test_push.py'
--- a/bzrlib/tests/blackbox/test_push.py 2009-02-20 03:29:37 +0000
+++ b/bzrlib/tests/blackbox/test_push.py 2009-02-20 08:26:50 +0000
@@ -202,7 +202,7 @@
# being too low. If rpc_count increases, more network roundtrips have
# become necessary for this use case. Please do not adjust this number
# upwards without agreement from bzr's network support maintainers.
- self.assertEqual(74, rpc_count)
+ self.assertEqual(60, rpc_count)
def test_push_smart_stacked_streaming_acceptance(self):
self.setup_smart_server_with_call_log()
@@ -219,7 +219,7 @@
# being too low. If rpc_count increases, more network roundtrips have
# become necessary for this use case. Please do not adjust this number
# upwards without agreement from bzr's network support maintainers.
- self.assertEqual(99, rpc_count)
+ self.assertEqual(101, rpc_count)
remote = Branch.open('public')
self.assertEndsWith(remote.get_stacked_on_url(), '/parent')
=== modified file 'bzrlib/versionedfile.py'
--- a/bzrlib/versionedfile.py 2009-02-20 06:00:22 +0000
+++ b/bzrlib/versionedfile.py 2009-02-20 08:26:50 +0000
@@ -22,6 +22,7 @@
from copy import copy
from cStringIO import StringIO
import os
+import struct
from zlib import adler32
from bzrlib.lazy_import import lazy_import
@@ -45,6 +46,7 @@
from bzrlib.registry import Registry
from bzrlib.symbol_versioning import *
from bzrlib.textmerge import TextMerge
+from bzrlib.util import bencode
adapter_registry = Registry()
@@ -1502,6 +1504,7 @@
'knit-annotated-ft-gz':knit.knit_network_to_record,
'knit-annotated-delta-gz':knit.knit_network_to_record,
'knit-delta-closure':knit.knit_delta_closure_to_records,
+ 'fulltext':fulltext_network_to_record,
}
def read(self):
@@ -1514,3 +1517,29 @@
for record in self._kind_factory[storage_kind](
storage_kind, bytes, line_end):
yield record
+
+
+def fulltext_network_to_record(kind, bytes, line_end):
+    """Convert a network fulltext record to record.
+
+    From line_end onwards the record is laid out as a 4-byte network-order
+    length, that many bytes of bencoded (key, parents) metadata, and then
+    the fulltext itself.
+
+    :param kind: The storage kind of the record (unused here).
+    :param bytes: The complete serialised record.
+    :param line_end: Offset in bytes at which the length prefix starts
+        (presumably just past the storage kind line - confirm with caller).
+    :return: A list holding a single FulltextContentFactory, so that the
+        result can be iterated like that of the other record converters.
+    """
+    meta_start = line_end + 4
+    meta_len, = struct.unpack('!L', bytes[line_end:meta_start])
+    meta_end = meta_start + meta_len
+    record_meta = bytes[meta_start:meta_end]
+    key, parents = bencode.bdecode_as_tuple(record_meta)
+    # 'nil' is how record_to_fulltext_bytes encodes parents of None.
+    if parents == 'nil':
+        parents = None
+    fulltext = bytes[meta_end:]
+    return [FulltextContentFactory(key, parents, None, fulltext)]
+
+
+def _length_prefix(bytes):
+    """Return len(bytes) packed as a 4-byte network-order unsigned integer."""
+    length = len(bytes)
+    return struct.pack('!L', length)
+
+
+def record_to_fulltext_bytes(record):
+    """Serialise record as a network 'fulltext' record.
+
+    The result is the line "fulltext", a 4-byte network-order length, that
+    many bytes of bencoded (key, parents) metadata, and then the record's
+    fulltext.  Parents of None are encoded as the string 'nil'.
+
+    :param record: A record providing key, parents and
+        get_bytes_as('fulltext').
+    :return: The serialised record as a byte string.
+    """
+    if record.parents is None:
+        parents = 'nil'
+    else:
+        parents = record.parents
+    record_meta = bencode.bencode((record.key, parents))
+    record_content = record.get_bytes_as('fulltext')
+    return "fulltext\n%s%s%s" % (
+        _length_prefix(record_meta), record_meta, record_content)
More information about the bazaar-commits
mailing list