[MERGE] HTTP smart server
John Arbash Meinel
john at arbash-meinel.com
Thu Oct 5 22:41:49 BST 2006
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1
Andrew Bennetts wrote:
> The attached bundle implements the "smart" protocol from HTTP. It includes the
> client, and a testing server. It won't actually be used by "http://" URLs yet.
>
> The basic mechanism is that smart requests are POSTed to base_url +
> '.bzr/smart', and replies read from the body of the HTTP response.
>
> Part of this work has been to separate the protocol from the "medium" (I would
> have used the term transport, much like Twisted does, but this term already has
> a quite different meaning in bzrlib). The resulting code in smart.py is
> generally cleaner and more flexible.
>
> -Andrew.
>
...
> === modified file BRANCH.TODO
> --- BRANCH.TODO
> +++ BRANCH.TODO
> @@ -1,3 +1,4 @@
> # This file is for listing TODOs for branches that are being worked on.
> # It should ALWAYS be empty in the mainline or in integration branches.
> #
> +
^- obviously you used BRANCH.TODO, but then finished everything that was
listed. It is usually good to do a once-over of your diff before you
submit, to catch things like this.
...
> === modified file bzrlib/errors.py // last-changed:andrew.bennetts at canonical.co
> ... m-20060928040135-c2c58bbb690acc08
> --- bzrlib/errors.py
> +++ bzrlib/errors.py
> @@ -270,6 +270,14 @@
> """Directory not empty: %(path)r%(extra)s"""
>
>
v- It seems that ReadingCompleted, WritingCompleted, and
WritingNotComplete might reasonably be encapsulated in a hierarchy,
rather than being independent. Also, and more importantly, which of
these are actually "user" errors that should produce only a single-line
error message, and which are programming bugs, which should generate a
traceback?
> +class ReadingCompleted(BzrNewError):
> + """The MediumRequest '%(request)s' has already had finish_reading called upon it - the request has been completed and no more data may be read."""
> +
> + def __init__(self, request):
> + BzrNewError.__init__(self)
> + self.request = request
> +
> +
> class ResourceBusy(PathError):
> """Device or resource busy: %(path)r%(extra)s"""
>
> @@ -802,11 +810,21 @@
>
> class NoSuchExportFormat(BzrNewError):
> """Export format %(format)r not supported"""
> +
> def __init__(self, format):
> BzrNewError.__init__(self)
> self.format = format
>
>
> +
> +class TooManyConcurrentRequests(BzrNewError):
> + """The medium '%(medium)s' has reached its concurrent request limit. Be sure to finish_writing and finish_reading on the current request that is open."""
> +
> + def __init__(self, medium):
> + BzrNewError.__init__(self)
> + self.medium = medium
> +
> +
> class TransportError(BzrNewError):
> """Transport error: %(msg)s %(orig_error)s"""
>
> @@ -903,6 +921,22 @@
> " unchanged." % tree.basedir)
>
>
> +class WritingCompleted(BzrNewError):
> + """The MediumRequest '%(request)s' has already had finish_writing called upon it - accept bytes may not be called anymore."""
> +
> + def __init__(self, request):
> + BzrNewError.__init__(self)
> + self.request = request
> +
> +
> +class WritingNotComplete(BzrNewError):
> + """The MediumRequest '%(request)s' has not has finish_writing called upon it - until the write phase is complete no data may be read."""
> +
> + def __init__(self, request):
> + BzrNewError.__init__(self)
> + self.request = request
> +
> +
> class CantReprocessAndShowBase(BzrNewError):
> """Can't reprocess and show base.
> Reprocessing obscures relationship of conflicting lines to base."""
> @@ -923,6 +957,14 @@
> self.filename = filename
>
>
> +class MediumNotConnected(BzrNewError):
> + """The medium '%(medium)s' is not connected."""
> +
> + def __init__(self, medium):
> + BzrNewError.__init__(self)
> + self.medium = medium
> +
> +
> class MustUseDecorated(Exception):
> """A decorating function has requested its original command be used.
>
> @@ -1216,6 +1258,14 @@
> BadInventoryFormat.__init__(self, msg=msg)
>
>
> +class NoSmartMedium(BzrNewError):
> + """The transport '%(transport)s' cannot tunnel the smart protocol."""
> +
> + def __init__(self, transport):
> + BzrNewError.__init__(self)
> + self.transport = transport
> +
> +
> class NoSmartServer(NotBranchError):
> """No smart server available at %(url)s"""
...
v- Your default '_expect_body_tail' is None, but then later on you use a
plain:
while not self.received_bytes.endswith(self._expect_body_tail):
So if _expect_body_tail is None, this will fail with a confusing
TypeError. This seems to mean that _expect_body_tail should either be
strictly required, or default to '' or some other string object.
It might even be reasonable to have 'assert isinstance(expect_body_tail,
str)', since you explicitly expect that later...
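Something like this simplified stand-in is what I mean (the class and
method names here are made up for illustration, not the real ones):

```python
class FakeSmartServer(object):
    """Simplified stand-in for the test server in the patch."""

    def __init__(self, expect_body_tail=''):
        # Failing fast here replaces the confusing TypeError that
        # endswith(None) would otherwise raise deep in the read loop.
        assert isinstance(expect_body_tail, str), \
            'expect_body_tail must be a str, not %r' % (expect_body_tail,)
        self._expect_body_tail = expect_body_tail
        self.received_bytes = ''

    def _finished_reading(self):
        # With a '' default this is simply True from the start, which
        # is at least well-defined behaviour.
        return self.received_bytes.endswith(self._expect_body_tail)
```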
> +
> + def __init__(self, expect_body_tail=None):
> + self._expect_body_tail = expect_body_tail
> + self.host = None
> + self.port = None
> + self.received_bytes = ''
> +
> + def setUp(self):
> + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> + self._sock.bind(('127.0.0.1', 0))
> + self.host, self.port = self._sock.getsockname()
> + self._ready = threading.Event()
> + self._thread = threading.Thread(target=self._accept_read_and_reply)
> + self._thread.setDaemon(True)
> + self._thread.start()
> + self._ready.wait(5)
> +
> + def _accept_read_and_reply(self):
> + self._sock.listen(1)
> + self._ready.set()
> + self._sock.settimeout(5)
> + try:
> + conn, address = self._sock.accept()
> + # On win32, the accepted connection will be non-blocking to start
> + # with because we're using settimeout.
> + conn.setblocking(True)
> + while not self.received_bytes.endswith(self._expect_body_tail):
> + self.received_bytes += conn.recv(4096)
> + conn.sendall('HTTP/1.1 200 OK\r\n')
> + except socket.timeout:
> + # Make sure the client isn't stuck waiting for us to e.g. accept.
> + self._sock.close()
> +
> + def tearDown(self):
> + try:
> + self._sock.close()
> + except socket.error:
> + # We might have already closed it. We don't care.
> + pass
> + self.host = None
> + self.port = None
> +
> +
> class TestHttpUrls(TestCase):
>
> def test_url_parsing(self):
> @@ -125,6 +176,13 @@
> self.assertTrue(server.logs[0].find(
> '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s' % bzrlib.__version__) > -1)
>
> + def test_get_smart_medium(self):
> + # For HTTP, get_smart_medium should return the transport object.
> + server = self.get_readonly_server()
> + http_transport = self._transport(server.get_url())
> + medium = http_transport.get_smart_medium()
> + self.assertTrue(medium is http_transport)
> +
^- We have helper functions 'assertIsInstance' and 'assertIs', which
would be better than assertTrue(... is ...)
...
> + # Doing a disconnect on a new (and thus unconnected) SSH medium
> + # does nothing.
> + medium = smart.SmartSSHClientMedium(None)
> + medium.disconnect()
^- Is this actually testing that it does nothing, or just that it
doesn't fail?
> +
> + def test_ssh_client_raises_on_read_when_not_connected(self):
> + # Doing a read on a new (and thus unconnected) SSH medium raises
> + # MediumNotConnected.
> + medium = smart.SmartSSHClientMedium(None)
> + self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 0)
> + self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 1)
> +
^- Are we wanting to assert that read_bytes(0) raises, or is that just
an implementation detail?
> + def test_ssh_client_supports__flush(self):
> + # invoking _flush on a SSHClientMedium should flush the output
> + # pipe. We test this by creating an output pipe that records
> + # flush calls made to it.
> + from StringIO import StringIO # get regular StringIO
> + input = StringIO()
> + output = StringIO()
> + flush_calls = []
> + def _flush(): flush_calls.append('flush')
> + output.flush = _flush
^- this seems a little bit evil, but not too bad. I would have thought
you would need 'def _flush(self): ...', but you don't, because a function
assigned directly to the instance is looked up in the instance dict and
never becomes a bound method. Also, there is some confusion between
assigning output.flush = _flush and then calling medium._flush().
Maybe call it 'def logging_flush()'
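To show what I mean about the binding, here is a self-contained version of
the idiom with the clearer name (FakePipe is a made-up stand-in for the
output pipe):

```python
class FakePipe(object):
    """Minimal stand-in for the output pipe being patched."""

    def write(self, data):
        pass

    def flush(self):
        pass


flush_calls = []

def logging_flush():
    # No 'self' parameter: because the function is assigned to the
    # *instance* attribute, attribute lookup finds it in the instance
    # dict and calls it as a plain function, never as a bound method.
    flush_calls.append('flush')

output = FakePipe()
output.flush = logging_flush
output.flush()
```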
> + vendor = StringIOSSHVendor(input, output)
> + medium = smart.SmartSSHClientMedium('a hostname', vendor=vendor)
> + # this call is here to ensure we only flush once, not on every
> + # _accept_bytes call.
> + medium._accept_bytes('abc')
> + medium._flush()
> + medium.disconnect()
> + self.assertEqual(['flush'], flush_calls)
> +
> + def test_construct_smart_tcp_client_medium(self):
> + # the TCP client medium takes a host and a port. Constructing it won't
> + # connect to anything.
> + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> + sock.bind(('127.0.0.1', 0))
> + unopen_port = sock.getsockname()[1]
> + medium = smart.SmartTCPClientMedium('127.0.0.1', unopen_port)
> + sock.close()
^- s/unopen_port/not_open_port/
> +
> + def test_tcp_client_connects_on_first_use(self):
> + # The only thing that initiates a connection from the medium is giving
> + # it bytes.
> + sock, medium = self.make_loopsocket_and_medium()
> + bytes = []
> + t = self.receive_bytes_on_server(sock, bytes)
> + medium._accept_bytes('abc')
> + t.join()
> + sock.close()
> + self.assertEqual(['abc'], bytes)
> +
> + def test_tcp_client_disconnect_does_so(self):
> + # calling disconnect on the client terminates the connection.
> + # we test this by forcing a short read during a socket.MSG_WAITALL
> + # call : write 2 bytes, try to read 3, and then the client disconnects.
> + sock, medium = self.make_loopsocket_and_medium()
> + bytes = []
> + t = self.receive_bytes_on_server(sock, bytes)
> + medium._accept_bytes('ab')
> + medium.disconnect()
> + t.join()
> + sock.close()
> + self.assertEqual(['ab'], bytes)
> + # now disconnect again : this should not do anything, if disconnection
> + # really did disconnect.
> + medium.disconnect()
^- Why is _accept_bytes() the function which sends bytes to the other
side? Is it possible to do something like this with public functions only?
And is there a reason you need to disconnect a second time? You comment
that 'it should not do anything', but I don't see how you are testing
that disconnect() is a no-op, versus it being an actual disconnect. (You
have no return value, or visible side-effects.)
It seems like it would be a bug for the second disconnect to be needed.
v- These seem to be begging for an interface test, since you want to
test that all 'Medium' implementations support this. (How many Medium
implementations do you expect?)
I can see that there are small differences, but is it possible to factor
them out?
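A sketch of how such an interface test could be factored, using a plain
unittest mixin (the medium and test names here are illustrative, not
bzrlib's own test infrastructure):

```python
import unittest


class MediumInterfaceTests(object):
    """Contract tests mixed into one TestCase per medium implementation.

    Each concrete subclass supplies make_medium(); the behaviour below
    is then asserted once for every implementation, instead of being
    copied for TCP, SSH and pipes separately.
    """

    def test_disconnect_when_not_connected_does_nothing(self):
        medium = self.make_medium()
        medium.disconnect()  # must not raise
        medium.disconnect()  # and must stay a no-op when repeated


class FakeMedium(object):
    """Trivial medium standing in for the TCP/SSH/pipe implementations."""

    def disconnect(self):
        pass


class TestFakeMedium(MediumInterfaceTests, unittest.TestCase):

    def make_medium(self):
        return FakeMedium()
```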
> +
> + def test_tcp_client_ignores_disconnect_when_not_connected(self):
> + # Doing a disconnect on a new (and thus unconnected) TCP medium
> + # does nothing.
> + medium = smart.SmartTCPClientMedium(None, None)
> + medium.disconnect()
> +
> + def test_tcp_client_raises_on_read_when_not_connected(self):
> + # Doing a read on a new (and thus unconnected) TCP medium raises
> + # MediumNotConnected.
> + medium = smart.SmartTCPClientMedium(None, None)
> + self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 0)
> + self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 1)
> +
> + def test_tcp_client_supports__flush(self):
> + # invoking _flush on a TCPClientMedium should do something useful.
> + # RBC 20060922 not sure how to test/tell in this case.
> + sock, medium = self.make_loopsocket_and_medium()
> + bytes = []
> + t = self.receive_bytes_on_server(sock, bytes)
> + # try with nothing buffered
> + medium._flush()
> + medium._accept_bytes('ab')
> + # and with something sent.
> + medium._flush()
> + medium.disconnect()
> + t.join()
> + sock.close()
> + self.assertEqual(['ab'], bytes)
> + # now disconnect again : this should not do anything, if disconnection
> + # really did disconnect.
> + medium.disconnect()
> +
> +
> +class TestSmartClientStreamMediumRequest(tests.TestCase):
> + """Tests the for SmartClientStreamMediumRequest.
> +
> + SmartClientStreamMediumRequest is a helper for the three stream based
> + mediums: TCP, SSH, SimplePipes, so we only test it once, and then test that
> + those three mediums implement the interface it expects.
> + """
> +
> + def test_accept_bytes_after_finished_writing_errors(self):
> + # calling accept_bytes after calling finished_writing raises
> + # WritingCompleted to prevent bad assumptions on stream environments
> + # breaking the needs of message-based environments.
> + output = StringIO()
> + medium = smart.SmartSimplePipesClientMedium(None, output)
> + request = smart.SmartClientStreamMediumRequest(medium)
> + request.finished_writing()
> + self.assertRaises(errors.WritingCompleted, request.accept_bytes, None)
> +
> + def test_accept_bytes(self):
> + # accept bytes should invoke _accept_bytes on the stream medium.
> + # we test this by using the SimplePipes medium - the most trivial one
> + # and checking that the pipes get the data.
> + input = StringIO()
> + output = StringIO()
> + medium = smart.SmartSimplePipesClientMedium(input, output)
> + request = smart.SmartClientStreamMediumRequest(medium)
> + request.accept_bytes('123')
> + request.finished_writing()
> + request.finished_reading()
> + self.assertEqual('', input.getvalue())
> + self.assertEqual('123', output.getvalue())
'accept_bytes()' really sounds to me like you are pulling new bytes to
yourself, not pushing bytes to the other side.
> +
> + def test_construct_sets_stream_request(self):
> + # constructing a SmartClientStreamMediumRequest on a StreamMedium sets
> + # the current request to the new SmartClientStreamMediumRequest
> + output = StringIO()
> + medium = smart.SmartSimplePipesClientMedium(None, output)
> + request = smart.SmartClientStreamMediumRequest(medium)
> + self.assertTrue(medium._current_request is request)
^- assertIs
> +
> + def test_construct_while_another_request_active_throws(self):
> + # constructing a SmartClientStreamMediumRequest on a StreamMedium with
> + # a non-None _current_request raises TooManyConcurrentRequests.
> + output = StringIO()
> + medium = smart.SmartSimplePipesClientMedium(None, output)
> + medium._current_request = "a"
> + self.assertRaises(errors.TooManyConcurrentRequests,
> + smart.SmartClientStreamMediumRequest, medium)
^- Can't you just construct 2 requests? It doesn't seem like you really
want to assert the behavior based on a private member (_current_request)
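For illustration, a stripped-down model of the same rule, tested purely
through the public constructor (the class names are stand-ins, not the
real ones):

```python
class TooManyConcurrentRequests(Exception):
    pass


class FakeStreamMedium(object):
    """Stand-in medium with the same one-request-at-a-time rule."""

    def __init__(self):
        self._current_request = None


class FakeStreamMediumRequest(object):

    def __init__(self, medium):
        if medium._current_request is not None:
            raise TooManyConcurrentRequests()
        medium._current_request = self


medium = FakeStreamMedium()
# The first request succeeds and stays active...
first_request = FakeStreamMediumRequest(medium)
# ...so the second construction is refused - asserted without the test
# ever touching the private _current_request attribute itself.
try:
    FakeStreamMediumRequest(medium)
    raised = False
except TooManyConcurrentRequests:
    raised = True
```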
...
...
v- Are the buffers meant to be public?
> +
> + def test_construct_version_one_server_protocol(self):
> + protocol = smart.SmartServerRequestProtocolOne(None, None)
> + self.assertEqual('', protocol.excess_buffer)
> + self.assertEqual('', protocol.in_buffer)
> + self.assertFalse(protocol.has_dispatched)
> + # Once refactoring is complete, we don't need these assertions
> + self.assertFalse(hasattr(protocol, '_in'))
> + self.assertFalse(hasattr(protocol, '_out'))
> + self.assertEqual(1, protocol.next_read_size())
^- ***avoid the evil hasattr()***
Perhaps:
marker = object()
self.assertIs(marker, getattr(protocol, '_in', marker),
"Protocol should not have a member '_in'")
> +
> + def test_construct_version_one_client_protocol(self):
> + # we can construct a client protocol from a client medium request
> + output = StringIO()
> + medium = smart.SmartSimplePipesClientMedium(None, output)
> + request = medium.get_request()
> + client_protocol = smart.SmartClientRequestProtocolOne(request)
> +
> def test_server_offset_serialisation(self):
> """The Smart protocol serialises offsets as a comma and \n string.
...
> + def _test_bulk_data(self, url_protocol):
> + # We should be able to send and receive bulk data in a single message.
> + # The 'readv' command in the smart protocol both sends and receives bulk
> + # data, so we use that.
> + self.build_tree(['data-file'])
> + http_server = HTTPServerWithSmarts()
> + http_server._url_protocol = url_protocol
> + http_server.setUp()
> + self.addCleanup(http_server.tearDown)
> +
> + http_transport = get_transport(http_server.get_url())
> +
> + medium = http_transport.get_smart_medium()
> + #remote_transport = RemoteTransport('fake_url', medium)
> + remote_transport = smart.SmartTransport('/', medium=medium)
> + self.assertEqual(
> + [(0, "c")], list(remote_transport.readv("data-file", [(0,1)])))
> +
> + def test_bulk_data_pycurl(self):
> + self._test_bulk_data('http+pycurl')
> +
> + def test_bulk_data_urllib(self):
> + self._test_bulk_data('http+urllib')
^- what if pycurl is not installed?
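A guard along these lines would keep the pycurl test from erroring on
machines without the module (sketched here with stdlib unittest's skipTest
for self-containment; bzrlib itself would raise TestSkipped, and the
test bodies are stubbed):

```python
import unittest


class BulkDataTests(unittest.TestCase):

    def _test_bulk_data(self, url_protocol):
        pass  # stand-in for the real readv round-trip over HTTP

    def test_bulk_data_pycurl(self):
        # Skip, rather than error, when the optional dependency is
        # missing.
        try:
            import pycurl
        except ImportError:
            self.skipTest('pycurl not installed')
        self._test_bulk_data('http+pycurl')

    def test_bulk_data_urllib(self):
        self._test_bulk_data('http+urllib')
```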
...
...
Why does POST need its own curl object? Vincent Ladeuil had worked out
how to get _base and _range to share a curl object (just manually set
the Range header).
> === modified file bzrlib/transport/http/_pycurl.py // last-changed:andrew.benne
> ... tts at canonical.com-20060926054631-7d63ee5c801fb098
> --- bzrlib/transport/http/_pycurl.py
> +++ bzrlib/transport/http/_pycurl.py
> @@ -77,10 +77,12 @@
> if from_transport is not None:
> self._base_curl = from_transport._base_curl
> self._range_curl = from_transport._range_curl
> + self._post_curl = from_transport._post_curl
> else:
> mutter('using pycurl %s' % pycurl.version)
> self._base_curl = pycurl.Curl()
> self._range_curl = pycurl.Curl()
> + self._post_curl = pycurl.Curl()
>
v- There are quite a few typos here, can you clean it up so it is easier
to read?
> +
> +At the bottom level socket, pipes, HTTP server. For sockets, we have the
> +idea that you have multiple requests and get have a read error because the
vv ^^^^^^^^
> +other side did shutdown sd send. For pipes we have read pipe which will have a
> +zero read which marks end-of-file. For HTTP server environment there is not
> +end-of-stream because each request coming into the server is independent.
> +
vvvvvvv
> +So we need a wrapper around pipes and sockets to seperate out reqeusts from
> +substrate and this will give us a single model which is consist for HTTP,
> +sockets and pipes.
> +
> +Server-side
> +-----------
> +
> + MEDIUM (factory for protocol, reads bytes & pushes to protocol,
> + uses protocol to detect end-of-request, sends written
> + bytes to client) e.g. socket, pipe, HTTP request handler.
> + ^
> + | bytes.
> + v
> +
v- Martin has commented that we should be using American spelling which
means:
serialization and deserialization.
> +PROTOCOL (serialisation, deserialisation) accepts bytes for one
> + request, decodes according to internal state, pushes
> + structured data to handler. accepts structured data from
> + handler and encodes and writes to the medium. factory for
> + handler.
> + ^
> + | structured data
> + v
> +
> +HANDLER (domain logic) accepts structured data, operates state
> + machine until the request can be satisfied,
> + sends structured data to the protocol.
> +
> +
> +Client-side
> +-----------
> +
> + CLIENT domain logic, accepts domain requests, generated structured
> + data, reads structured data from responses and turns into
> + domain data. Sends structured data to the protocol.
> + Operates state machines until the request can be delivered
> + (e.g. reading from a bundle generated in bzrlib to deliver a
> + complete request).
> +
> + Possibly this should just be RemoteBzrDir, RemoteTransport,
> + ...
> + ^
> + | structured data
> + v
> +
> +PROTOCOL (serialisation, deserialisation) accepts structured data for one
> + request, encodes and writes to the medium. Reads bytes from the
> + medium, decodes and allows the client to read structured data.
> + ^
> + | bytes.
> + v
> +
> + MEDIUM (accepts bytes from the protocol & delivers to the remote server.
> + Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
> """
>
>
> @@ -137,14 +197,13 @@
> # the socket, and it assumes everything is UTF8 sections separated
> # by \001. Which means a request like '\002' Will abort the connection
> # because of a UnicodeDecodeError. It does look like invalid data will
> -# kill the SmartStreamServer, but only with an abort + exception, and
> +# kill the SmartServerStreamMedium, but only with an abort + exception, and
> # the overall server shouldn't die.
^- Shouldn't this kill the Protocol layer, not the Medium layer?
...
Admittedly, I didn't go over the rest as closely, but I think the rest
looks okay.
And I sort-of understand why you use 'accept_bytes', but I can say that
it wasn't directly obvious when I read it. Maybe 'process_bytes()' is
better, or maybe I was just misreading it and not noticing that you
were telling the server process to 'accept_bytes' rather than the client
process.
It also isn't clear why you have the 'get_smart_client()' and
'get_smart_medium()' calls, when they both need to be passed into a
'SmartTransport' object. Doesn't it make sense to have
Transport.get_smart_transport() or something like that?
Maybe I misunderstood the layering.
John
=:->
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.5 (Darwin)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org
iD8DBQFFJXwcJdeBCYSNAAMRArqsAJ9GUgOEf4FVF3o09CygWwlz13xZoACgxr76
CrQe9omPbW/ZGE/tKuZzw9w=
=iTW7
-----END PGP SIGNATURE-----