Rev 29: Implement an HTTP specific collection to get finer results. in http://code.launchpad.net/%7Ev-ladeuil/bzr/transportstats
Vincent Ladeuil
v.ladeuil+lp at free.fr
Fri Dec 7 11:09:27 GMT 2007
At http://code.launchpad.net/%7Ev-ladeuil/bzr/transportstats
------------------------------------------------------------
revno: 29
revision-id:v.ladeuil+lp at free.fr-20071207110923-g1fzl339sb4665h8
parent: v.ladeuil+lp at free.fr-20071205091943-bmh501x3vngt5pxa
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: transportstats
timestamp: Fri 2007-12-07 12:09:23 +0100
message:
Implement HTTP-specific statistics collection to get finer-grained results.
* stats.py:
New HTTP-specific statistic formats.
* decorator.py:
(StatsCollector.__new__): Create implementation-specific
statistics collectors when needed.
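(DefaultStatsCollector): New class implementing the default
statistics collection (formerly StatsCollector). Some methods were
renamed from a double-underscore prefix to a single underscore (e.g.
__collect -> _collect), since name mangling of double-underscore names
prevented subclasses from accessing them.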
(DefaultStatsCollector._max_latency): Now a class attribute (replaces
the module-level MAX_LATENCY constant).
(HTTPStatsCollector): HTTP-specific implementation, mainly for
readv and _get.
* commands.py:
(cmd_ts_display.run): Also report 'bytes requested' and 'cumulated
latency', since those are useful data.
modified:
commands.py commands.py-20070927123419-zy82qayy9xzwxvzi-1
decorator.py decorator.py-20070926152401-52kuiex1mu755ajk-1
stats.py stats.py-20070928061304-7i3r2h4gg6rbi03e-1
-------------- next part --------------
=== modified file 'commands.py'
--- a/commands.py 2007-11-26 09:53:37 +0000
+++ b/commands.py 2007-12-07 11:09:23 +0000
@@ -60,6 +60,7 @@
mystats.start_exploiting()
requests = 0
+ bytes_requested = 0
bytes_read = 0
bytes_written = 0
latency_count = 0
@@ -68,6 +69,7 @@
if verbose:
print stat._print_fmt % stat.__dict__
bytes_read += getattr(stat, 'bytes_read', 0)
+ bytes_requested += getattr(stat, 'bytes_requested', 0)
bytes_written += getattr(stat, 'bytes_written', 0)
latency = getattr(stat, 'latency', None)
if latency is not None:
@@ -79,8 +81,10 @@
avg_latency = total_latency / latency_count
print '# requests: %d' % requests
+ print 'Bytes requested: %d' % bytes_requested
print 'Bytes read: %d' % bytes_read
print 'Bytes written: %d' % bytes_written
+ print 'Cumulated latency: %dms' % total_latency
# FIXME: It may be more appropriate to divide by 2 as ping does.
print 'Average latency: %dms' % avg_latency
mystats.stop_exploiting()
=== modified file 'decorator.py'
--- a/decorator.py 2007-12-05 09:19:43 +0000
+++ b/decorator.py 2007-12-07 11:09:23 +0000
@@ -30,16 +30,48 @@
from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
+from bzrlib import errors
from bzrlib.plugins.transportstats import stats
from bzrlib.transport import decorator
+from bzrlib.transport.http import response
""")
-MAX_LATENCY = 2**32 - 1
+
+# Associates the scheme with a specific statistic collector class
+specific_collectors = dict()
+
class StatsCollector(decorator.TransportDecorator):
"""A decorator that collect statistics.
- This is requested via the 'stats+' prefix to get_transport().
+ This is requested via the 'stats+' prefix to get_transport(). This class
+ is a factory selecting the right implementation depending on the base used.
+ """
+
+ def __new__(cls, base, *args, **kwargs):
+ coll = DefaultStatsCollector(base, *args, **kwargs)
+ decorated = coll._decorated
+ # A bit hackish but we don't want to import specific transports and
+ # there is no way to find the decorated class from the base without
+ # creating another transport instance.
+ for key in specific_collectors.keys():
+ if decorated.base.startswith(key):
+ return specific_collectors[key](base, _decorated=decorated)
+
+ # No specific implementation found
+ return coll
+
+ @classmethod
+ def _get_url_prefix(self):
+ """Stats transport decorators are invoked via 'stats+'"""
+ return 'stats+'
+
+
+class DefaultStatsCollector(decorator.TransportDecorator):
+ """The default implementation for StatsCollector.
+
+ This class should be suitable for any transport for which a specific
+ implementation is not needed.
Latency is represented with an unsigned long number of miliseconds.The
latency measured here represents an estimate of the time between request
@@ -48,25 +80,31 @@
data.
"""
- def __init__(self, *args):
+ _max_latency = 2**32 - 1
+
+ def __init__(self, base, _decorated=None, _from_transport=None):
"See TransportDecorator.__init__"""
- super(StatsCollector, self).__init__(*args)
- self._stats = stats.get_stats()
- self._stats.start_collecting()
- self.__start_time = 0
+ super(DefaultStatsCollector, self).__init__(base, _decorated,
+ _from_transport)
+ if _from_transport is not None:
+ self._stats = _from_transport._stats
+ else:
+ self._stats = stats.get_stats()
+ self._stats.start_collecting()
+ self._start_time = None
- def __collect(self, name, *args):
+ def _collect(self, name, *args):
self._stats.write_stat(name, self._decorated.base, *args)
- def __start(self):
- self.__start_time = time.time()
+ def _start(self):
+ self._start_time = time.time()
- def __latency(self):
+ def _latency(self):
# Convert to int and cap
- latency = int((time.time() - self.__start_time) * 1000)
- return max(0, min(MAX_LATENCY, latency))
+ latency = int((time.time() - self._start_time) * 1000)
+ return max(0, min(self._max_latency, latency))
- def __pump_to_tmp(self, f):
+ def _pump_to_tmp(self, f):
"""Copy the file-like object content to a temp file.
:returns: temp_file, nb_bytes. temp_file is the temporary file
@@ -86,8 +124,8 @@
def append_file(self, relpath, f, mode=None):
"""See Transport.append_file()."""
saved_pos = f.tell()
- temp_file, nb_bytes = self.__pump_to_tmp(f)
- self.__start()
+ temp_file, nb_bytes = self._pump_to_tmp(f)
+ self._start()
try:
ret = self._decorated.append_file(relpath, temp_file, mode=mode)
except:
@@ -95,68 +133,68 @@
temp_file.close()
raise
else:
- self.__collect('Transport.append_file', relpath, nb_bytes,
- self.__latency())
+ self._collect('Transport.append_file', relpath, nb_bytes,
+ self._latency())
temp_file.close()
return ret
def append_bytes(self, relpath, bytes, mode=None):
"""See Transport.append_bytes()."""
- self.__start()
+ self._start()
ret = self._decorated.append_bytes(relpath, bytes, mode=mode)
- self.__collect('Transport.append_bytes', relpath, len(bytes),
- self.__latency())
+ self._collect('Transport.append_bytes', relpath, len(bytes),
+ self._latency())
return ret
def delete(self, relpath):
"""See Transport.delete()."""
- self.__start()
+ self._start()
ret = self._decorated.delete(relpath)
- self.__collect('Transport.delete', relpath, self.__latency())
+ self._collect('Transport.delete', relpath, self._latency())
return ret
def delete_tree(self, relpath):
"""See Transport.delete_tree()."""
- self.__start()
+ self._start()
ret = self._decorated.delete_tree(relpath)
- self.__collect('Transport.delete_tree', relpath, self.__latency())
+ self._collect('Transport.delete_tree', relpath, self._latency())
return ret
def get(self, relpath):
"""See Transport.get()."""
- self.__start()
+ self._start()
f = self._decorated.get(relpath)
- latency = self.__latency()
+ latency = self._latency()
temp_file = tempfile.TemporaryFile()
nb_bytes = self._pump(f, temp_file)
temp_file.seek(0)
f.close()
- self.__collect('Transport.get', relpath, nb_bytes, latency)
+ self._collect('Transport.get', relpath, nb_bytes, latency)
return temp_file
def get_bytes(self, relpath):
"""See Transport.get_bytes()."""
- self.__start()
+ self._start()
bytes = self._decorated.get_bytes(relpath)
- self.__collect('Transport.get_bytes', relpath, len(bytes),
- self.__latency())
+ self._collect('Transport.get_bytes', relpath, len(bytes),
+ self._latency())
return bytes
def has(self, relpath):
"""See Transport.has()."""
- self.__start()
+ self._start()
ret = self._decorated.has(relpath)
- self.__collect('Transport.has', relpath, self.__latency())
+ self._collect('Transport.has', relpath, self._latency())
return ret
def mkdir(self, relpath, mode=None):
"""See Transport.mkdir()."""
- self.__start()
+ self._start()
ret = self._decorated.mkdir(relpath, mode)
- self.__collect('Transport.mkdir', relpath, self.__latency())
+ self._collect('Transport.mkdir', relpath, self._latency())
return ret
#FIXME:open_write_stream may need several implementations
@@ -164,7 +202,7 @@
def put_file(self, relpath, f, mode=None):
"""See Transport.put_file()."""
saved_pos = f.tell()
- temp_file, nb_bytes = self.__pump_to_tmp(f)
+ temp_file, nb_bytes = self._pump_to_tmp(f)
self.__start()
try:
ret = self._decorated.put_file(relpath, temp_file, mode)
@@ -173,22 +211,22 @@
temp_file.close()
raise
else:
- self.__collect('Transport.put_file', relpath, nb_bytes,
- self.__latency())
+ self._collect('Transport.put_file', relpath, nb_bytes,
+ self._latency())
temp_file.close()
return ret
def put_bytes(self, relpath, bytes, mode=None):
"""See Transport.put_bytes()."""
- self.__start()
+ self._start()
ret = self._decorated.put_bytes(relpath, bytes, mode)
- self.__collect('Transport.put_bytes', relpath, len(bytes),
- self.__latency())
+ self._collect('Transport.put_bytes', relpath, len(bytes),
+ self._latency())
return ret
def readv(self, relpath, offsets, *args, **kwargs):
"""See Transport.readv()."""
- self.__start()
+ self._start()
first = True
# FIXME: There are two problems here, we don't really collect neither
# the transactions with a remote server nor the real bytes
@@ -200,46 +238,188 @@
for (pos, data) in self._decorated.readv(relpath, offsets,
*args, **kwargs):
if first:
- self.__collect('Transport.readv', relpath, self.__latency())
+ self._collect('Transport.readv', relpath, self._latency())
first = False
- self.__collect('Transport.readv/offset', relpath, pos, len(data))
+ self._collect('Transport.readv/offset', relpath, pos, len(data))
yield pos, data
def rename(self, rel_from, rel_to):
- self.__start()
+ self._start()
ret = self._decorated.rename(rel_from, rel_to)
- self.__collect('Transport.rename', rel_from, rel_to, self.__latency())
+ self._collect('Transport.rename', rel_from, rel_to, self._latency())
return ret
def rmdir(self, relpath):
"""See Transport.rmdir."""
- self.__start()
+ self._start()
ret = self._decorated.rmdir(relpath)
- self.__collect('Transport.rmdir', relpath, self.__latency())
+ self._collect('Transport.rmdir', relpath, self._latency())
return ret
def stat(self, relpath):
"""See Transport.stat()."""
- self.__start()
+ self._start()
ret = self._decorated.stat(relpath)
- self.__collect('Transport.stat', relpath, self.__latency())
+ self._collect('Transport.stat', relpath, self._latency())
return ret
def lock_read(self, relpath):
"""See Transport.lock_read."""
- self.__start()
+ self._start()
ret = self._decorated.lock_read(relpath)
- self.__collect('Transport.lock_read', relpath, self.__latency())
+ self._collect('Transport.lock_read', relpath, self._latency())
return ret
def lock_write(self, relpath):
"""See Transport.lock_write."""
- self.__start()
+ self._start()
ret = self._decorated.lock_write(relpath)
- self.__collect('Transport.lock_write', relpath, self.__latency())
+ self._collect('Transport.lock_write', relpath, self._latency())
return ret
+# The http response handling have evolved and measuring the bytes downloaded
+# needs different implementations.
+try:
+ from bzrlib.transport.http.response import HttpRangeResponse
+ http_response_streamed = False
+except ImportError:
+ http_response_streamed = True
+
+
+class HTTPStatsCollector(DefaultStatsCollector):
+
+ def __init__(self, base, _decorated=None, _from_transport=None):
+ "See TransportDecorator.__init__"""
+ super(HTTPStatsCollector, self).__init__(base, _decorated,
+ _from_transport)
+ # Install our hooks. Note that the methods are already bound so they
+ # get the right 'self' when executing.
+ self._get_of_decorated = self._decorated._get
+ self._decorated._get = self._get_for_decorated
+ if http_response_streamed:
+ self._get_nb_bytes = self._get_nb_bytes_streamed
+ else:
+ self._get_nb_bytes = self._get_nb_bytes_not_streamed
+
+ def _get_nb_bytes_not_streamed(self, resp):
+ """Return the number of bytes in the response body.
+
+ Since the whole response is buffered we just have to peak under the
+ covers of the two possible implementations.
+ """
+ if isinstance(resp, response.RangeFile):
+ nb_bytes = len(resp._data)
+ else:
+ nb_bytes = len(resp.getvalue())
+ return nb_bytes, resp
+
+ def _get_nb_bytes_streamed(self, resp):
+ """Return the number of bytes in the response body.
+
+ Since the whole response is streamed, we have to create a temp file
+ containing an equivalent content.
+ """
+ if not isinstance(resp, response.RangeFile):
+ nb_bytes = len(resp.getvalue())
+ return nb_bytes, resp
+
+ temp = tempfile.TemporaryFile()
+ if resp._size == -1 or resp._boundary is None:
+ # The content is raw bytes
+ data = resp.read()
+ nb_bytes = len(data)
+ temp.write(data)
+ temp.seek(0)
+ new_resp = response.RangeFile(resp._path, temp)
+ new_resp.set_range(resp._start, resp._size)
+ else:
+ # Multiple parts response
+ temp = tempfile.TemporaryFile()
+ # RangeFile has already consumed the beginning of the first range
+ # (including te boundary line) for its own setup. We need to put an
+ # equivalent at the beginning of our copy
+ temp.write('--' + resp._boundary + '\r\n')
+ temp.write(''.join(resp._headers.headers) + '\r\n')
+ # Copy the rest of the body bypassing RangeFile.read() to get the
+ # raw content
+ data = resp._file.read()
+ while data:
+ nb_bytes = len(data)
+ temp.write(data)
+ data = resp._file.read()
+ temp.seek(0)
+ new_resp = response.RangeFile(resp._path, temp)
+ new_resp.set_boundary(resp._boundary)
+
+ return nb_bytes, new_resp
+
+ data = response.read()
+ nb_bytes = len(data)
+ temp_file.write(data)
+ again = True
+ while again:
+ try:
+ # We try to seek when at the end of the range (but staying at
+ # the same offset), that should position us to the beginning of
+ # the next range if it exist.
+ response.seek(0, 1)
+ data = response.read()
+ nb_bytes += len(data)
+ temp_file.write(data)
+ again = True
+ except (errors.InvalidRange, errors.InvalidHttpResponse):
+ again = False
+ temp_file.seek(0)
+ return nb_bytes
+
+ def _get_for_decorated(self, relpath, ranges, tail_amount=0):
+ """See HttpTransportBase._get().
+
+ This method is defined here but will be executed with self being the
+ decorated transport. That allows us to get called by methods of the
+ decorated transport.
+ """
+ self._start()
+ code, f = self._get_of_decorated(relpath, ranges, tail_amount)
+ latency = self._latency()
+
+ if code in (200, 206):
+ # We collect data on valid responses only (nb_bytes is still an
+ # approximation since it doesn't include the HTTP headers.)
+ nb_bytes, f = self._get_nb_bytes(f)
+ else:
+ nb_bytes = 0
+ self._collect('HttpTransportBase._get', relpath, nb_bytes, latency)
+ return code, f
+
+ def readv(self, relpath, offsets, *args, **kwargs):
+ """See Transport.readv().
+
+
+ HTTP transports really issue GET requests. We want to collect statitics
+ at both points:
+ - readv to collect bytes requested,
+ - _get to collect latency and bytes transferred.
+
+ So we need to be a bit more invasive by patching the decorated _get
+ method. (see _get method above).
+ """
+ self._collect('HttpTransportBase.readv', relpath)
+ for (pos, data) in self._decorated.readv(relpath, offsets,
+ *args, **kwargs):
+ self._collect('HttpTransportBase.readv/offset', relpath, pos,
+ len(data))
+ yield pos, data
+
+
+specific_collectors.update(http=HTTPStatsCollector,
+ # Since we match on the beginning of the string, the
+ # following is useless:
+ # https=HTTPStatsCollector,
+ )
+
+
class StatsServer(decorator.DecoratorServer):
"""Server for the ReadonlyTransportDecorator for testing with."""
=== modified file 'stats.py'
--- a/stats.py 2007-12-05 09:19:43 +0000
+++ b/stats.py 2007-12-07 11:09:23 +0000
@@ -142,6 +142,13 @@
'%(base)us%(relpath)us%(latency)L')
+register_stat('HttpTransportBase.readv', TransportStat,
+ '%(base)us%(relpath)us')
+register_stat('HttpTransportBase.readv/offset', TransportReadvOffset,
+ '%(base)us%(relpath)us%(offset)L%(bytes_requested)L')
+register_stat('HttpTransportBase._get', TransportStat,
+ '%(base)us%(relpath)us%(bytes_read)L%(latency)L')
+
class Stats(object):
_default_name = '.transport_stats_for_bzr'
More information about the bazaar-commits
mailing list