[Merge] Slow socket

John Arbash Meinel john at arbash-meinel.com
Mon Aug 14 18:36:16 BST 2006


Carl Friedrich Bolz wrote:
> Hi all!
> 
> The attached patch contains the slow socket wrapper that simulates
> latency and bandwith for socket based operations. It is based on some
> code by Robert and previous discussions here on the list.  There are
> also a couple of benchmarks for pushing and pulling using the socket
> wrapper.
> 
> Cheers,
> 
> Carl Friedrich
> 

Out of curiosity, what bzr did you use to generate the diff? We haven't
used '/dev/null' for a newly added file in a while.

> 
> ------------------------------------------------------------------------
> 
> === added file 'bzrlib/benchmarks/bench_sftp.py'
> --- /dev/null	
> +++ bzrlib/benchmarks/bench_sftp.py	
> @@ -0,0 +1,126 @@
> +import os
> +from bzrlib.tests.test_sftp_transport import TestCaseWithSFTPServer
> +from bzrlib.benchmarks import Benchmark
> +from bzrlib import bzrdir
> +import bzrlib.transport
> +import bzrlib.transport.http
> +from bzrlib.workingtree import WorkingTree
> +from bzrlib.tests import test_sftp_transport
> +
> +try:
> +    import paramiko
> +    paramiko_loaded = True
> +except ImportError:
> +    paramiko_loaded = False
> +
> +
> +class SFTPBenchmark(Benchmark):
> +    """A benchmark base class that provides a sftp server on localhost."""
> +
> +    def setUp(self):
> +        if not paramiko_loaded:
> +            raise TestSkipped('you must have paramiko to run this test')
> +        super(SFTPBenchmark, self).setUp()
> +        test_sftp_transport.set_transport(self) 

I *think* it is better to call super().setUp() before you raise
TestSkipped(), but I could be wrong.

> +         
> +    def create_with_commits(self, num_files, num_commits, directory_name='.'):
> +        """Create a tree with many commits.
> +        
> +        No files change are included.
> +        """
> +        files = ["%s/%s" % (directory_name, i) for i in range(num_files)]
> +        for fn in files:
> +            f = open(fn, "w")
> +            f.write("some content\n")
> +            f.close()
> +        tree = bzrdir.BzrDir.create_standalone_workingtree(directory_name)
> +        for i in range(num_files):
> +            tree.add(str(i))
> +        tree.lock_write()
> +        tree.branch.lock_write()
> +        tree.branch.repository.lock_write()

^- you should only need to lock the tree, it should unlock everything
else. (You can also fix up the place you copy/pasted, because it was a
misunderstanding of the api by Aaron) We will probably introduce an api
in the future, which only locks the working tree, etc. But for now,
compatibility says everything gets locked.

Also, your doc string is incorrect, because there *are* file changes.

> +        try:
> +            tree.commit('initial commit')
> +            for i in range(num_commits):
> +                fn = files[i % len(files)]
> +                f = open(fn, "w")
> +                f.write("\n".join([str(j) for j in (range(i) + [i, i, i])]))
> +                f.close()
> +                tree.commit("changing file %s" % fn)

^- We almost always use "wb" to ensure the contents of the file are
consistent across platforms.
I also don't understand what you are accomplishing with (range(i) +
[i,i,i]). I suppose you just want to always create a file with 'i+3' lines.

Also, what you have written will create a file with no trailing newline.
What you probably want is:

