Rev 4028: Cherrypick and polish the RemoteSink for streaming push. in http://people.ubuntu.com/~robertc/baz2.0/fetch.RemoteSink

Robert Collins robertc at robertcollins.net
Fri Feb 20 08:26:53 GMT 2009


At http://people.ubuntu.com/~robertc/baz2.0/fetch.RemoteSink

------------------------------------------------------------
revno: 4028
revision-id: robertc at robertcollins.net-20090220082650-wmzch4en338bymkm
parent: robertc at robertcollins.net-20090220075016-18yco3ld54qc1188
committer: Robert Collins <robertc at robertcollins.net>
branch nick: fetch.RemoteSink
timestamp: Fri 2009-02-20 19:26:50 +1100
message:
  Cherrypick and polish the RemoteSink for streaming push.
=== modified file 'NEWS'
--- a/NEWS	2009-02-20 07:50:16 +0000
+++ b/NEWS	2009-02-20 08:26:50 +0000
@@ -66,6 +66,9 @@
       allowing for decoupling over a network connection. (Andrew Bennetts,
       Robert Collins)
 
+    * A new ``RemoteStreamSink`` object handles streaming pushes to smart
+      servers. (Andrew Bennetts, Robert Collins)
+
     * ``bzrlib.tests.run_suite`` accepts a runner_class parameter
       supporting the use of different runners. (Robert Collins)
 

=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py	2009-02-20 07:13:04 +0000
+++ b/bzrlib/remote.py	2009-02-20 08:26:50 +0000
@@ -18,7 +18,6 @@
 # across to run on the server.
 
 import bz2
