[MERGE] HTTP smart server

John Arbash Meinel john at arbash-meinel.com
Thu Oct 5 22:41:49 BST 2006


-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Andrew Bennetts wrote:
> The attached bundle implements the "smart" protocol from HTTP.  It includes the
> client, and a testing server.  It won't actually be used by "http://" URLs yet.
> 
> The basic mechanism is that smart requests are POSTed to base_url +
> '.bzr/smart', and replies read from the body of the HTTP response.
> 
> Part of this work has been to seperate the protocol from the "medium" (I would
> have used the term transport, much like Twisted does, but this term already has
> a quite different meaning in bzrlib).  The resulting code in smart.py is
> generally cleaner and more flexible.
> 
> -Andrew.
> 

...

> === modified file BRANCH.TODO
> --- BRANCH.TODO
> +++ BRANCH.TODO
> @@ -1,3 +1,4 @@
>  # This file is for listing TODOs for branches that are being worked on.
>  # It should ALWAYS be empty in the mainline or in integration branches.
>  # 
> +

^- obviously you used BRANCH.TODO, but then finished everything that was
listed. It is usually good to just do a once-over your diff before you
submit to catch things like this.


...

> === modified file bzrlib/errors.py // last-changed:andrew.bennetts at canonical.co
> ... m-20060928040135-c2c58bbb690acc08
> --- bzrlib/errors.py
> +++ bzrlib/errors.py
> @@ -270,6 +270,14 @@
>      """Directory not empty: %(path)r%(extra)s"""
>  
>  

v- It seems that ReadingCompleted, WritingCompleted, and
WritingNotCompleted might be reasonable to encapsulate in a heirarchy,
rather than being independent. Also, and more importantly, which ones of
these are actually "user" errors that should only create a single line
error message, and which are programming bugs, which should generate a
traceback?