f.write('\n'.join([str(j) for j in (range(i) + [i,i,i,''])])

> +        finally:
> +            try:
> +                try:
> +                    tree.branch.repository.unlock()
> +                finally:
> +                    tree.branch.unlock()
> +            finally:
> +                tree.unlock()
> +        return tree, files

Why is this part of SFTPBenchmark, instead of Benchmark or some helper
functions?

> +
> +    def commit_some_revisions(self, tree, files, num_commits):
> +        for j in range(num_commits):
> +            for i in range(20):
> +                fn = files[i]
> +                f = open(fn, "w")
> +                f.write("\n".join([str(k) for k in (range(i) + [i, i, i])]))
> +                f.close()
> +            tree.commit("new revision")

Similarly here. I think it would be better to factor this into a
different function. Because this has a bug if there are fewer than 20
files in the list. I'm also a little surprised that you always modify
the same 20 files in each commit. This will have an effect on branch and
pull. Our latency for Pull is based on O(N files changed) because that
is how many knits we need to join. (Our bandwidth would be O(changes),
but for pull, it is important that the set of changed files increases to
get a real latency measurement).

> +
> +    def test_branch(self):
> +        os.mkdir("a")
> +        t, files = self.create_with_commits(100, 100, "a")
> +
> +        self.time(bzrdir.BzrDir.open(self.get_url('a')).sprout, "b")
> +
> +    def test_pull_1(self):
> +        os.mkdir("a")
> +        t, files = self.create_with_commits(100, 100, "a")
> +        rbzrdir = bzrdir.BzrDir.open(self.get_url('a'))
> +        b2 = rbzrdir.sprout("b") # branch
> +        # change a few files and commit
> +        self.commit_some_revisions(t, files, 1)
> +        self.time(b2.open_branch().pull, rbzrdir.open_branch())
> +        
> +    def test_pull_100(self):
> +        os.mkdir("a")
> +        t, files = self.create_with_commits(100, 100, "a")
> +        rbzrdir = bzrdir.BzrDir.open(self.get_url('a'))
> +        b2 = rbzrdir.sprout("b") # branch
> +        # change a few files and commit
> +        self.commit_some_revisions(t, files, 100)
> +        self.time(b2.open_branch().pull, rbzrdir.open_branch())
> +
> +    def create_commit_and_push(self, num_push_revisions):
> +        os.mkdir("a")
> +        t, files = self.create_with_commits(100, 100, "a")
> +        rbzrdir = bzrdir.BzrDir.open(self.get_url('a'))
> +        b2 = rbzrdir.sprout("b") # branch
> +        # change a few files and commit
> +        self.commit_some_revisions(
> +            b2.open_workingtree(), ["b/%i" for i in range(100)], 
> +            num_commits=num_push_revisions)
> +        self.time(rbzrdir.open_branch().pull, b2.open_branch())

It would probably be better to do:
wt = b2.open_workingtree()

self.commit_some_revisions(wt, ...)
self.time(rbzrdir.open_branch().pull, wt.branch)

But that is a small thing.

(open_workingtree() and open_branch() return new objects, so 2 calls to
open_branch() will return different things, and locking both will give
lock collisions)

So it is better to re-use an object if you have it. (Also the
a_branch.bzrdir.open_workingtree().branch does not yeild a_branch, it
returns a newly created branch).

> +
> +    def test_push_1(self):
> +        self.create_commit_and_push(1)
> +
> +    def test_push_10(self):
> +        self.create_commit_and_push(10)
> +
> +    def test_push_100(self):
> +        self.create_commit_and_push(100)
> +
> +
> +class SFTPSlowSocketBenchmark(SFTPBenchmark):
> +    def setUp(self):
> +        super(SFTPSlowSocketBenchmark, self).setUp()
> +        self.get_server().add_latency = 0.03
> +
> +    def time(self, *args, **kwargs):
> +        """ Time the operation as usual but add the simulated time of the
> +        SocketDelay"""
> +        from bzrlib.transport.sftp import SocketDelay
> +        start_time = SocketDelay.simulated_time
> +        super(SFTPSlowSocketBenchmark, self).time(*args, **kwargs)
> +        self._benchtime += SocketDelay.simulated_time - start_time
> +

Did we decide that simulated time was better than using real time? (For
one thing, async improvements would not show up properly here).

> 
> === modified file 'bzrlib/benchmarks/__init__.py'
> --- bzrlib/benchmarks/__init__.py	
> +++ bzrlib/benchmarks/__init__.py	
> @@ -20,6 +20,7 @@
>  from bzrlib import bzrdir, plugin
>  from bzrlib.tests.TestUtil import TestLoader
>  from bzrlib.tests.blackbox import ExternalBase
> +import bzrlib.branch
>  
>  
>  class Benchmark(ExternalBase):
> @@ -117,6 +118,7 @@
>                     'bzrlib.benchmarks.bench_status',
>                     'bzrlib.benchmarks.bench_transform',
>                     'bzrlib.benchmarks.bench_workingtree',
> +                   'bzrlib.benchmarks.bench_sftp',
>                     ]
>      suite = TestLoader().loadTestsFromModuleNames(testmod_names) 
>  
> 
> === modified file 'bzrlib/tests/test_sftp_transport.py'
> --- bzrlib/tests/test_sftp_transport.py	
> +++ bzrlib/tests/test_sftp_transport.py	
> @@ -15,8 +15,10 @@
>  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
>  
>  import os
> +from operator import lt, gt, ge
>  import socket
>  import threading
> +import time
>  
>  import bzrlib.bzrdir as bzrdir
>  import bzrlib.errors as errors
> @@ -31,6 +33,16 @@
>  except ImportError:
>      paramiko_loaded = False

