Rev 4028: (robertc) Add RemoteSink to support streaming push to non-stacked in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Fri Feb 20 22:53:41 GMT 2009


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 4028
revision-id: pqm at pqm.ubuntu.com-20090220225337-14hb4kaptjpvwr0l
parent: pqm at pqm.ubuntu.com-20090220100854-p9g7snhipls2cj0z
parent: robertc at robertcollins.net-20090220221045-bmuh2t0wf09szn3z
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Fri 2009-02-20 22:53:37 +0000
message:
  (robertc) Add RemoteSink to support streaming push to non-stacked
  	repositories over bzr+ssh:// (Andrew Bennetts, Robert Collins)
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/remote.py               remote.py-20060720103555-yeeg2x51vn0rbtdp-1
  bzrlib/repository.py           rev_storage.py-20051111201905-119e9401e46257e3
  bzrlib/smart/protocol.py       protocol.py-20061108035435-ot0lstk2590yqhzr-1
  bzrlib/smart/repository.py     repository.py-20061128022038-vr5wy5bubyb8xttk-1
  bzrlib/smart/request.py        request.py-20061108095550-gunadhxmzkdjfeek-1
  bzrlib/tests/blackbox/test_push.py test_push.py-20060329002750-929af230d5d22663
  bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
  bzrlib/versionedfile.py        versionedfile.py-20060222045106-5039c71ee3b65490
    ------------------------------------------------------------
    revno: 4022.1.8
    revision-id: robertc at robertcollins.net-20090220221045-bmuh2t0wf09szn3z
    parent: robertc at robertcollins.net-20090220104851-m0s7qwe6jzqkgj1f
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: integration
    timestamp: Sat 2009-02-21 09:10:45 +1100
    message:
      Fix test_call_with_body_stream_error which was broken by a debugging change to still pass tests.
    modified:
      bzrlib/smart/protocol.py       protocol.py-20061108035435-ot0lstk2590yqhzr-1
      bzrlib/tests/test_smart_transport.py test_ssh_transport.py-20060608202016-c25gvf1ob7ypbus6-2
    ------------------------------------------------------------
    revno: 4022.1.7
    revision-id: robertc at robertcollins.net-20090220104851-m0s7qwe6jzqkgj1f
    parent: robertc at robertcollins.net-20090220082650-wmzch4en338bymkm
    parent: pqm at pqm.ubuntu.com-20090220100854-p9g7snhipls2cj0z
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: integration
    timestamp: Fri 2009-02-20 21:48:51 +1100
    message:
      Merge bzr.dev (avoids criss-cross for PQM).
    modified:
      bzrlib/errors.py               errors.py-20050309040759-20512168c4e14fbd
      bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
      bzrlib/remote.py               remote.py-20060720103555-yeeg2x51vn0rbtdp-1
      bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
      bzrlib/repository.py           rev_storage.py-20051111201905-119e9401e46257e3
      bzrlib/tests/per_repository/test_write_group.py test_write_group.py-20070716105516-89n34xtogq5frn0m-1
      bzrlib/tests/test_errors.py    test_errors.py-20060210110251-41aba2deddf936a8
      bzrlib/tests/test_pack_repository.py test_pack_repository-20080801043947-eaw0e6h2gu75kwmy-1
      bzrlib/tests/test_versionedfile.py test_versionedfile.py-20060222045249-db45c9ed14a1c2e5
      bzrlib/versionedfile.py        versionedfile.py-20060222045106-5039c71ee3b65490
    ------------------------------------------------------------
    revno: 4022.1.6
    revision-id: robertc at robertcollins.net-20090220082650-wmzch4en338bymkm
    parent: robertc at robertcollins.net-20090220075016-18yco3ld54qc1188
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: fetch.RemoteSink
    timestamp: Fri 2009-02-20 19:26:50 +1100
    message:
      Cherrypick and polish the RemoteSink for streaming push.
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
      bzrlib/remote.py               remote.py-20060720103555-yeeg2x51vn0rbtdp-1
      bzrlib/repository.py           rev_storage.py-20051111201905-119e9401e46257e3
      bzrlib/smart/protocol.py       protocol.py-20061108035435-ot0lstk2590yqhzr-1
      bzrlib/smart/repository.py     repository.py-20061128022038-vr5wy5bubyb8xttk-1
      bzrlib/smart/request.py        request.py-20061108095550-gunadhxmzkdjfeek-1
      bzrlib/tests/blackbox/test_push.py test_push.py-20060329002750-929af230d5d22663
      bzrlib/versionedfile.py        versionedfile.py-20060222045106-5039c71ee3b65490
    ------------------------------------------------------------
    revno: 4022.1.5
    revision-id: robertc at robertcollins.net-20090220075016-18yco3ld54qc1188
    parent: robertc at robertcollins.net-20090220065237-vpfwvp6f3w01teu5
    parent: pqm at pqm.ubuntu.com-20090220071304-mb95xwtanwl2bqa4
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: fetch.RemoteSink
    timestamp: Fri 2009-02-20 18:50:16 +1100
    message:
      Merge bzr.dev.
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
      bzrlib/bzrdir.py               bzrdir.py-20060131065624-156dfea39c4387cb
      bzrlib/remote.py               remote.py-20060720103555-yeeg2x51vn0rbtdp-1
      bzrlib/smart/bzrdir.py         bzrdir.py-20061122024551-ol0l0o0oofsu9b3t-1
      bzrlib/smart/medium.py         medium.py-20061103051856-rgu2huy59fkz902q-1
      bzrlib/smart/repository.py     repository.py-20061128022038-vr5wy5bubyb8xttk-1
      bzrlib/smart/request.py        request.py-20061108095550-gunadhxmzkdjfeek-1
      bzrlib/tests/blackbox/test_push.py test_push.py-20060329002750-929af230d5d22663
      bzrlib/tests/blackbox/test_shared_repository.py test_shared_repository.py-20060317053531-ed30c0d79325e483
      bzrlib/tests/test_remote.py    test_remote.py-20060720103555-yeeg2x51vn0rbtdp-2
      bzrlib/tests/test_smart.py     test_smart.py-20061122024551-ol0l0o0oofsu9b3t-2
    ------------------------------------------------------------
    revno: 4022.1.4
    revision-id: robertc at robertcollins.net-20090220065237-vpfwvp6f3w01teu5
    parent: robertc at robertcollins.net-20090220062731-peh7eb800gvgb08h
    parent: robertc at robertcollins.net-20090220064830-wyj52q6rl2gygqsa
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: fetch.RemoteSink
    timestamp: Fri 2009-02-20 17:52:37 +1100
    message:
      Merge network serialisation of record streams.
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
      bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
      bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
      bzrlib/tests/test_versionedfile.py test_versionedfile.py-20060222045249-db45c9ed14a1c2e5
      bzrlib/versionedfile.py        versionedfile.py-20060222045106-5039c71ee3b65490
      bzrlib/weave.py                knit.py-20050627021749-759c29984154256b
=== modified file 'NEWS'
--- a/NEWS	2009-02-20 08:40:02 +0000
+++ b/NEWS	2009-02-20 10:48:51 +0000
@@ -66,6 +66,9 @@
       allowing for decoupling over a network connection. (Andrew Bennetts,
       Robert Collins)
 
+    * There is a RemoteSink object which handles pushing to smart servers.
+      (Andrew Bennetts, Robert Collins)
+
     * ``bzrlib.tests.run_suite`` accepts a runner_class parameter
       supporting the use of different runners. (Robert Collins)
 

=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py	2009-02-20 09:28:11 +0000
+++ b/bzrlib/remote.py	2009-02-20 10:48:51 +0000
@@ -18,7 +18,6 @@
 # across to run on the server.
 
 import bz2
-import struct
 
 from bzrlib import (
     branch,
@@ -27,6 +26,7 @@
     errors,
     graph,
     lockdir,
+    pack,
     repository,
     revision,
     symbol_versioning,
@@ -43,6 +43,7 @@
 from bzrlib.smart import client, vfs
 from bzrlib.revision import ensure_null, NULL_REVISION
 from bzrlib.trace import mutter, note, warning
+from bzrlib.versionedfile import record_to_fulltext_bytes
 
 
 class _RpcHelper(object):
@@ -543,8 +544,7 @@
 
     def _get_sink(self):
         """See Repository._get_sink()."""
-        self._ensure_real()
-        return self._real_repository._get_sink()
+        return RemoteStreamSink(self)
 
     def has_revision(self, revision_id):
         """See Repository.has_revision()."""
@@ -1343,8 +1343,55 @@
             raise errors.UnexpectedSmartServerResponse(response)
 
 
-def _length_prefix(bytes):
-    return struct.pack('!L', len(bytes))
+class RemoteStreamSink(repository.StreamSink):
+
+    def _insert_real(self, stream, src_format):
+        self.target_repo._ensure_real()
+        sink = self.target_repo._real_repository._get_sink()
+        return sink.insert_stream(stream, src_format)
+
+    def insert_stream(self, stream, src_format):
+        repo = self.target_repo
+        # Until we can handle deltas in stacked repositories we can't hand all
+        # the processing off to a remote server.
+        if self.target_repo._fallback_repositories:
+            return self._insert_real(stream, src_format)
+        client = repo._client
+        path = repo.bzrdir._path_for_remote_call(client)
+        byte_stream = self._stream_to_byte_stream(stream, src_format)
+        try:
+            response = client.call_with_body_stream(
+                ('Repository.insert_stream', path), byte_stream)
+        except errors.UnknownSmartMethod:
+            return self._insert_real(stream, src_format)
+            
+    def _stream_to_byte_stream(self, stream, src_format):
+        bytes = []
+        pack_writer = pack.ContainerWriter(bytes.append)
+        pack_writer.begin()
+        pack_writer.add_bytes_record(src_format.network_name(), '')
+        adapters = {}
+        def get_adapter(adapter_key):
+            try:
+                return adapters[adapter_key]
+            except KeyError:
+                adapter_factory = adapter_registry.get(adapter_key)
+                adapter = adapter_factory(self)
+                adapters[adapter_key] = adapter
+                return adapter
+        for substream_type, substream in stream:
+            for record in substream:
+                if record.storage_kind in ('chunked', 'fulltext'):
+                    serialised = record_to_fulltext_bytes(record)
+                else:
+                    serialised = record.get_bytes_as(record.storage_kind)
+                pack_writer.add_bytes_record(serialised, [(substream_type,)])
+                for b in bytes:
+                    yield b
+                del bytes[:]
+        pack_writer.end()
+        for b in bytes:
+            yield b
 
 
 class RemoteBranchLockableFiles(LockableFiles):

=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py	2009-02-20 09:28:11 +0000
+++ b/bzrlib/repository.py	2009-02-20 10:48:51 +0000
@@ -3487,6 +3487,17 @@
     
     def _autopack(self):
         self.target.autopack()
+
+    @needs_write_lock
+    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
+        """See InterRepository.fetch()."""
+        # Always fetch using the generic streaming fetch code, to allow
+        # streaming fetching into remote servers.
+        from bzrlib.fetch import RepoFetcher
+        fetcher = RepoFetcher(self.target, self.source, revision_id,
+                              pb, find_ghosts)
+        self.target.autopack()
+        return fetcher.count_copied, fetcher.failed_revisions
         
     def _get_target_pack_collection(self):
         return self.target._real_repository._pack_collection

=== modified file 'bzrlib/smart/protocol.py'
--- a/bzrlib/smart/protocol.py	2009-02-02 05:56:34 +0000
+++ b/bzrlib/smart/protocol.py	2009-02-20 22:10:45 +0000
@@ -1247,11 +1247,15 @@
                 self._write_prefixed_body(part)
                 self.flush()
         except Exception:
+            exc_info = sys.exc_info()
             # Iterating the stream failed.  Cleanly abort the request.
             self._write_error_status()
             # Currently the client unconditionally sends ('error',) as the
             # error args.
             self._write_structure(('error',))
+            self._write_end()
+            self._medium_request.finished_writing()
+            raise exc_info[0], exc_info[1], exc_info[2]
         self._write_end()
         self._medium_request.finished_writing()
 

=== modified file 'bzrlib/smart/repository.py'
--- a/bzrlib/smart/repository.py	2009-02-20 03:28:07 +0000
+++ b/bzrlib/smart/repository.py	2009-02-20 08:26:50 +0000
@@ -18,13 +18,17 @@
 
 import bz2
 import os
+import struct
 import sys
+import tarfile
 import tempfile
-import tarfile
+import threading
+import Queue
 
 from bzrlib import (
     errors,
     osutils,
+    pack,
     )
 from bzrlib.bzrdir import BzrDir
 from bzrlib.smart.request import (
@@ -32,8 +36,10 @@
     SmartServerRequest,
     SuccessfulSmartServerResponse,
     )
-from bzrlib.repository import _strip_NULL_ghosts
+from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 from bzrlib import revision as _mod_revision
+from bzrlib.util import bencode
+from bzrlib.versionedfile import NetworkRecordStream
 
 
 class SmartServerRepositoryRequest(SmartServerRequest):
@@ -403,3 +409,55 @@
             tarball.add(dirname, '.bzr') # recursive by default
         finally:
             tarball.close()
+
+
+class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
+
+    def do_repository_request(self, repository):
+        """StreamSink.insert_stream for a remote repository."""
+        repository.lock_write()
+        repository.start_write_group()
+        self.repository = repository
+        self.stream_decoder = pack.ContainerPushParser()
+        self.src_format = None
+        self.queue = Queue.Queue()
+        self.insert_thread = None
+
+    def do_chunk(self, body_stream_chunk):
+        self.stream_decoder.accept_bytes(body_stream_chunk)
+        for record in self.stream_decoder.read_pending_records():
+            record_names, record_bytes = record
+            if self.src_format is None:
+                src_format_name = record_bytes
+                src_format = network_format_registry.get(src_format_name)
+                self.src_format = src_format
+                self.insert_thread = threading.Thread(target=self._inserter_thread)
+                self.insert_thread.start()
+            else:
+                record_name, = record_names
+                substream_type = record_name[0]
+                stream = NetworkRecordStream([record_bytes])
+                for record in stream.read():
+                    self.queue.put((substream_type, [record]))
+
+    def _inserter_thread(self):
+        self.repository._get_sink().insert_stream(self.blocking_read_stream(),
+                self.src_format)
+
+    def blocking_read_stream(self):
+        while True:
+            item = self.queue.get()
+            if item is StopIteration:
+                return
+            else:
+                yield item
+
+    def do_end(self):
+        self.queue.put(StopIteration)
+        if self.insert_thread is not None:
+            self.insert_thread.join()
+        self.repository.commit_write_group()
+        self.repository.unlock()
+        return SuccessfulSmartServerResponse(('ok', ))
+
+

=== modified file 'bzrlib/smart/request.py'
--- a/bzrlib/smart/request.py	2009-02-20 03:28:07 +0000
+++ b/bzrlib/smart/request.py	2009-02-20 08:26:50 +0000
@@ -450,6 +450,8 @@
 request_handlers.register_lazy(
     'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')
 request_handlers.register_lazy(
+    'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
+request_handlers.register_lazy(
     'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
 request_handlers.register_lazy(
     'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')

=== modified file 'bzrlib/tests/blackbox/test_push.py'
--- a/bzrlib/tests/blackbox/test_push.py	2009-02-20 03:29:37 +0000
+++ b/bzrlib/tests/blackbox/test_push.py	2009-02-20 08:26:50 +0000
@@ -202,7 +202,7 @@
         # being too low. If rpc_count increases, more network roundtrips have
         # become necessary for this use case. Please do not adjust this number
         # upwards without agreement from bzr's network support maintainers.
-        self.assertEqual(74, rpc_count)
+        self.assertEqual(60, rpc_count)
 
     def test_push_smart_stacked_streaming_acceptance(self):
         self.setup_smart_server_with_call_log()
@@ -219,7 +219,7 @@
         # being too low. If rpc_count increases, more network roundtrips have
         # become necessary for this use case. Please do not adjust this number
         # upwards without agreement from bzr's network support maintainers.
-        self.assertEqual(99, rpc_count)
+        self.assertEqual(101, rpc_count)
         remote = Branch.open('public')
         self.assertEndsWith(remote.get_stacked_on_url(), '/parent')
 

=== modified file 'bzrlib/tests/test_smart_transport.py'
--- a/bzrlib/tests/test_smart_transport.py	2009-02-11 09:45:27 +0000
+++ b/bzrlib/tests/test_smart_transport.py	2009-02-20 22:10:45 +0000
@@ -2766,7 +2766,8 @@
             yield 'aaa'
             yield 'bbb'
             raise Exception('Boom!')
-        requester.call_with_body_stream(('one arg',), stream_that_fails())
+        self.assertRaises(Exception, requester.call_with_body_stream,
+            ('one arg',), stream_that_fails())
         self.assertEquals(
             'bzr message 3 (bzr 1.6)\n' # protocol version
             '\x00\x00\x00\x02de' # headers

=== modified file 'bzrlib/versionedfile.py'
--- a/bzrlib/versionedfile.py	2009-02-20 08:40:02 +0000
+++ b/bzrlib/versionedfile.py	2009-02-20 10:48:51 +0000
@@ -22,6 +22,7 @@
 from copy import copy
 from cStringIO import StringIO
 import os
+import struct
 from zlib import adler32
 
 from bzrlib.lazy_import import lazy_import
@@ -45,6 +46,7 @@
 from bzrlib.registry import Registry
 from bzrlib.symbol_versioning import *
 from bzrlib.textmerge import TextMerge
+from bzrlib.util import bencode
 
 
 adapter_registry = Registry()
@@ -1514,6 +1516,7 @@
             'knit-annotated-ft-gz':knit.knit_network_to_record,
             'knit-annotated-delta-gz':knit.knit_network_to_record,
             'knit-delta-closure':knit.knit_delta_closure_to_records,
+            'fulltext':fulltext_network_to_record,
             }
 
     def read(self):
@@ -1526,3 +1529,29 @@
             for record in self._kind_factory[storage_kind](
                 storage_kind, bytes, line_end):
                 yield record
+
+
+def fulltext_network_to_record(kind, bytes, line_end):
+    """Convert a network fulltext record to record."""
+    meta_len, = struct.unpack('!L', bytes[line_end:line_end+4])
+    record_meta = bytes[line_end+4:line_end+4+meta_len]
+    key, parents = bencode.bdecode_as_tuple(record_meta)
+    if parents == 'nil':
+        parents = None
+    fulltext = bytes[line_end+4+meta_len:]
+    return [FulltextContentFactory(key, parents, None, fulltext)]
+
+
+def _length_prefix(bytes):
+    return struct.pack('!L', len(bytes))
+
+
+def record_to_fulltext_bytes(record):
+    if record.parents is None:
+        parents = 'nil'
+    else:
+        parents = record.parents
+    record_meta = bencode.bencode((record.key, parents))
+    record_content = record.get_bytes_as('fulltext')
+    return "fulltext\n%s%s%s" % (
+        _length_prefix(record_meta), record_meta, record_content)
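
The fulltext wire format added to bzrlib/versionedfile.py above is a
"fulltext\n" line, a 4-byte big-endian length, the bencoded (key, parents)
metadata, and then the raw text. The following is a minimal, self-contained
sketch of that round trip in Python 3, not the bzrlib code itself. The names
encode_fulltext, decode_fulltext, bencode and bdecode are hypothetical, and
the tiny bencode codec here only handles byte strings and lists; it stands in
for bzrlib.util.bencode, which is assumed rather than imported.

```python
import struct


def bencode(obj):
    """Hypothetical minimal bencoder: byte strings and lists/tuples only."""
    if isinstance(obj, bytes):
        return b"%d:%s" % (len(obj), obj)
    if isinstance(obj, (list, tuple)):
        return b"l" + b"".join(bencode(item) for item in obj) + b"e"
    raise TypeError("unsupported type: %r" % type(obj))


def bdecode(data, pos=0):
    """Decode one value starting at pos; return (value, next_pos)."""
    if data[pos:pos + 1] == b"l":
        pos += 1
        items = []
        while data[pos:pos + 1] != b"e":
            item, pos = bdecode(data, pos)
            items.append(item)
        return tuple(items), pos + 1
    colon = data.index(b":", pos)
    length = int(data[pos:colon])
    start = colon + 1
    return data[start:start + length], start + length


def encode_fulltext(key, parents, text):
    """Frame a record as: kind line, length-prefixed metadata, raw text."""
    # bencode has no None, so the sentinel 'nil' marks unknown parents.
    meta = bencode((key, b"nil" if parents is None else parents))
    return b"fulltext\n" + struct.pack("!L", len(meta)) + meta + text


def decode_fulltext(data):
    """Inverse of encode_fulltext; returns (kind, key, parents, text)."""
    line_end = data.index(b"\n") + 1
    kind = data[:line_end - 1]
    (meta_len,) = struct.unpack("!L", data[line_end:line_end + 4])
    meta_start = line_end + 4
    meta, _ = bdecode(data[meta_start:meta_start + meta_len])
    key, parents = meta
    if parents == b"nil":
        parents = None
    return kind, key, parents, data[meta_start + meta_len:]


if __name__ == "__main__":
    wire = encode_fulltext((b"file-id", b"rev-1"), None, b"hello\n")
    print(decode_fulltext(wire))
```

Because the metadata length is stated up front, the decoder never has to scan
the text body, so the body may contain any bytes, including newlines.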