> +class ReadingCompleted(BzrNewError):
> +    """The MediumRequest '%(request)s' has already had finish_reading called upon it - the request has been completed and no more data may be read."""
> +
> +    def __init__(self, request):
> +        BzrNewError.__init__(self)
> +        self.request = request
> +
> +
>  class ResourceBusy(PathError):
>      """Device or resource busy: %(path)r%(extra)s"""
>  
> @@ -802,11 +810,21 @@
>  
>  class NoSuchExportFormat(BzrNewError):
>      """Export format %(format)r not supported"""
> +
>      def __init__(self, format):
>          BzrNewError.__init__(self)
>          self.format = format
>  
>  
> +
> +class TooManyConcurrentRequests(BzrNewError):
> +    """The medium '%(medium)s' has reached its concurrent request limit. Be sure to finish_writing and finish_reading on the current request that is open."""
> +
> +    def __init__(self, medium):
> +        BzrNewError.__init__(self)
> +        self.medium = medium
> +
> +
>  class TransportError(BzrNewError):
>      """Transport error: %(msg)s %(orig_error)s"""
>  
> @@ -903,6 +921,22 @@
>                            " unchanged." % tree.basedir)
>  
>  
> +class WritingCompleted(BzrNewError):
> +    """The MediumRequest '%(request)s' has already had finish_writing called upon it - accept bytes may not be called anymore."""
> +
> +    def __init__(self, request):
> +        BzrNewError.__init__(self)
> +        self.request = request
> +
> +
> +class WritingNotComplete(BzrNewError):
> +    """The MediumRequest '%(request)s' has not has finish_writing called upon it - until the write phase is complete no data may be read."""
> +
> +    def __init__(self, request):
> +        BzrNewError.__init__(self)
> +        self.request = request
> +
> +
>  class CantReprocessAndShowBase(BzrNewError):
>      """Can't reprocess and show base.
>  Reprocessing obscures relationship of conflicting lines to base."""
> @@ -923,6 +957,14 @@
>          self.filename = filename
>  
>  
> +class MediumNotConnected(BzrNewError):
> +    """The medium '%(medium)s' is not connected."""
> +
> +    def __init__(self, medium):
> +        BzrNewError.__init__(self)
> +        self.medium = medium
> +
> +
>  class MustUseDecorated(Exception):
>      """A decorating function has requested its original command be used.
>      
> @@ -1216,6 +1258,14 @@
>          BadInventoryFormat.__init__(self, msg=msg)
>  
>  
> +class NoSmartMedium(BzrNewError):
> +    """The transport '%(transport)s' cannot tunnel the smart protocol."""
> +
> +    def __init__(self, transport):
> +        BzrNewError.__init__(self)
> +        self.transport = transport
> +
> +
>  class NoSmartServer(NotBranchError):
>      """No smart server available at %(url)s"""

...


v- Your default '_expect_body_tail' is None, but then later on you use a
plain:

  while not self.received_bytes.endswith(self._expect_body_tail):

So if _expect_body_tail is None, this will fail with a weird error.

This seems to mean that _expect_body_tail should either be strictly
required, or default to '' or some other string object.

It might even be reasonable to have 'assert isinstance(expect_body_tail,
str)', since you explicitly expect that later...

> +
> +    def __init__(self, expect_body_tail=None):
> +        self._expect_body_tail = expect_body_tail
> +        self.host = None
> +        self.port = None
> +        self.received_bytes = ''
> +
> +    def setUp(self):
> +        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> +        self._sock.bind(('127.0.0.1', 0))
> +        self.host, self.port = self._sock.getsockname()
> +        self._ready = threading.Event()
> +        self._thread = threading.Thread(target=self._accept_read_and_reply)
> +        self._thread.setDaemon(True)
> +        self._thread.start()
> +        self._ready.wait(5)
> +
> +    def _accept_read_and_reply(self):
> +        self._sock.listen(1)
> +        self._ready.set()
> +        self._sock.settimeout(5)
> +        try:
> +            conn, address = self._sock.accept()
> +            # On win32, the accepted connection will be non-blocking to start
> +            # with because we're using settimeout.
> +            conn.setblocking(True)
> +            while not self.received_bytes.endswith(self._expect_body_tail):
> +                self.received_bytes += conn.recv(4096)
> +            conn.sendall('HTTP/1.1 200 OK\r\n')
> +        except socket.timeout:
> +            # Make sure the client isn't stuck waiting for us to e.g. accept.
> +            self._sock.close()
> +
> +    def tearDown(self):
> +        try:
> +            self._sock.close()
> +        except socket.error:
> +            # We might have already closed it.  We don't care.
> +            pass
> +        self.host = None
> +        self.port = None
> +
> +
>  class TestHttpUrls(TestCase):
>  
>      def test_url_parsing(self):
> @@ -125,6 +176,13 @@
>          self.assertTrue(server.logs[0].find(
>              '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s' % bzrlib.__version__) > -1)
>  
> +    def test_get_smart_medium(self):
> +        # For HTTP, get_smart_medium should return the transport object.
> +        server = self.get_readonly_server()
> +        http_transport = self._transport(server.get_url())
> +        medium = http_transport.get_smart_medium()
> +        self.assertTrue(medium is http_transport)
> +        

^- We have a helper function 'assertIsInstance' or 'assertIs', which
should be better than assertTrue(... is ...)

...

> +        # Doing a disconnect on a new (and thus unconnected) SSH medium
> +        # does nothing.
> +        medium = smart.SmartSSHClientMedium(None)
> +        medium.disconnect()

^- Is this actually testing that it does nothing, or just that it
doesn't fail?

> +
> +    def test_ssh_client_raises_on_read_when_not_connected(self):
> +        # Doing a read on a new (and thus unconnected) SSH medium raises
> +        # MediumNotConnected.
> +        medium = smart.SmartSSHClientMedium(None)
> +        self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 0)
> +        self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 1)
> +

^- Are we wanting to assert that read_bytes(0) raises, or is that just
an implementation detail?

> +    def test_ssh_client_supports__flush(self):
> +        # invoking _flush on a SSHClientMedium should flush the output 
> +        # pipe. We test this by creating an output pipe that records
> +        # flush calls made to it.
> +        from StringIO import StringIO # get regular StringIO
> +        input = StringIO()
> +        output = StringIO()
> +        flush_calls = []
> +        def _flush(): flush_calls.append('flush')
> +        output.flush = _flush

^- this seems a little bit evil, but not too bad. I would have thought
you need 'def _flush(self): ...', but you don't because the functions
should already be bound? Also, there is some confusion about using
output.flush = _flush, and then calling medium._flush().
Mabye calling it 'def logging_flush()'

> +        vendor = StringIOSSHVendor(input, output)
> +        medium = smart.SmartSSHClientMedium('a hostname', vendor=vendor)
> +        # this call is here to ensure we only flush once, not on every
> +        # _accept_bytes call.
> +        medium._accept_bytes('abc')
> +        medium._flush()
> +        medium.disconnect()
> +        self.assertEqual(['flush'], flush_calls)
> +        
> +    def test_construct_smart_tcp_client_medium(self):
> +        # the TCP client medium takes a host and a port.  Constructing it won't
> +        # connect to anything.
> +        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> +        sock.bind(('127.0.0.1', 0))
> +        unopen_port = sock.getsockname()[1]
> +        medium = smart.SmartTCPClientMedium('127.0.0.1', unopen_port)
> +        sock.close()

^- s/unopen_port/not_open_port/

> +
> +    def test_tcp_client_connects_on_first_use(self):
> +        # The only thing that initiates a connection from the medium is giving
> +        # it bytes.
> +        sock, medium = self.make_loopsocket_and_medium()
> +        bytes = []
> +        t = self.receive_bytes_on_server(sock, bytes)
> +        medium._accept_bytes('abc')
> +        t.join()
> +        sock.close()
> +        self.assertEqual(['abc'], bytes)
> +    
> +    def test_tcp_client_disconnect_does_so(self):
> +        # calling disconnect on the client terminates the connection.
> +        # we test this by forcing a short read during a socket.MSG_WAITALL
> +        # call : write 2 bytes, try to read 3, and then the client disconnects.
> +        sock, medium = self.make_loopsocket_and_medium()
> +        bytes = []
> +        t = self.receive_bytes_on_server(sock, bytes)
> +        medium._accept_bytes('ab')
> +        medium.disconnect()
> +        t.join()
> +        sock.close()
> +        self.assertEqual(['ab'], bytes)
> +        # now disconnect again : this should not do anything, if disconnection
> +        # really did disconnect.
> +        medium.disconnect()

^- Why is _accept_bytes() the function which sends bytes to the other
side? Is it possible to do something like this with public functions only?

And is there a reason you need to disconnect a second time? You make a
comment that 'it should not do anything', but I don't see how you are
testing that disconnect() is a no-op, versus it being an actual
disconnect. (You have no return value, or visible side-effects)

It seems like it would be a bug for the second disconnect to be needed.

v- These seem to be begging for an interface test, since you want to
test that all 'Medium' implementations support this. (How many Medium
implementations do you expect?)
I can see that there are small differences, but is it possible to factor
that out?

> +    
> +    def test_tcp_client_ignores_disconnect_when_not_connected(self):
> +        # Doing a disconnect on a new (and thus unconnected) TCP medium
> +        # does nothing.
> +        medium = smart.SmartTCPClientMedium(None, None)
> +        medium.disconnect()
> +
> +    def test_tcp_client_raises_on_read_when_not_connected(self):
> +        # Doing a read on a new (and thus unconnected) TCP medium raises
> +        # MediumNotConnected.
> +        medium = smart.SmartTCPClientMedium(None, None)
> +        self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 0)
> +        self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 1)
> +
> +    def test_tcp_client_supports__flush(self):
> +        # invoking _flush on a TCPClientMedium should do something useful.
> +        # RBC 20060922 not sure how to test/tell in this case.
> +        sock, medium = self.make_loopsocket_and_medium()
> +        bytes = []
> +        t = self.receive_bytes_on_server(sock, bytes)
> +        # try with nothing buffered
> +        medium._flush()
> +        medium._accept_bytes('ab')
> +        # and with something sent.
> +        medium._flush()
> +        medium.disconnect()
> +        t.join()
> +        sock.close()
> +        self.assertEqual(['ab'], bytes)
> +        # now disconnect again : this should not do anything, if disconnection
> +        # really did disconnect.
> +        medium.disconnect()
> +
> +
> +class TestSmartClientStreamMediumRequest(tests.TestCase):
> +    """Tests the for SmartClientStreamMediumRequest.
> +    
> +    SmartClientStreamMediumRequest is a helper for the three stream based 
> +    mediums: TCP, SSH, SimplePipes, so we only test it once, and then test that
> +    those three mediums implement the interface it expects.
> +    """
> +
> +    def test_accept_bytes_after_finished_writing_errors(self):
> +        # calling accept_bytes after calling finished_writing raises 
> +        # WritingCompleted to prevent bad assumptions on stream environments
> +        # breaking the needs of message-based environments.
> +        output = StringIO()
> +        medium = smart.SmartSimplePipesClientMedium(None, output)
> +        request = smart.SmartClientStreamMediumRequest(medium)
> +        request.finished_writing()
> +        self.assertRaises(errors.WritingCompleted, request.accept_bytes, None)
> +
> +    def test_accept_bytes(self):
> +        # accept bytes should invoke _accept_bytes on the stream medium.
> +        # we test this by using the SimplePipes medium - the most trivial one
> +        # and checking that the pipes get the data.
> +        input = StringIO()
> +        output = StringIO()
> +        medium = smart.SmartSimplePipesClientMedium(input, output)
> +        request = smart.SmartClientStreamMediumRequest(medium)
> +        request.accept_bytes('123')
> +        request.finished_writing()
> +        request.finished_reading()
> +        self.assertEqual('', input.getvalue())
> +        self.assertEqual('123', output.getvalue())

'accept_bytes()' really sounds to me like you are pulling new bytes to
yourself, not pushing bytes to other.


> +
> +    def test_construct_sets_stream_request(self):
> +        # constructing a SmartClientStreamMediumRequest on a StreamMedium sets
> +        # the current request to the new SmartClientStreamMediumRequest
> +        output = StringIO()
> +        medium = smart.SmartSimplePipesClientMedium(None, output)
> +        request = smart.SmartClientStreamMediumRequest(medium)
> +        self.assertTrue(medium._current_request is request)

^- assertIs

> +
> +    def test_construct_while_another_request_active_throws(self):
> +        # constructing a SmartClientStreamMediumRequest on a StreamMedium with
> +        # a non-None _current_request raises TooManyConcurrentRequests.
> +        output = StringIO()
> +        medium = smart.SmartSimplePipesClientMedium(None, output)
> +        medium._current_request = "a"
> +        self.assertRaises(errors.TooManyConcurrentRequests,
> +            smart.SmartClientStreamMediumRequest, medium)

^- Can't you just construct 2 requests? It doesn't seem like you want to
really assert the behavior based on a private member (_current_request)

...
...
v- Are the buffers meant to be public?

> +
> +    def test_construct_version_one_server_protocol(self):
> +        protocol = smart.SmartServerRequestProtocolOne(None, None)
> +        self.assertEqual('', protocol.excess_buffer)
> +        self.assertEqual('', protocol.in_buffer)
> +        self.assertFalse(protocol.has_dispatched)
> +        # Once refactoring is complete, we don't need these assertions
> +        self.assertFalse(hasattr(protocol, '_in'))
> +        self.assertFalse(hasattr(protocol, '_out'))
> +        self.assertEqual(1, protocol.next_read_size())

^- ***avoid the evil hasattr()***
Perhaps:
marker = object()
self.assertIs(marker, getattr(protocol, '_in', marker),
	      "Protocol should not have a member '_in'")

> +
> +    def test_construct_version_one_client_protocol(self):
> +        # we can construct a client protocol from a client medium request
> +        output = StringIO()
> +        medium = smart.SmartSimplePipesClientMedium(None, output)
> +        request = medium.get_request()
> +        client_protocol = smart.SmartClientRequestProtocolOne(request)
> +
>      def test_server_offset_serialisation(self):
>          """The Smart protocol serialises offsets as a comma and \n string.