-- you need an extra space here, and after this function. And you
shouldn't have the extra spaces in the doc string.

>  
> +def set_transport(testcase):
> +    """ A helper to set transports on test case instances. """
> +    from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
> +    if getattr(testcase, '_get_remote_is_absolute', None) is None:
> +        testcase._get_remote_is_absolute = True
> +    if testcase._get_remote_is_absolute:
> +        testcase.transport_server = SFTPAbsoluteServer
> +    else:
> +        testcase.transport_server = SFTPHomeDirServer
> +    testcase.transport_readonly_server = bzrlib.transport.http.HttpServer
>  
>  class TestCaseWithSFTPServer(TestCaseWithTransport):
>      """A test case base class that provides a sftp server on localhost."""
> @@ -39,14 +51,7 @@
>          if not paramiko_loaded:
>              raise TestSkipped('you must have paramiko to run this test')
>          super(TestCaseWithSFTPServer, self).setUp()
> -        from bzrlib.transport.sftp import SFTPAbsoluteServer, SFTPHomeDirServer
> -        if getattr(self, '_get_remote_is_absolute', None) is None:
> -            self._get_remote_is_absolute = True
> -        if self._get_remote_is_absolute:
> -            self.transport_server = SFTPAbsoluteServer
> -        else:
> -            self.transport_server = SFTPHomeDirServer
> -        self.transport_readonly_server = bzrlib.transport.http.HttpServer
> +        set_transport(self) 
>  
>      def get_transport(self, path=None):
>          """Return a transport relative to self._test_root."""
> @@ -342,3 +347,91 @@
>              raise TestSkipped('Known NameError bug with paramiko-1.6.1')
>          self.assertContainsRe(err, 'Connection error')
>  
> +
> +class SFTPLatencyKnob(TestCaseWithSFTPServer):
> +    """Test that the testing SFTPServer's latency knob works."""
> +
> +    def test_make_transport_slow(self):
> +        # change the latency knob to 100ms. We take about 40ms for a 
> +        # loopback connection ordinarily.
> +        self.get_server().add_latency = 0.1
> +        self.assertConnectionTime(ge)
> +
> +    def assertConnectionTime(self, operator):
> +        from bzrlib.transport.sftp import SocketDelay
> +        start_time = SocketDelay.simulated_time
> +        transport = self.get_transport()
> +        stop_time = SocketDelay.simulated_time
> +        self.failUnless(operator(stop_time - start_time, 0.1))
> +
> +    def test_default_fast(self):
> +        self.assertConnectionTime(lt)
> +

I think Martin mentioned that he didn't really like the 'ge/lt' stuff.
And I would agree.

