Rev 2753: Merge needed transport support. in http://people.ubuntu.com/~robertc/baz2.0/index

Robert Collins robertc at robertcollins.net
Tue Aug 28 05:26:22 BST 2007


At http://people.ubuntu.com/~robertc/baz2.0/index

------------------------------------------------------------
revno: 2753
revision-id: robertc at robertcollins.net-20070828042612-etdfssgemaes40lk
parent: pqm at pqm.ubuntu.com-20070828032606-yp46hkvcaan2ew9b
parent: robertc at robertcollins.net-20070827041949-br263tkuxayldxoc
committer: Robert Collins <robertc at robertcollins.net>
branch nick: index
timestamp: Tue 2007-08-28 14:26:12 +1000
message:
  Merge needed transport support.
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/osutils.py              osutils.py-20050309040759-eeaff12fbf77ac86
  bzrlib/tests/test_transport_implementations.py test_transport_implementations.py-20051227111451-f97c5c7d5c49fce7
  bzrlib/transport/__init__.py   transport.py-20050711165921-4978aa7ce1285ad5
  bzrlib/transport/decorator.py  decorator.py-20060402223305-e913a0f25319ab42
  bzrlib/transport/fakevfat.py   fakevfat.py-20060407072414-d59939fa1d6c79d9
  bzrlib/transport/ftp.py        ftp.py-20051116161804-58dc9506548c2a53
  bzrlib/transport/http/__init__.py http_transport.py-20050711212304-506c5fd1059ace96
  bzrlib/transport/local.py      local_transport.py-20050711165921-9b1f142bfe480c24
  bzrlib/transport/memory.py     memory.py-20051016101338-cd008dbdf69f04fc
  bzrlib/transport/remote.py     ssh.py-20060608202016-c25gvf1ob7ypbus6-1
  bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
    ------------------------------------------------------------
    revno: 2745.3.2
    revision-id: robertc at robertcollins.net-20070827041949-br263tkuxayldxoc
    parent: robertc at robertcollins.net-20070826221051-46uq33p3oqkscdd0
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: transport-get-file
    timestamp: Mon 2007-08-27 14:19:49 +1000
    message:
      * ``bzrlib.transport.Transport.put_file`` now returns the number of bytes
        put by the method call, allowing callers to avoid a stat-after-write
        or other housekeeping. (Robert Collins)
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
      bzrlib/osutils.py              osutils.py-20050309040759-eeaff12fbf77ac86
      bzrlib/tests/test_transport_implementations.py test_transport_implementations.py-20051227111451-f97c5c7d5c49fce7
      bzrlib/transport/__init__.py   transport.py-20050711165921-4978aa7ce1285ad5
      bzrlib/transport/ftp.py        ftp.py-20051116161804-58dc9506548c2a53
      bzrlib/transport/local.py      local_transport.py-20050711165921-9b1f142bfe480c24
      bzrlib/transport/memory.py     memory.py-20051016101338-cd008dbdf69f04fc
      bzrlib/transport/remote.py     ssh.py-20060608202016-c25gvf1ob7ypbus6-1
      bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
    ------------------------------------------------------------
    revno: 2745.3.1
    revision-id: robertc at robertcollins.net-20070826221051-46uq33p3oqkscdd0
    parent: pqm at pqm.ubuntu.com-20070823005013-ada9x55rc31yiwou
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: transport-get-file
    timestamp: Mon 2007-08-27 08:10:51 +1000
    message:
      * New parameter ``adjust_for_latency`` on
        ``bzrlib.transport.Transport.readv``, which changes readv from
        returning strictly the requested data to returning larger ranges, in
        forward read order, to reduce the effect of network latency. (Robert Collins)
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
      bzrlib/tests/test_transport_implementations.py test_transport_implementations.py-20051227111451-f97c5c7d5c49fce7
      bzrlib/transport/__init__.py   transport.py-20050711165921-4978aa7ce1285ad5
      bzrlib/transport/decorator.py  decorator.py-20060402223305-e913a0f25319ab42
      bzrlib/transport/fakevfat.py   fakevfat.py-20060407072414-d59939fa1d6c79d9
      bzrlib/transport/http/__init__.py http_transport.py-20050711212304-506c5fd1059ace96
      bzrlib/transport/remote.py     ssh.py-20060608202016-c25gvf1ob7ypbus6-1
      bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
=== modified file 'NEWS'
--- a/NEWS	2007-08-28 00:49:10 +0000
+++ b/NEWS	2007-08-28 04:26:12 +0000
@@ -83,6 +83,15 @@
       incremental addition of data to a file without requiring that all the
       data be buffered in memory. (Robert Collins)
 
+    * New parameter ``adjust_for_latency`` on ``bzrlib.transport.Transport.readv``
+      which changes readv from returning strictly the requested data to
+      returning larger ranges, in forward read order, to reduce the effect
+      of network latency. (Robert Collins)
+
+    * ``bzrlib.transport.Transport.put_file`` now returns the number of bytes
+      put by the method call, allowing callers to avoid a stat-after-write
+      or other housekeeping. (Robert Collins)
+
 
 bzr 0.90 2007-08-??
 ===================

=== modified file 'bzrlib/osutils.py'
--- a/bzrlib/osutils.py	2007-08-21 01:32:29 +0000
+++ b/bzrlib/osutils.py	2007-08-27 04:19:49 +0000
@@ -537,13 +537,19 @@
 
 
 def pumpfile(fromfile, tofile):
-    """Copy contents of one file to another."""
+    """Copy contents of one file to another.
+    
+    :return: The number of bytes copied.
+    """
     BUFSIZE = 32768
+    length = 0
     while True:
         b = fromfile.read(BUFSIZE)
         if not b:
             break
         tofile.write(b)
+        length += len(b)
+    return length
 
 
 def file_iterator(input_file, readsize=32768):

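The pumpfile change above is easy to exercise in isolation. A minimal sketch (modern Python 3 with ``io``, restating the patch's loop rather than reusing the bzrlib module) of the same chunked copy-and-count behaviour:

```python
import io

BUFSIZE = 32768

def pumpfile(fromfile, tofile):
    """Copy fromfile to tofile in BUFSIZE chunks; return bytes copied."""
    length = 0
    while True:
        b = fromfile.read(BUFSIZE)
        if not b:
            break
        tofile.write(b)
        # accumulate the count as we go, so no stat is needed afterwards
        length += len(b)
    return length

src = io.BytesIO(b"x" * 100000)
dst = io.BytesIO()
copied = pumpfile(src, dst)  # → 100000
```

Callers such as the local and SFTP ``put_file`` implementations can then return this count directly instead of stat()ing the file they just wrote.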
=== modified file 'bzrlib/tests/test_transport_implementations.py'
--- a/bzrlib/tests/test_transport_implementations.py	2007-08-22 01:41:24 +0000
+++ b/bzrlib/tests/test_transport_implementations.py	2007-08-27 04:19:49 +0000
@@ -374,11 +374,14 @@
                     t.put_file, 'a', StringIO('some text for a\n'))
             return
 
-        t.put_file('a', StringIO('some text for a\n'))
+        result = t.put_file('a', StringIO('some text for a\n'))
+        # put_file returns the length of the data written
+        self.assertEqual(16, result)
         self.failUnless(t.has('a'))
         self.check_transport_contents('some text for a\n', t, 'a')
         # Put also replaces contents
-        t.put_file('a', StringIO('new\ncontents for\na\n'))
+        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
+        self.assertEqual(19, result)
         self.check_transport_contents('new\ncontents for\na\n', t, 'a')
         self.assertRaises(NoSuchFile,
                           t.put_file, 'path/doesnt/exist/c',
@@ -1485,6 +1488,70 @@
         self.assertEqual(d[2], (0, '0'))
         self.assertEqual(d[3], (3, '34'))
 
+    def test_readv_with_adjust_for_latency(self):
+        transport = self.get_transport()
+        # the adjust for latency flag expands the data region returned
+        # according to a per-transport heuristic, so testing is a little
+        # tricky as we need more data than the largest combining that our
+        # transports do. To accommodate this we generate random data and cross
+        # reference the returned data with the random data. To avoid doing
+        # multiple large random byte lookups we do several tests on the same
+        # backing data.
+        content = osutils.rand_bytes(200*1024)
+        if transport.is_readonly():
+            file('a', 'w').write(content)
+        else:
+            transport.put_bytes('a', content)
+        def check_result_data(result_vector):
+            for item in result_vector:
+                data_len = len(item[1])
+                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
+
+        # start corner case
+        result = list(transport.readv('a', ((0, 30),),
+            adjust_for_latency=True))
+        # we expect 1 result, from 0, to something > 30
+        self.assertEqual(1, len(result))
+        self.assertEqual(0, result[0][0])
+        self.assertTrue(len(result[0][1]) >= 30)
+        check_result_data(result)
+        # end of file corner case
+        result = list(transport.readv('a', ((204700, 100),),
+            adjust_for_latency=True))
+        # we expect 1 result, from 204800 minus its length, to the end
+        self.assertEqual(1, len(result))
+        data_len = len(result[0][1])
+        self.assertEqual(204800-data_len, result[0][0])
+        self.assertTrue(data_len >= 100)
+        check_result_data(result)
+        # out of order ranges are made in order
+        result = list(transport.readv('a', ((204700, 100), (0, 50)),
+            adjust_for_latency=True))
+        # we expect 2 results, in order, start and end.
+        self.assertEqual(2, len(result))
+        # start
+        data_len = len(result[0][1])
+        self.assertEqual(0, result[0][0])
+        self.assertTrue(data_len >= 30)
+        # end
+        data_len = len(result[1][1])
+        self.assertEqual(204800-data_len, result[1][0])
+        self.assertTrue(data_len >= 100)
+        check_result_data(result)
+        # close ranges get combined (even if out of order)
+        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
+            result = list(transport.readv('a', request_vector,
+                adjust_for_latency=True))
+            self.assertEqual(1, len(result))
+            data_len = len(result[0][1])
+            # minimum length is from 400 to 1034, i.e. 634
+            self.assertTrue(data_len >= 634)
+            # must contain the region 400 to 1034
+            self.assertTrue(result[0][0] <= 400)
+            self.assertTrue(result[0][0] + data_len >= 1034)
+            check_result_data(result)
+        
+
     def test_get_with_open_write_stream_sees_all_content(self):
         t = self.get_transport()
         if t.is_readonly():

=== modified file 'bzrlib/transport/__init__.py'
--- a/bzrlib/transport/__init__.py	2007-08-22 01:41:24 +0000
+++ b/bzrlib/transport/__init__.py	2007-08-27 04:19:49 +0000
@@ -406,7 +406,7 @@
         """
         assert not isinstance(from_file, basestring), \
             '_pump should only be called on files not %s' % (type(from_file,))
-        osutils.pumpfile(from_file, to_file)
+        return osutils.pumpfile(from_file, to_file)
 
     def _get_total(self, multi):
         """Try to figure out how many entries are in multi,
@@ -635,7 +635,56 @@
         """
         raise errors.NoSmartMedium(self)
 
-    def readv(self, relpath, offsets):
+    def readv(self, relpath, offsets, adjust_for_latency=False):
+        """Get parts of the file at the given relative path.
+
+        :param relpath: The path to read data from.
+        :param offsets: A list of (offset, size) tuples.
+        :param adjust_for_latency: Adjust the requested offsets to accommodate
+            transport latency. This may re-order the offsets and expand them
+            to grab adjacent data, when there is likely a high cost to
+            requesting data relative to delivering it.
+        :return: A list or generator of (offset, data) tuples
+        """
+        if adjust_for_latency:
+            offsets = sorted(offsets)
+            # short circuit empty requests
+            if len(offsets) == 0:
+                def empty_yielder():
+                    # Quick thunk to stop this function becoming a generator
+                    # itself, rather we return a generator that has nothing to
+                    # yield.
+                    if False:
+                        yield None
+                return empty_yielder()
+            # expand by page size at either end
+            expansion = self.recommended_page_size() / 2
+            new_offsets = []
+            for offset, length in offsets:
+                new_offset = offset - expansion
+                new_length = length + expansion
+                if new_offset < 0:
+                    # don't ask for anything < 0
+                    new_length -= new_offset
+                    new_offset = 0
+                new_offsets.append((new_offset, new_length))
+            # combine the expanded offsets
+            offsets = []
+            current_offset, current_length = new_offsets[0]
+            current_finish = current_length + current_offset
+            for offset, length in new_offsets[1:]:
+                if offset > current_finish:
+                    offsets.append((current_offset, current_finish - current_offset))
+                    current_offset = offset
+                    current_finish = offset + length
+                    continue
+                finish = offset + length
+                if finish > current_finish:
+                    current_finish = finish
+            offsets.append((current_offset, current_finish - current_offset))
+        return self._readv(relpath, offsets)
+
+    def _readv(self, relpath, offsets):
         """Get parts of the file at the given relative path.
 
         :offsets: A list of (offset, size) tuples.
@@ -794,6 +843,7 @@
         :param f:       File-like object.
         :param mode: The mode for the newly created file,
                      None means just use the default.
+        :return: The length of the file that was written.
         """
         # We would like to mark this as NotImplemented, but most likely
         # transports have defined it in terms of the old api.

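The expansion-and-coalescing step added to ``Transport.readv`` can be read as a standalone routine. The sketch below is a plain-Python restatement (the function name and ``page_size`` default are illustrative, not bzrlib API): each request grows backwards by half a page, spilling forward when clamped at offset 0, and ranges that then touch or overlap are merged:

```python
def adjust_for_latency(offsets, page_size=4096):
    """Expand and coalesce (offset, length) read requests.

    Mirrors the patch's approach: sort the requests, grow each one
    backwards by half a page (the overshoot past offset 0 is added to
    the length instead), then merge overlapping or adjacent ranges.
    """
    expansion = page_size // 2
    expanded = []
    for offset, length in sorted(offsets):
        new_offset = offset - expansion
        new_length = length + expansion
        if new_offset < 0:
            # don't ask for anything before the start of the file;
            # new_offset is negative here, so this extends the length
            new_length -= new_offset
            new_offset = 0
        expanded.append((new_offset, new_length))
    combined = []
    current_offset, current_length = expanded[0]
    current_finish = current_offset + current_length
    for offset, length in expanded[1:]:
        if offset > current_finish:
            # disjoint: emit the current range and start a new one
            combined.append((current_offset, current_finish - current_offset))
            current_offset = offset
            current_finish = offset + length
            continue
        # overlapping or adjacent: extend the current range
        current_finish = max(current_finish, offset + length)
    combined.append((current_offset, current_finish - current_offset))
    return combined

# the "close ranges get combined" case from the tests, with an
# assumed 4096-byte page: both expanded ranges clamp to offset 0
print(adjust_for_latency([(800, 234), (400, 50)]))  # → [(0, 3746)]
```

The transport then services the combined ranges with a single ``_readv`` pass, trading extra bytes on the wire for fewer high-latency round trips.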
=== modified file 'bzrlib/transport/decorator.py'
--- a/bzrlib/transport/decorator.py	2007-08-22 01:41:24 +0000
+++ b/bzrlib/transport/decorator.py	2007-08-26 22:10:51 +0000
@@ -138,6 +138,10 @@
         """See Transport.list_dir()."""
         return self._decorated.list_dir(relpath)
 
+    def _readv(self, relpath, offsets):
+        """See Transport._readv."""
+        return self._decorated._readv(relpath, offsets)
+
     def recommended_page_size(self):
         """See Transport.recommended_page_size()."""
         return self._decorated.recommended_page_size()

=== modified file 'bzrlib/transport/fakevfat.py'
--- a/bzrlib/transport/fakevfat.py	2007-08-15 06:53:07 +0000
+++ b/bzrlib/transport/fakevfat.py	2007-08-26 22:10:51 +0000
@@ -92,7 +92,7 @@
     def has(self, relpath):
         return self._decorated.has(self._squash_name(relpath))
 
-    def readv(self, relpath, offsets):
+    def _readv(self, relpath, offsets):
         return self._decorated.readv(self._squash_name(relpath), offsets)
 
     def put_file(self, relpath, f, mode=None):

=== modified file 'bzrlib/transport/ftp.py'
--- a/bzrlib/transport/ftp.py	2007-08-22 01:41:24 +0000
+++ b/bzrlib/transport/ftp.py	2007-08-27 04:19:49 +0000
@@ -271,14 +271,33 @@
         abspath = self._remote_path(relpath)
         tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
                         os.getpid(), random.randint(0,0x7FFFFFFF))
+        bytes = None
         if getattr(fp, 'read', None) is None:
-            fp = StringIO(fp)
+            # hand in a string IO
+            bytes = fp
+            fp = StringIO(bytes)
+        else:
+            # capture the byte count; .read() may be read only so
+            # decorate it.
+            class byte_counter(object):
+                def __init__(self, fp):
+                    self.fp = fp
+                    self.counted_bytes = 0
+                def read(self, count):
+                    result = self.fp.read(count)
+                    self.counted_bytes += len(result)
+                    return result
+            fp = byte_counter(fp)
         try:
             mutter("FTP put: %s", abspath)
             f = self._get_FTP()
             try:
                 f.storbinary('STOR '+tmp_abspath, fp)
                 self._rename_and_overwrite(tmp_abspath, abspath, f)
+                if bytes is not None:
+                    return len(bytes)
+                else:
+                    return fp.counted_bytes
             except (ftplib.error_temp,EOFError), e:
                 warning("Failure during ftp PUT. Deleting temporary file.")
                 try:

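The FTP change wraps the incoming file object so the byte count can be observed while ``ftplib`` streams it with its own ``read()`` calls. A standalone sketch of that wrapper pattern (illustrative class name, plain Python):

```python
import io

class ByteCountingReader:
    """Wrap a readable object and count the bytes handed out.

    The consumer (in the patch, ftplib's storbinary) keeps calling
    read() as usual; the caller inspects counted_bytes afterwards to
    learn how much data was uploaded.
    """
    def __init__(self, fp):
        self._fp = fp
        self.counted_bytes = 0

    def read(self, count=-1):
        result = self._fp.read(count)
        self.counted_bytes += len(result)
        return result

reader = ByteCountingReader(io.BytesIO(b"hello world"))
while reader.read(4):
    pass  # drain in small chunks, as a streaming consumer would
```

After the loop ``reader.counted_bytes`` is 11, the full payload size, which is exactly the value ``put_file`` needs to return.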
=== modified file 'bzrlib/transport/http/__init__.py'
--- a/bzrlib/transport/http/__init__.py	2007-08-05 01:47:30 +0000
+++ b/bzrlib/transport/http/__init__.py	2007-08-26 22:10:51 +0000
@@ -252,7 +252,7 @@
     # to avoid downloading the whole file.
     _max_readv_combined = 0
 
-    def readv(self, relpath, offsets):
+    def _readv(self, relpath, offsets):
         """Get parts of the file at the given relative path.
 
         :param offsets: A list of (offset, size) tuples.

=== modified file 'bzrlib/transport/local.py'
--- a/bzrlib/transport/local.py	2007-08-22 01:41:24 +0000
+++ b/bzrlib/transport/local.py	2007-08-27 04:19:49 +0000
@@ -164,10 +164,11 @@
         except (IOError, OSError),e:
             self._translate_error(e, path)
         try:
-            self._pump(f, fp)
+            length = self._pump(f, fp)
             fp.commit()
         finally:
             fp.close()
+        return length
 
     def put_bytes(self, relpath, bytes, mode=None):
         """Copy the string into the location.

=== modified file 'bzrlib/transport/memory.py'
--- a/bzrlib/transport/memory.py	2007-08-15 06:53:07 +0000
+++ b/bzrlib/transport/memory.py	2007-08-27 04:19:49 +0000
@@ -158,6 +158,7 @@
                 'undefined', bytes, 0, 1,
                 'put_file must be given a file of bytes, not unicode.')
         self._files[_abspath] = (bytes, mode)
+        return len(bytes)
 
     def mkdir(self, relpath, mode=None):
         """See Transport.mkdir()."""

=== modified file 'bzrlib/transport/remote.py'
--- a/bzrlib/transport/remote.py	2007-08-15 06:53:07 +0000
+++ b/bzrlib/transport/remote.py	2007-08-27 04:19:49 +0000
@@ -234,6 +234,7 @@
             (self._remote_path(relpath), self._serialise_optional_mode(mode)),
             upload_contents)
         self._translate_error(resp)
+        return len(upload_contents)
 
     def put_bytes_non_atomic(self, relpath, bytes, mode=None,
                              create_parent_dir=False,
@@ -290,7 +291,7 @@
         # the external path for RemoteTransports is the base
         return self.base
 
-    def readv(self, relpath, offsets):
+    def _readv(self, relpath, offsets):
         if not offsets:
             return
 

=== modified file 'bzrlib/transport/sftp.py'
--- a/bzrlib/transport/sftp.py	2007-08-22 01:41:24 +0000
+++ b/bzrlib/transport/sftp.py	2007-08-27 04:19:49 +0000
@@ -247,7 +247,7 @@
             self._translate_io_exception(e, path, ': error retrieving',
                 failure_exc=errors.ReadError)
 
-    def readv(self, relpath, offsets):
+    def _readv(self, relpath, offsets):
         """See Transport.readv()"""
         # We overload the default readv() because we want to use a file
         # that does not have prefetch enabled.
@@ -386,7 +386,7 @@
         :param mode: The final mode for the file
         """
         final_path = self._remote_path(relpath)
-        self._put(final_path, f, mode=mode)
+        return self._put(final_path, f, mode=mode)
 
     def _put(self, abspath, f, mode=None):
         """Helper function so both put() and copy_abspaths can reuse the code"""
@@ -397,7 +397,7 @@
         try:
             try:
                 fout.set_pipelined(True)
-                self._pump(f, fout)
+                length = self._pump(f, fout)
             except (IOError, paramiko.SSHException), e:
                 self._translate_io_exception(e, tmp_abspath)
             # XXX: This doesn't truly help like we would like it to.
@@ -418,6 +418,7 @@
             fout.close()
             closed = True
             self._rename_and_overwrite(tmp_abspath, abspath)
+            return length
         except Exception, e:
             # If we fail, try to clean up the temporary file
             # before we throw the exception