...

> +    def _test_bulk_data(self, url_protocol):
> +        # We should be able to send and receive bulk data in a single message.
> +        # The 'readv' command in the smart protocol both sends and receives bulk
> +        # data, so we use that.
> +        self.build_tree(['data-file'])
> +        http_server = HTTPServerWithSmarts()
> +        http_server._url_protocol = url_protocol
> +        http_server.setUp()
> +        self.addCleanup(http_server.tearDown)
> +
> +        http_transport = get_transport(http_server.get_url())
> +
> +        medium = http_transport.get_smart_medium()
> +        #remote_transport = RemoteTransport('fake_url', medium)
> +        remote_transport = smart.SmartTransport('/', medium=medium)
> +        self.assertEqual(
> +            [(0, "c")], list(remote_transport.readv("data-file", [(0,1)])))
> +
> +    def test_bulk_data_pycurl(self):
> +        self._test_bulk_data('http+pycurl')
> +    
> +    def test_bulk_data_urllib(self):
> +        self._test_bulk_data('http+urllib')

^- what if pycurl is not installed?

...

...

Why does Post need its own curl object? Vincent Ladeuil had worked out
how to get _base and _range to share a curl object (just manually set
the Ranges header).

> === modified file bzrlib/transport/http/_pycurl.py // last-changed:andrew.benne
> ... tts at canonical.com-20060926054631-7d63ee5c801fb098
> --- bzrlib/transport/http/_pycurl.py
> +++ bzrlib/transport/http/_pycurl.py
> @@ -77,10 +77,12 @@
>          if from_transport is not None:
>              self._base_curl = from_transport._base_curl
>              self._range_curl = from_transport._range_curl
> +            self._post_curl = from_transport._post_curl
>          else:
>              mutter('using pycurl %s' % pycurl.version)
>              self._base_curl = pycurl.Curl()
>              self._range_curl = pycurl.Curl()
> +            self._post_curl = pycurl.Curl()
>  