-import struct
 
 from bzrlib import (
     branch,
@@ -27,6 +26,7 @@
     errors,
     graph,
     lockdir,
+    pack,
     repository,
     revision,
     symbol_versioning,
@@ -43,6 +43,7 @@
 from bzrlib.smart import client, vfs
 from bzrlib.revision import ensure_null, NULL_REVISION
 from bzrlib.trace import mutter, note, warning
+from bzrlib.versionedfile import record_to_fulltext_bytes
 
 
 class _RpcHelper(object):
@@ -535,8 +536,7 @@
 
     def _get_sink(self):
         """See Repository._get_sink()."""
-        self._ensure_real()
-        return self._real_repository._get_sink()
+        return RemoteStreamSink(self)  # pushes via Repository.insert_stream
 
     def has_revision(self, revision_id):
         """See Repository.has_revision()."""
@@ -1335,8 +1335,55 @@
             raise errors.UnexpectedSmartServerResponse(response)
 
 
-def _length_prefix(bytes):
-    return struct.pack('!L', len(bytes))
+class RemoteStreamSink(repository.StreamSink):
+    """StreamSink that pushes record streams to a smart server.
+
+    Falls back to the real repository's sink when streaming is unavailable.
+    """
+
+    def _insert_real(self, stream, src_format):
+        """Insert stream using the real repository's own sink."""
+        self.target_repo._ensure_real()
+        sink = self.target_repo._real_repository._get_sink()
+        return sink.insert_stream(stream, src_format)
+
+    def insert_stream(self, stream, src_format):
+        repo = self.target_repo
+        # Until we can handle deltas in stack repositories we can't hand all
+        # the processing off to a remote server.
+        if repo._fallback_repositories:
+            return self._insert_real(stream, src_format)
+        client = repo._client
+        path = repo.bzrdir._path_for_remote_call(client)
+        byte_stream = self._stream_to_byte_stream(stream, src_format)
+        try:
+            client.call_with_body_stream(
+                ('Repository.insert_stream', path), byte_stream)
+        except errors.UnknownSmartMethod:
+            # NOTE(review): the request body is presumably fully written
+            # before this error is read, leaving `stream` consumed so this
+            # fallback may insert nothing; consider probing the verb first.
+            return self._insert_real(stream, src_format)
+
+    def _stream_to_byte_stream(self, stream, src_format):
+        """Lazily serialise stream into pack-container byte chunks."""
+        chunks = []
+        pack_writer = pack.ContainerWriter(chunks.append)
+        pack_writer.begin()
+        pack_writer.add_bytes_record(src_format.network_name(), '')
+        for substream_type, substream in stream:
+            for record in substream:
+                if record.storage_kind in ('chunked', 'fulltext'):
+                    serialised = record_to_fulltext_bytes(record)
+                else:
+                    serialised = record.get_bytes_as(record.storage_kind)
+                pack_writer.add_bytes_record(serialised, [(substream_type,)])
+                for chunk in chunks:
+                    yield chunk
+                del chunks[:]
+        pack_writer.end()
+        for chunk in chunks:
+            yield chunk
 
 
 class RemoteBranchLockableFiles(LockableFiles):

=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py	2009-02-20 05:05:25 +0000
+++ b/bzrlib/repository.py	2009-02-20 08:26:50 +0000
@@ -3472,6 +3472,17 @@
     
     def _autopack(self):
         self.target.autopack()
+
+    @needs_write_lock
+    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
+        """See InterRepository.fetch()."""
+        # Always use the generic streaming fetch code so that data can be
+        # streamed into remote servers.
+        from bzrlib.fetch import RepoFetcher
+        fetcher = RepoFetcher(self.target, self.source, revision_id,
+                              pb, find_ghosts)
+        self._autopack()
+        return fetcher.count_copied, fetcher.failed_revisions
         
     def _get_target_pack_collection(self):
         return self.target._real_repository._pack_collection

=== modified file 'bzrlib/smart/protocol.py'
--- a/bzrlib/smart/protocol.py	2009-02-02 05:56:34 +0000
+++ b/bzrlib/smart/protocol.py	2009-02-20 08:26:50 +0000
@@ -1247,11 +1247,13 @@
                 self._write_prefixed_body(part)
                 self.flush()
         except Exception:
+            exc_info = sys.exc_info()
             # Iterating the stream failed.  Cleanly abort the request.
             self._write_error_status()
             # Currently the client unconditionally sends ('error',) as the
             # error args.
             self._write_structure(('error',))
+            raise exc_info[0], exc_info[1], exc_info[2]
         self._write_end()
         self._medium_request.finished_writing()
 

=== modified file 'bzrlib/smart/repository.py'
--- a/bzrlib/smart/repository.py	2009-02-20 03:28:07 +0000
+++ b/bzrlib/smart/repository.py	2009-02-20 08:26:50 +0000
@@ -18,13 +18,17 @@
 
 import bz2
 import os
+import struct
 import sys
+import tarfile
 import tempfile
-import tarfile
+import threading
+import Queue
 
 from bzrlib import (
     errors,
     osutils,
+    pack,
     )
 from bzrlib.bzrdir import BzrDir
 from bzrlib.smart.request import (
@@ -32,8 +36,10 @@
     SmartServerRequest,
     SuccessfulSmartServerResponse,
     )
-from bzrlib.repository import _strip_NULL_ghosts
+from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 from bzrlib import revision as _mod_revision
+from bzrlib.util import bencode
+from bzrlib.versionedfile import NetworkRecordStream
 
 
 class SmartServerRepositoryRequest(SmartServerRequest):
@@ -403,3 +409,74 @@
             tarball.add(dirname, '.bzr') # recursive by default
         finally:
             tarball.close()
+
+
+class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
+    """Server side of the Repository.insert_stream verb.
+
+    The request body is a pack container: the first record holds the
+    network name of the source format, each later record one serialised
+    content record named by its substream type.  Decoded records are fed
+    through a queue to a thread running the repository's StreamSink.
+    """
+
+    def do_repository_request(self, repository):
+        """StreamSink.insert_stream for a remote repository."""
+        repository.lock_write()
+        repository.start_write_group()
+        self.repository = repository
+        self.stream_decoder = pack.ContainerPushParser()
+        self.src_format = None
+        self.queue = Queue.Queue()
+        self.insert_thread = None
+        # Set to sys.exc_info() by the inserter thread if insertion fails.
+        self.insert_exc_info = None
+
+    def do_chunk(self, body_stream_chunk):
+        self.stream_decoder.accept_bytes(body_stream_chunk)
+        for record in self.stream_decoder.read_pending_records():
+            record_names, record_bytes = record
+            if self.src_format is None:
+                # The first record carries the source format's network name.
+                self.src_format = network_format_registry.get(record_bytes)
+                self.insert_thread = threading.Thread(
+                    target=self._inserter_thread)
+                self.insert_thread.start()
+            else:
+                record_name, = record_names
+                substream_type = record_name[0]
+                stream = NetworkRecordStream([record_bytes])
+                for content_record in stream.read():
+                    self.queue.put((substream_type, [content_record]))
+
+    def _inserter_thread(self):
+        try:
+            self.repository._get_sink().insert_stream(
+                self.blocking_read_stream(), self.src_format)
+        except:
+            # Catch everything: do_end re-raises it and aborts, instead of
+            # committing a partially inserted write group and replying 'ok'.
+            self.insert_exc_info = sys.exc_info()
+
+    def blocking_read_stream(self):
+        """Yield queued (substream_type, records) items until the sentinel."""
+        while True:
+            item = self.queue.get()
+            if item is StopIteration:
+                return
+            else:
+                yield item
+
+    def do_end(self):
+        self.queue.put(StopIteration)
+        if self.insert_thread is not None:
+            self.insert_thread.join()
+        try:
+            if self.insert_exc_info is not None:
+                exc_info = self.insert_exc_info
+                self.repository.abort_write_group()
+                raise exc_info[0], exc_info[1], exc_info[2]
+            self.repository.commit_write_group()
+        finally:
+            self.repository.unlock()
+        return SuccessfulSmartServerResponse(('ok', ))

=== modified file 'bzrlib/smart/request.py'
--- a/bzrlib/smart/request.py	2009-02-20 03:28:07 +0000
+++ b/bzrlib/smart/request.py	2009-02-20 08:26:50 +0000
@@ -450,6 +450,8 @@
 request_handlers.register_lazy(
     'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')
 request_handlers.register_lazy(
+    'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
+request_handlers.register_lazy(
     'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
 request_handlers.register_lazy(
     'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')

=== modified file 'bzrlib/tests/blackbox/test_push.py'
--- a/bzrlib/tests/blackbox/test_push.py	2009-02-20 03:29:37 +0000
+++ b/bzrlib/tests/blackbox/test_push.py	2009-02-20 08:26:50 +0000
@@ -202,7 +202,7 @@
         # being too low. If rpc_count increases, more network roundtrips have
         # become necessary for this use case. Please do not adjust this number
         # upwards without agreement from bzr's network support maintainers.
-        self.assertEqual(74, rpc_count)
+        self.assertEqual(60, rpc_count)
 
     def test_push_smart_stacked_streaming_acceptance(self):
         self.setup_smart_server_with_call_log()
@@ -219,7 +219,7 @@
         # being too low. If rpc_count increases, more network roundtrips have
         # become necessary for this use case. Please do not adjust this number
         # upwards without agreement from bzr's network support maintainers.
-        self.assertEqual(99, rpc_count)
+        self.assertEqual(101, rpc_count)
         remote = Branch.open('public')
         self.assertEndsWith(remote.get_stacked_on_url(), '/parent')
 

=== modified file 'bzrlib/versionedfile.py'
--- a/bzrlib/versionedfile.py	2009-02-20 06:00:22 +0000
+++ b/bzrlib/versionedfile.py	2009-02-20 08:26:50 +0000
@@ -22,6 +22,7 @@
 from copy import copy
 from cStringIO import StringIO
 import os
+import struct
 from zlib import adler32
 
 from bzrlib.lazy_import import lazy_import
@@ -45,6 +46,7 @@
 from bzrlib.registry import Registry
 from bzrlib.symbol_versioning import *
 from bzrlib.textmerge import TextMerge
+from bzrlib.util import bencode
 
 
 adapter_registry = Registry()
@@ -1502,6 +1504,7 @@
             'knit-annotated-ft-gz':knit.knit_network_to_record,
             'knit-annotated-delta-gz':knit.knit_network_to_record,
             'knit-delta-closure':knit.knit_delta_closure_to_records,
+            'fulltext':fulltext_network_to_record,
             }
 
     def read(self):
@@ -1514,3 +1517,29 @@
             for record in self._kind_factory[storage_kind](
                 storage_kind, bytes, line_end):
                 yield record
+
+
+def fulltext_network_to_record(kind, bytes, line_end):
+    """Decode network fulltext bytes into a list holding one record."""
+    meta_len, = struct.unpack('!L', bytes[line_end:line_end+4])
+    record_meta = bytes[line_end+4:line_end+4+meta_len]
+    key, parents = bencode.bdecode_as_tuple(record_meta)
+    if parents == 'nil':
+        parents = None
+    fulltext = bytes[line_end+4+meta_len:]
+    return [FulltextContentFactory(key, parents, None, fulltext)]
+
+
+def _length_prefix(bytes):
+    return struct.pack('>L', len(bytes))  # 4-byte big-endian length
+
+
+def record_to_fulltext_bytes(record):
+    """Serialise record as 'fulltext' network bytes (parents None->'nil')."""
+    parents = record.parents
+    if parents is None:
+        parents = 'nil'
+    record_meta = bencode.bencode((record.key, parents))
+    record_content = record.get_bytes_as('fulltext')
+    return "fulltext\n%s%s%s" % (
+        _length_prefix(record_meta), record_meta, record_content)




More information about the bazaar-commits mailing list