v- The doc string is not PEP8 (extra space at the beginning, ending """
is on the same line in a multi-line comment, etc).

> +class FakeSocket(object):
> +    """ Fake socket object used to test the SocketDelay wrapper without
> +    using a real socket."""
> +
> +    def __init__(self):
> +        self._data = ""
> +
> +    def send(self, data, flags=0):
> +        self._data += data
> +        return len(data)
> +
> +    def sendall(self, data, flags=0):
> +        self._data += data
> +        return len(data)
> +
> +    def recv(self, size, flags=0):
> +        if size < len(self._data):
> +            result = self._data[:size]
> +            self._data = self._data[size:]
> +            return result
> +        else:
> +            result = self._data
> +            self._data = ""
> +            return result
> +

-- need an extra space here

> +class TestSocketDelay(TestCase):

-- and here

> +    def setUp(self):
> +        TestCase.setUp(self)
> +
> +    def test_delay(self):
> +        from bzrlib.transport.sftp import SocketDelay
> +        sending = FakeSocket()
> +        receiving = SocketDelay(sending, 0.1, bandwidth=1000000)
> +        # check that simulated time is charged only per round-trip:
> +        t1 = SocketDelay.simulated_time
> +        receiving.send("connect1")
> +        self.assertEqual(sending.recv(1024), "connect1")
> +        t2 = SocketDelay.simulated_time
> +        self.assertAlmostEqual(t2 - t1, 0.1)
> +        receiving.send("connect2")
> +        self.assertEqual(sending.recv(1024), "connect2")
> +        sending.send("hello")
> +        self.assertEqual(receiving.recv(1024), "hello")
> +        t3 = SocketDelay.simulated_time
> +        self.assertAlmostEqual(t3 - t2, 0.1)
> +        sending.send("hello")
> +        self.assertEqual(receiving.recv(1024), "hello")
> +        sending.send("hello")
> +        self.assertEqual(receiving.recv(1024), "hello")
> +        sending.send("hello")
> +        self.assertEqual(receiving.recv(1024), "hello")
> +        t4 = SocketDelay.simulated_time
> +        self.assertAlmostEqual(t4, t3)
> +
> +    def test_bandwidth(self):
> +        from bzrlib.transport.sftp import SocketDelay
> +        sending = FakeSocket()
> +        receiving = SocketDelay(sending, 0, bandwidth=8.0/(1024*1024))
> +        # check that simulated time is charged only per round-trip:
> +        t1 = SocketDelay.simulated_time
> +        receiving.send("connect")
> +        self.assertEqual(sending.recv(1024), "connect")
> +        sending.send("a" * 100)
> +        self.assertEqual(receiving.recv(1024), "a" * 100)
> +        t2 = SocketDelay.simulated_time
> +        self.assertAlmostEqual(t2 - t1, 100 + 7)
> +
> +

So is this setting the bandwidth to 8 millibytes/sec?


> 
> === modified file 'bzrlib/transport/sftp.py'
> --- bzrlib/transport/sftp.py	
> +++ bzrlib/transport/sftp.py	
> @@ -1065,6 +1065,99 @@
>                          x)
>  
>  
> +class SocketDelay(object):
> +    """A socket decorator to make TCP appear slower.
> +
> +    This changes recv, send, and sendall to add a fixed latency to each python
> +    call if a new roundtrip is detected. That is, when a recv is called and the
> +    flag new_roundtrip is set, latency is charged. Every send and send_all
> +    sets this flag.
> +
> +    In addition every send, sendall and recv sleeps a bit per character send to
> +    simulate bandwidth.
> +
> +    The function used to sleep moves a counter forwards to not make the tests
> +    slower. It could be made more clever, by adding the time that was passing
> +    between sleep calls to the simulated time too.

^- the english here isn't very clear.

> +
> +    Not all methods are implemented, this is deliberate as this class is not a
> +    replacement for the builtin sockets layer. fileno is not implemented to
> +    prevent the proxy being bypassed. 
> +    """
> +
> +    simulated_time = 0


v- You could probably do this easier with a __getattr__() that thunked
to self.sock. And explicitly blocked fileno. (My understanding is that
__getattr__() is only called when the attribute in question fails to be
found by the default object implementation)

> +
> +    def __init__(self, sock, latency, bandwidth=1.0, 
> +                 really_sleep=False):
> +        """ 
> +        :param bandwith: simulated bandwith (MegaBit)
> +        """
> +        self.sock = sock
> +        self.latency = latency
> +        self.really_sleep = really_sleep
> +        self.time_per_byte = 1 / (bandwidth / 8.0 * 1024 * 1024) 
> +        self.new_roundtrip = False
> +
> +    def sleep(self, s):
> +        if self.really_sleep:
> +            time.sleep(s)
> +        SocketDelay.simulated_time += s

Shouldn't this be:

if self.really_sleep:
  time.sleep(s)
else:
  SocketDelay.simulated_time += s


So the biggest issue is just PEP8 cleanliness. I also think we would
prefer to have a Latency adapter rather than using a subclass like you
have done.
That way you can run some test cases with and without latency, by just
running the adapter over them.

John
=:->


-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 254 bytes
Desc: OpenPGP digital signature
Url : https://lists.ubuntu.com/archives/bazaar/attachments/20060814/51f12048/attachment.pgp 


More information about the bazaar mailing list