v- There are quite a few typos here, can you clean it up so it is easier
to read?

> +
> +At the bottom level socket, pipes, HTTP server.  For sockets, we have the
> +idea that you have multiple requests and get have a read error because the
                           vv               ^^^^^^^^
> +other side did shutdown sd send.  For pipes we have read pipe which will have a
> +zero read which marks end-of-file.  For HTTP server environment there is not
> +end-of-stream because each request coming into the server is independent.
> +
                                                                 vvvvvvv
> +So we need a wrapper around pipes and sockets to seperate out reqeusts from
> +substrate and this will give us a single model which is consist for HTTP,
> +sockets and pipes.
> +
> +Server-side
> +-----------
> +
> + MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
> +          uses protocol to detect end-of-request, sends written
> +          bytes to client) e.g. socket, pipe, HTTP request handler.
> +  ^
> +  | bytes.
> +  v
> +

v- Martin has commented that we should be using American spelling which
means:
serialization and deserialization.

> +PROTOCOL  (serialisation, deserialisation)  accepts bytes for one
> +          request, decodes according to internal state, pushes
> +          structured data to handler.  accepts structured data from
> +          handler and encodes and writes to the medium.  factory for
> +          handler.
> +  ^
> +  | structured data
> +  v
> +
> +HANDLER   (domain logic) accepts structured data, operates state
> +          machine until the request can be satisfied,
> +          sends structured data to the protocol.
> +
> +
> +Client-side
> +-----------
> +
> + CLIENT             domain logic, accepts domain requests, generated structured
> +                    data, reads structured data from responses and turns into
> +                    domain data.  Sends structured data to the protocol.
> +                    Operates state machines until the request can be delivered
> +                    (e.g. reading from a bundle generated in bzrlib to deliver a
> +                    complete request).
> +
> +                    Possibly this should just be RemoteBzrDir, RemoteTransport,
> +                    ...
> +  ^
> +  | structured data
> +  v
> +
> +PROTOCOL  (serialisation, deserialisation)  accepts structured data for one
> +          request, encodes and writes to the medium.  Reads bytes from the
> +          medium, decodes and allows the client to read structured data.
> +  ^
> +  | bytes.
> +  v
> +
> + MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
> +          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
>  """
>  
>  
> @@ -137,14 +197,13 @@
>  #       the socket, and it assumes everything is UTF8 sections separated
>  #       by \001. Which means a request like '\002' Will abort the connection
>  #       because of a UnicodeDecodeError. It does look like invalid data will
> -#       kill the SmartStreamServer, but only with an abort + exception, and 
> +#       kill the SmartServerStreamMedium, but only with an abort + exception, and 
>  #       the overall server shouldn't die.

^- Shouldn't this kill the Protocol layer, not the Medium layer?

...

Admittedly, I didn't go over the rest as closely, but I think the rest
looks okay.

And I sort-of understand why you use 'accept_bytes', but I can say that
it wasn't directly obvious when I read it. Maybe 'process_bytes()' is
better, or maybe I was just reading it weird and not noticing that you
were telling the server process to 'accept_bytes' rather than the client
process.

It also isn't clear why you have the 'get_smart_client()' and
'get_smart_medium()' calls, when they all need to be passed into a
'SmartTransport' object. Doesn't it make sense to have
Transport.get_smart_transport() or something like that?

Maybe I misunderstood the layering.

John
=:->



-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.5 (Darwin)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org

iD8DBQFFJXwcJdeBCYSNAAMRArqsAJ9GUgOEf4FVF3o09CygWwlz13xZoACgxr76
CrQe9omPbW/ZGE/tKuZzw9w=
=iTW7
-----END PGP SIGNATURE-----





More information about the bazaar mailing list