Rev 2753: Merge needed transport support. in http://people.ubuntu.com/~robertc/baz2.0/index
Robert Collins
robertc at robertcollins.net
Tue Aug 28 05:26:22 BST 2007
At http://people.ubuntu.com/~robertc/baz2.0/index
------------------------------------------------------------
revno: 2753
revision-id: robertc at robertcollins.net-20070828042612-etdfssgemaes40lk
parent: pqm at pqm.ubuntu.com-20070828032606-yp46hkvcaan2ew9b
parent: robertc at robertcollins.net-20070827041949-br263tkuxayldxoc
committer: Robert Collins <robertc at robertcollins.net>
branch nick: index
timestamp: Tue 2007-08-28 14:26:12 +1000
message:
Merge needed transport support.
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/osutils.py osutils.py-20050309040759-eeaff12fbf77ac86
bzrlib/tests/test_transport_implementations.py test_transport_implementations.py-20051227111451-f97c5c7d5c49fce7
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
bzrlib/transport/decorator.py decorator.py-20060402223305-e913a0f25319ab42
bzrlib/transport/fakevfat.py fakevfat.py-20060407072414-d59939fa1d6c79d9
bzrlib/transport/ftp.py ftp.py-20051116161804-58dc9506548c2a53
bzrlib/transport/http/__init__.py http_transport.py-20050711212304-506c5fd1059ace96
bzrlib/transport/local.py local_transport.py-20050711165921-9b1f142bfe480c24
bzrlib/transport/memory.py memory.py-20051016101338-cd008dbdf69f04fc
bzrlib/transport/remote.py ssh.py-20060608202016-c25gvf1ob7ypbus6-1
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
------------------------------------------------------------
revno: 2745.3.2
revision-id: robertc at robertcollins.net-20070827041949-br263tkuxayldxoc
parent: robertc at robertcollins.net-20070826221051-46uq33p3oqkscdd0
committer: Robert Collins <robertc at robertcollins.net>
branch nick: transport-get-file
timestamp: Mon 2007-08-27 14:19:49 +1000
message:
* ``bzrlib.transport.Transport.put_file`` now returns the number of bytes
put by the method call, to allow avoiding stat-after write or
housekeeping in callers. (Robert Collins)
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/osutils.py osutils.py-20050309040759-eeaff12fbf77ac86
bzrlib/tests/test_transport_implementations.py test_transport_implementations.py-20051227111451-f97c5c7d5c49fce7
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
bzrlib/transport/ftp.py ftp.py-20051116161804-58dc9506548c2a53
bzrlib/transport/local.py local_transport.py-20050711165921-9b1f142bfe480c24
bzrlib/transport/memory.py memory.py-20051016101338-cd008dbdf69f04fc
bzrlib/transport/remote.py ssh.py-20060608202016-c25gvf1ob7ypbus6-1
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
------------------------------------------------------------
revno: 2745.3.1
revision-id: robertc at robertcollins.net-20070826221051-46uq33p3oqkscdd0
parent: pqm at pqm.ubuntu.com-20070823005013-ada9x55rc31yiwou
committer: Robert Collins <robertc at robertcollins.net>
branch nick: transport-get-file
timestamp: Mon 2007-08-27 08:10:51 +1000
message:
* New parameter on ``bzrlib.transport.Transport.readv``
``adjust_for_latency`` which changes readv from returning strictly the
requested data to instead return larger ranges, in forward read order,
to reduce the effect of network latency. (Robert Collins)
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzrlib/tests/test_transport_implementations.py test_transport_implementations.py-20051227111451-f97c5c7d5c49fce7
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
bzrlib/transport/decorator.py decorator.py-20060402223305-e913a0f25319ab42
bzrlib/transport/fakevfat.py fakevfat.py-20060407072414-d59939fa1d6c79d9
bzrlib/transport/http/__init__.py http_transport.py-20050711212304-506c5fd1059ace96
bzrlib/transport/remote.py ssh.py-20060608202016-c25gvf1ob7ypbus6-1
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
=== modified file 'NEWS'
--- a/NEWS 2007-08-28 00:49:10 +0000
+++ b/NEWS 2007-08-28 04:26:12 +0000
@@ -83,6 +83,15 @@
incremental addition of data to a file without requiring that all the
data be buffered in memory. (Robert Collins)
+ * New parameter on ``bzrlib.transport.Transport.readv``
+ ``adjust_for_latency`` which changes readv from returning strictly the
+ requested data to instead return larger ranges, in forward read order,
+ to reduce the effect of network latency. (Robert Collins)
+
+ * ``bzrlib.transport.Transport.put_file`` now returns the number of bytes
+ put by the method call, to allow avoiding stat-after write or
+ housekeeping in callers. (Robert Collins)
+
bzr 0.90 2007-08-??
===================
=== modified file 'bzrlib/osutils.py'
--- a/bzrlib/osutils.py 2007-08-21 01:32:29 +0000
+++ b/bzrlib/osutils.py 2007-08-27 04:19:49 +0000
@@ -537,13 +537,19 @@
def pumpfile(fromfile, tofile):
- """Copy contents of one file to another."""
+ """Copy contents of one file to another.
+
+ :return: The number of bytes copied.
+ """
BUFSIZE = 32768
+ length = 0
while True:
b = fromfile.read(BUFSIZE)
if not b:
break
tofile.write(b)
+ length += len(b)
+ return length
def file_iterator(input_file, readsize=32768):
=== modified file 'bzrlib/tests/test_transport_implementations.py'
--- a/bzrlib/tests/test_transport_implementations.py 2007-08-22 01:41:24 +0000
+++ b/bzrlib/tests/test_transport_implementations.py 2007-08-27 04:19:49 +0000
@@ -374,11 +374,14 @@
t.put_file, 'a', StringIO('some text for a\n'))
return
- t.put_file('a', StringIO('some text for a\n'))
+ result = t.put_file('a', StringIO('some text for a\n'))
+ # put_file returns the length of the data written
+ self.assertEqual(16, result)
self.failUnless(t.has('a'))
self.check_transport_contents('some text for a\n', t, 'a')
# Put also replaces contents
- t.put_file('a', StringIO('new\ncontents for\na\n'))
+ result = t.put_file('a', StringIO('new\ncontents for\na\n'))
+ self.assertEqual(19, result)
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
self.assertRaises(NoSuchFile,
t.put_file, 'path/doesnt/exist/c',
@@ -1485,6 +1488,70 @@
self.assertEqual(d[2], (0, '0'))
self.assertEqual(d[3], (3, '34'))
+ def test_readv_with_adjust_for_latency(self):
+ transport = self.get_transport()
+ # the adjust for latency flag expands the data region returned
+ # according to a per-transport heuristic, so testing is a little
+ # tricky as we need more data than the largest combining that our
+ # transports do. To accommodate this we generate random data and cross
+ # reference the returned data with the random data. To avoid doing
+ # multiple large random byte look ups we do several tests on the same
+ # backing data.
+ content = osutils.rand_bytes(200*1024)
+ if transport.is_readonly():
+ file('a', 'w').write(content)
+ else:
+ transport.put_bytes('a', content)
+ def check_result_data(result_vector):
+ for item in result_vector:
+ data_len = len(item[1])
+ self.assertEqual(content[item[0]:item[0] + data_len], item[1])
+
+ # start corner case
+ result = list(transport.readv('a', ((0, 30),),
+ adjust_for_latency=True))
+ # we expect 1 result, from 0, to something > 30
+ self.assertEqual(1, len(result))
+ self.assertEqual(0, result[0][0])
+ self.assertTrue(len(result[0][1]) >= 30)
+ check_result_data(result)
+ # end of file corner case
+ result = list(transport.readv('a', ((204700, 100),),
+ adjust_for_latency=True))
+ # we expect 1 result, from 204800- its length, to the end
+ self.assertEqual(1, len(result))
+ data_len = len(result[0][1])
+ self.assertEqual(204800-data_len, result[0][0])
+ self.assertTrue(data_len >= 100)
+ check_result_data(result)
+ # out of order ranges are made in order
+ result = list(transport.readv('a', ((204700, 100), (0, 50)),
+ adjust_for_latency=True))
+ # we expect 2 results, in order, start and end.
+ self.assertEqual(2, len(result))
+ # start
+ data_len = len(result[0][1])
+ self.assertEqual(0, result[0][0])
+ self.assertTrue(data_len >= 30)
+ # end
+ data_len = len(result[1][1])
+ self.assertEqual(204800-data_len, result[1][0])
+ self.assertTrue(data_len >= 100)
+ check_result_data(result)
+ # close ranges get combined (even if out of order)
+ for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
+ result = list(transport.readv('a', request_vector,
+ adjust_for_latency=True))
+ self.assertEqual(1, len(result))
+ data_len = len(result[0][1])
+ # minimum length is from 400 to 1034 - 634
+ self.assertTrue(data_len >= 634)
+ # must contain the region 400 to 1034
+ self.assertTrue(result[0][0] <= 400)
+ self.assertTrue(result[0][0] + data_len >= 1034)
+ check_result_data(result)
+
+
def test_get_with_open_write_stream_sees_all_content(self):
t = self.get_transport()
if t.is_readonly():
=== modified file 'bzrlib/transport/__init__.py'
--- a/bzrlib/transport/__init__.py 2007-08-22 01:41:24 +0000
+++ b/bzrlib/transport/__init__.py 2007-08-27 04:19:49 +0000
@@ -406,7 +406,7 @@
"""
assert not isinstance(from_file, basestring), \
'_pump should only be called on files not %s' % (type(from_file,))
- osutils.pumpfile(from_file, to_file)
+ return osutils.pumpfile(from_file, to_file)
def _get_total(self, multi):
"""Try to figure out how many entries are in multi,
@@ -635,7 +635,56 @@
"""
raise errors.NoSmartMedium(self)
- def readv(self, relpath, offsets):
+ def readv(self, relpath, offsets, adjust_for_latency=False):
+ """Get parts of the file at the given relative path.
+
+ :param relpath: The path to read data from.
+ :param offsets: A list of (offset, size) tuples.
+ :param adjust_for_latency: Adjust the requested offsets to accommodate
+ transport latency. This may re-order the offsets, expand them to
+ grab adjacent data when there is likely a high cost to requesting
+ data relative to delivering it.
+ :return: A list or generator of (offset, data) tuples
+ """
+ if adjust_for_latency:
+ offsets = sorted(offsets)
+ # short circuit empty requests
+ if len(offsets) == 0:
+ def empty_yielder():
+ # Quick thunk to stop this function becoming a generator
+ # itself, rather we return a generator that has nothing to
+ # yield.
+ if False:
+ yield None
+ return empty_yielder()
+ # expand by page size at either end
+ expansion = self.recommended_page_size() / 2
+ new_offsets = []
+ for offset, length in offsets:
+ new_offset = offset - expansion
+ new_length = length + expansion
+ if new_offset < 0:
+ # don't ask for anything < 0
+ new_length -= new_offset
+ new_offset = 0
+ new_offsets.append((new_offset, new_length))
+ # combine the expanded offsets
+ offsets = []
+ current_offset, current_length = new_offsets[0]
+ current_finish = current_length + current_offset
+ for offset, length in new_offsets[1:]:
+ if offset > current_finish:
+ offsets.append((current_offset, current_length))
+ current_offset = offset
+ current_length = length
+ continue
+ finish = offset + length
+ if finish > current_finish:
+ current_finish = finish
+ offsets.append((current_offset, current_length))
+ return self._readv(relpath, offsets)
+
+ def _readv(self, relpath, offsets):
"""Get parts of the file at the given relative path.
:offsets: A list of (offset, size) tuples.
@@ -794,6 +843,7 @@
:param f: File-like object.
:param mode: The mode for the newly created file,
None means just use the default.
+ :return: The length of the file that was written.
"""
# We would like to mark this as NotImplemented, but most likely
# transports have defined it in terms of the old api.
=== modified file 'bzrlib/transport/decorator.py'
--- a/bzrlib/transport/decorator.py 2007-08-22 01:41:24 +0000
+++ b/bzrlib/transport/decorator.py 2007-08-26 22:10:51 +0000
@@ -138,6 +138,10 @@
"""See Transport.list_dir()."""
return self._decorated.list_dir(relpath)
+ def _readv(self, relpath, offsets):
+ """See Transport._readv."""
+ return self._decorated._readv(relpath, offsets)
+
def recommended_page_size(self):
"""See Transport.recommended_page_size()."""
return self._decorated.recommended_page_size()
=== modified file 'bzrlib/transport/fakevfat.py'
--- a/bzrlib/transport/fakevfat.py 2007-08-15 06:53:07 +0000
+++ b/bzrlib/transport/fakevfat.py 2007-08-26 22:10:51 +0000
@@ -92,7 +92,7 @@
def has(self, relpath):
return self._decorated.has(self._squash_name(relpath))
- def readv(self, relpath, offsets):
+ def _readv(self, relpath, offsets):
return self._decorated.readv(self._squash_name(relpath), offsets)
def put_file(self, relpath, f, mode=None):
=== modified file 'bzrlib/transport/ftp.py'
--- a/bzrlib/transport/ftp.py 2007-08-22 01:41:24 +0000
+++ b/bzrlib/transport/ftp.py 2007-08-27 04:19:49 +0000
@@ -271,14 +271,33 @@
abspath = self._remote_path(relpath)
tmp_abspath = '%s.tmp.%.9f.%d.%d' % (abspath, time.time(),
os.getpid(), random.randint(0,0x7FFFFFFF))
+ bytes = None
if getattr(fp, 'read', None) is None:
- fp = StringIO(fp)
+ # hand in a string IO
+ bytes = fp
+ fp = StringIO(bytes)
+ else:
+ # capture the byte count; .read() may be read only so
+ # decorate it.
+ class byte_counter(object):
+ def __init__(self, fp):
+ self.fp = fp
+ self.counted_bytes = 0
+ def read(self, count):
+ result = self.fp.read(count)
+ self.counted_bytes += len(result)
+ return result
+ fp = byte_counter(fp)
try:
mutter("FTP put: %s", abspath)
f = self._get_FTP()
try:
f.storbinary('STOR '+tmp_abspath, fp)
self._rename_and_overwrite(tmp_abspath, abspath, f)
+ if bytes is not None:
+ return len(bytes)
+ else:
+ return fp.counted_bytes
except (ftplib.error_temp,EOFError), e:
warning("Failure during ftp PUT. Deleting temporary file.")
try:
=== modified file 'bzrlib/transport/http/__init__.py'
--- a/bzrlib/transport/http/__init__.py 2007-08-05 01:47:30 +0000
+++ b/bzrlib/transport/http/__init__.py 2007-08-26 22:10:51 +0000
@@ -252,7 +252,7 @@
# to avoid downloading the whole file.
_max_readv_combined = 0
- def readv(self, relpath, offsets):
+ def _readv(self, relpath, offsets):
"""Get parts of the file at the given relative path.
:param offsets: A list of (offset, size) tuples.
=== modified file 'bzrlib/transport/local.py'
--- a/bzrlib/transport/local.py 2007-08-22 01:41:24 +0000
+++ b/bzrlib/transport/local.py 2007-08-27 04:19:49 +0000
@@ -164,10 +164,11 @@
except (IOError, OSError),e:
self._translate_error(e, path)
try:
- self._pump(f, fp)
+ length = self._pump(f, fp)
fp.commit()
finally:
fp.close()
+ return length
def put_bytes(self, relpath, bytes, mode=None):
"""Copy the string into the location.
=== modified file 'bzrlib/transport/memory.py'
--- a/bzrlib/transport/memory.py 2007-08-15 06:53:07 +0000
+++ b/bzrlib/transport/memory.py 2007-08-27 04:19:49 +0000
@@ -158,6 +158,7 @@
'undefined', bytes, 0, 1,
'put_file must be given a file of bytes, not unicode.')
self._files[_abspath] = (bytes, mode)
+ return len(bytes)
def mkdir(self, relpath, mode=None):
"""See Transport.mkdir()."""
=== modified file 'bzrlib/transport/remote.py'
--- a/bzrlib/transport/remote.py 2007-08-15 06:53:07 +0000
+++ b/bzrlib/transport/remote.py 2007-08-27 04:19:49 +0000
@@ -234,6 +234,7 @@
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
upload_contents)
self._translate_error(resp)
+ return len(upload_contents)
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
create_parent_dir=False,
@@ -290,7 +291,7 @@
# the external path for RemoteTransports is the base
return self.base
- def readv(self, relpath, offsets):
+ def _readv(self, relpath, offsets):
if not offsets:
return
=== modified file 'bzrlib/transport/sftp.py'
--- a/bzrlib/transport/sftp.py 2007-08-22 01:41:24 +0000
+++ b/bzrlib/transport/sftp.py 2007-08-27 04:19:49 +0000
@@ -247,7 +247,7 @@
self._translate_io_exception(e, path, ': error retrieving',
failure_exc=errors.ReadError)
- def readv(self, relpath, offsets):
+ def _readv(self, relpath, offsets):
"""See Transport.readv()"""
# We overload the default readv() because we want to use a file
# that does not have prefetch enabled.
@@ -386,7 +386,7 @@
:param mode: The final mode for the file
"""
final_path = self._remote_path(relpath)
- self._put(final_path, f, mode=mode)
+ return self._put(final_path, f, mode=mode)
def _put(self, abspath, f, mode=None):
"""Helper function so both put() and copy_abspaths can reuse the code"""
@@ -397,7 +397,7 @@
try:
try:
fout.set_pipelined(True)
- self._pump(f, fout)
+ length = self._pump(f, fout)
except (IOError, paramiko.SSHException), e:
self._translate_io_exception(e, tmp_abspath)
# XXX: This doesn't truly help like we would like it to.
@@ -418,6 +418,7 @@
fout.close()
closed = True
self._rename_and_overwrite(tmp_abspath, abspath)
+ return length
except Exception, e:
# If we fail, try to clean up the temporary file
# before we throw the exception
More information about the bazaar-commits
mailing list