Rev 29: Implement an HTTP specific collection to get finer results. in http://code.launchpad.net/%7Ev-ladeuil/bzr/transportstats

Vincent Ladeuil v.ladeuil+lp at free.fr
Fri Dec 7 11:09:27 GMT 2007


At http://code.launchpad.net/%7Ev-ladeuil/bzr/transportstats

------------------------------------------------------------
revno: 29
revision-id:v.ladeuil+lp at free.fr-20071207110923-g1fzl339sb4665h8
parent: v.ladeuil+lp at free.fr-20071205091943-bmh501x3vngt5pxa
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: transportstats
timestamp: Fri 2007-12-07 12:09:23 +0100
message:
  Implement an HTTP specific collection to get finer results.
  
  * stats.py:
  New HTTP-specific statistic formats.
  
  * decorator.py:
  (StatsCollector.__new__): Create implementation-specific
  statistics collectors when needed.
  (DefaultStatsCollector): New class implementing the default
  statistic collection (formerly StatsCollector). Some methods renamed
  (dropping the double-underscore name mangling) so that subclasses can
  access them.
  (DefaultStatsCollector._max_latency): Now a class attribute.
  (HTTPStatsCollector): HTTP-specific implementation, mostly for
  readv and _get.
  
  * commands.py:
  (cmd_ts_display.run): Add 'bytes requested' and 'cumulated
  latency' since those are useful metrics.
modified:
  commands.py                    commands.py-20070927123419-zy82qayy9xzwxvzi-1
  decorator.py                   decorator.py-20070926152401-52kuiex1mu755ajk-1
  stats.py                       stats.py-20070928061304-7i3r2h4gg6rbi03e-1
-------------- next part --------------
=== modified file 'commands.py'
--- a/commands.py	2007-11-26 09:53:37 +0000
+++ b/commands.py	2007-12-07 11:09:23 +0000
@@ -60,6 +60,7 @@
         mystats.start_exploiting()
 
         requests = 0
+        bytes_requested = 0  # stats lacking this field contribute 0
         bytes_read = 0
         bytes_written = 0
         latency_count = 0
@@ -68,6 +69,7 @@
             if verbose:
                 print stat._print_fmt % stat.__dict__
             bytes_read += getattr(stat, 'bytes_read', 0)
+            bytes_requested += getattr(stat, 'bytes_requested', 0)
             bytes_written += getattr(stat, 'bytes_written', 0)
             latency = getattr(stat, 'latency', None)
             if latency is not None:
@@ -79,8 +81,10 @@
             avg_latency = total_latency / latency_count
 
         print '# requests: %d' % requests
+        print 'Bytes requested: %d' % bytes_requested
         print 'Bytes read: %d' % bytes_read
         print 'Bytes written: %d' % bytes_written
+        print 'Cumulated latency: %dms' % total_latency  # sum of latencies
         # FIXME: It may be more appropriate to divide by 2 as ping does.
         print 'Average latency: %dms' % avg_latency
         mystats.stop_exploiting()

=== modified file 'decorator.py'
--- a/decorator.py	2007-12-05 09:19:43 +0000
+++ b/decorator.py	2007-12-07 11:09:23 +0000
@@ -30,16 +30,48 @@
 
 from bzrlib.lazy_import import lazy_import
 lazy_import(globals(), """
+from bzrlib import errors
 from bzrlib.plugins.transportstats import stats
 from bzrlib.transport import decorator
+from bzrlib.transport.http import response
 """)
 
-MAX_LATENCY = 2**32 - 1
+
+# Maps a URL prefix (e.g. 'http') to its specific statistics collector class.
+specific_collectors = {}
+
 
 class StatsCollector(decorator.TransportDecorator):
     """A decorator that collect statistics.
 
-    This is requested via the 'stats+' prefix to get_transport().
+    This is requested via the 'stats+' prefix to get_transport().  This class
+    is a factory selecting the right implementation depending on the base used.
+    """
+
+    def __new__(cls, base, *args, **kwargs):
+        coll = DefaultStatsCollector(base, *args, **kwargs)
+        decorated = coll._decorated
+        # A bit hackish but we don't want to import specific transports and
+        # there is no way to find the decorated class from the base without
+        # creating another transport instance.
+        for prefix, collector_class in specific_collectors.items():
+            if decorated.base.startswith(prefix):
+                return collector_class(base, _decorated=decorated)
+
+        # No specific implementation found
+        return coll
+
+    @classmethod
+    def _get_url_prefix(cls):
+        """Stats transport decorators are invoked via 'stats+'"""
+        return 'stats+'
+
+
+class DefaultStatsCollector(decorator.TransportDecorator):
+    """The default implementation for StatsCollector.
+
+    This class should be suitable for any transport for which a specific
+    implementation is not needed.
 
     Latency is represented with an unsigned long number of miliseconds.The
     latency measured here represents an estimate of the time between request
@@ -48,25 +80,31 @@
     data.
     """
 
-    def __init__(self, *args):
+    _max_latency = 2**32 - 1  # latency cap in ms (max unsigned 32-bit)
+
+    def __init__(self, base, _decorated=None, _from_transport=None):
         "See TransportDecorator.__init__"""
-        super(StatsCollector, self).__init__(*args)
-        self._stats = stats.get_stats()
-        self._stats.start_collecting()
-        self.__start_time = 0
+        super(DefaultStatsCollector, self).__init__(base, _decorated,
+                                                    _from_transport)
+        if _from_transport is not None:  # reuse _from_transport's stats
+            self._stats = _from_transport._stats
+        else:
+            self._stats = stats.get_stats()
+            self._stats.start_collecting()
+        self._start_time = None  # set by _start(), read by _latency()
 
-    def __collect(self, name, *args):
+    def _collect(self, name, *args):
         self._stats.write_stat(name, self._decorated.base, *args)
 
-    def __start(self):
-        self.__start_time = time.time()
+    def _start(self):
+        self._start_time = time.time()
 
-    def __latency(self):
+    def _latency(self):
         # Convert to int and cap
-        latency = int((time.time() - self.__start_time) * 1000)
-        return max(0, min(MAX_LATENCY, latency))
+        latency = int((time.time() - self._start_time) * 1000)
+        return max(0, min(self._max_latency, latency))
 
-    def __pump_to_tmp(self, f):
+    def _pump_to_tmp(self, f):
         """Copy the file-like object content to a temp file.
 
         :returns: temp_file, nb_bytes. temp_file is the temporary file
@@ -86,8 +124,8 @@
     def append_file(self, relpath, f, mode=None):
         """See Transport.append_file()."""
         saved_pos = f.tell()
-        temp_file, nb_bytes = self.__pump_to_tmp(f)
-        self.__start()
+        temp_file, nb_bytes = self._pump_to_tmp(f)
+        self._start()
         try:
             ret = self._decorated.append_file(relpath, temp_file, mode=mode)
         except:
@@ -95,68 +133,68 @@
             temp_file.close()
             raise
         else:
-            self.__collect('Transport.append_file', relpath, nb_bytes,
-                           self.__latency())
+            self._collect('Transport.append_file', relpath, nb_bytes,
+                          self._latency())
             temp_file.close()
         return ret
 
     def append_bytes(self, relpath, bytes, mode=None):
         """See Transport.append_bytes()."""
-        self.__start()
+        self._start()
         ret = self._decorated.append_bytes(relpath, bytes, mode=mode)
-        self.__collect('Transport.append_bytes', relpath, len(bytes),
-                       self.__latency())
+        self._collect('Transport.append_bytes', relpath, len(bytes),
+                      self._latency())
         return ret
 
     def delete(self, relpath):
         """See Transport.delete()."""
-        self.__start()
+        self._start()
         ret = self._decorated.delete(relpath)
-        self.__collect('Transport.delete', relpath, self.__latency())
+        self._collect('Transport.delete', relpath, self._latency())
         return ret
 
     def delete_tree(self, relpath):
         """See Transport.delete_tree()."""
-        self.__start()
+        self._start()
         ret = self._decorated.delete_tree(relpath)
-        self.__collect('Transport.delete_tree', relpath, self.__latency())
+        self._collect('Transport.delete_tree', relpath, self._latency())
         return ret
 
     def get(self, relpath):
         """See Transport.get()."""
-        self.__start()
+        self._start()
         f = self._decorated.get(relpath)
-        latency = self.__latency()
+        latency = self._latency()
 
         temp_file = tempfile.TemporaryFile()
         nb_bytes = self._pump(f, temp_file)
         temp_file.seek(0)
         f.close()
 
-        self.__collect('Transport.get', relpath, nb_bytes, latency)
+        self._collect('Transport.get', relpath, nb_bytes, latency)
         return temp_file
 
     def get_bytes(self, relpath):
         """See Transport.get_bytes()."""
-        self.__start()
+        self._start()
         bytes = self._decorated.get_bytes(relpath)
 
-        self.__collect('Transport.get_bytes', relpath, len(bytes),
-                       self.__latency())
+        self._collect('Transport.get_bytes', relpath, len(bytes),
+                      self._latency())
         return bytes
 
     def has(self, relpath):
         """See Transport.has()."""
-        self.__start()
+        self._start()
         ret = self._decorated.has(relpath)
-        self.__collect('Transport.has', relpath, self.__latency())
+        self._collect('Transport.has', relpath, self._latency())
         return ret
 
     def mkdir(self, relpath, mode=None):
         """See Transport.mkdir()."""
-        self.__start()
+        self._start()
         ret = self._decorated.mkdir(relpath, mode)
-        self.__collect('Transport.mkdir', relpath, self.__latency())
+        self._collect('Transport.mkdir', relpath, self._latency())
         return ret
 
     #FIXME:open_write_stream may need several implementations
@@ -164,7 +202,7 @@
     def put_file(self, relpath, f, mode=None):
         """See Transport.put_file()."""
         saved_pos = f.tell()
-        temp_file, nb_bytes = self.__pump_to_tmp(f)
-        self.__start()
+        temp_file, nb_bytes = self._pump_to_tmp(f)
+        self._start()  # __start would mangle to _DefaultStatsCollector__start
         try:
             ret = self._decorated.put_file(relpath, temp_file, mode)
@@ -173,22 +211,22 @@
             temp_file.close()
             raise
         else:
-            self.__collect('Transport.put_file', relpath, nb_bytes,
-                           self.__latency())
+            self._collect('Transport.put_file', relpath, nb_bytes,
+                          self._latency())
             temp_file.close()
         return ret
 
     def put_bytes(self, relpath, bytes, mode=None):
         """See Transport.put_bytes()."""
-        self.__start()
+        self._start()
         ret = self._decorated.put_bytes(relpath, bytes, mode)
-        self.__collect('Transport.put_bytes', relpath, len(bytes),
-                       self.__latency())
+        self._collect('Transport.put_bytes', relpath, len(bytes),
+                      self._latency())
         return ret
 
     def readv(self, relpath, offsets, *args, **kwargs):
         """See Transport.readv()."""
-        self.__start()
+        self._start()
         first = True
         # FIXME: There are two problems here, we don't really collect neither
         # the transactions with a remote server nor the real bytes
@@ -200,46 +238,185 @@
         for (pos, data) in self._decorated.readv(relpath, offsets,
                                                  *args, **kwargs):
             if first:
-                self.__collect('Transport.readv', relpath, self.__latency())
+                self._collect('Transport.readv', relpath, self._latency())
                 first = False
-            self.__collect('Transport.readv/offset', relpath, pos, len(data))
+            self._collect('Transport.readv/offset', relpath, pos, len(data))
             yield pos, data
 
     def rename(self, rel_from, rel_to):
-        self.__start()
+        self._start()
         ret = self._decorated.rename(rel_from, rel_to)
-        self.__collect('Transport.rename', rel_from, rel_to, self.__latency())
+        self._collect('Transport.rename', rel_from, rel_to, self._latency())
         return ret
 
     def rmdir(self, relpath):
         """See Transport.rmdir."""
-        self.__start()
+        self._start()
         ret = self._decorated.rmdir(relpath)
-        self.__collect('Transport.rmdir', relpath, self.__latency())
+        self._collect('Transport.rmdir', relpath, self._latency())
         return ret
 
     def stat(self, relpath):
         """See Transport.stat()."""
-        self.__start()
+        self._start()
         ret = self._decorated.stat(relpath)
-        self.__collect('Transport.stat', relpath, self.__latency())
+        self._collect('Transport.stat', relpath, self._latency())
         return ret
 
     def lock_read(self, relpath):
         """See Transport.lock_read."""
-        self.__start()
+        self._start()
         ret = self._decorated.lock_read(relpath)
-        self.__collect('Transport.lock_read', relpath, self.__latency())
+        self._collect('Transport.lock_read', relpath, self._latency())
         return ret
 
     def lock_write(self, relpath):
         """See Transport.lock_write."""
-        self.__start()
+        self._start()
         ret = self._decorated.lock_write(relpath)
-        self.__collect('Transport.lock_write', relpath, self.__latency())
+        self._collect('Transport.lock_write', relpath, self._latency())
         return ret
 
 
+# The http response handling has evolved and measuring the bytes downloaded
+# needs different implementations.
+try:
+    from bzrlib.transport.http.response import HttpRangeResponse
+    http_response_streamed = False
+except ImportError:
+    http_response_streamed = True
+
+
+class HTTPStatsCollector(DefaultStatsCollector):
+    """Statistics collector specialized for HTTP transports.
+
+    On top of the default statistics, it records each GET request issued by
+    the decorated transport (latency and body size) and the offsets requested
+    through readv.
+    """
+
+    def __init__(self, base, _decorated=None, _from_transport=None):
+        """See TransportDecorator.__init__"""
+        super(HTTPStatsCollector, self).__init__(base, _decorated,
+                                                 _from_transport)
+        # Install our hook on the decorated transport: every GET it issues,
+        # including from its own methods (readv), goes through
+        # _get_for_decorated. Both methods are bound, so each still runs with
+        # its own 'self'.
+        self._get_of_decorated = self._decorated._get
+        self._decorated._get = self._get_for_decorated
+        if http_response_streamed:
+            self._get_nb_bytes = self._get_nb_bytes_streamed
+        else:
+            self._get_nb_bytes = self._get_nb_bytes_not_streamed
+
+    def _get_nb_bytes_not_streamed(self, resp):
+        """Return the number of bytes in the response body.
+
+        Since the whole response is buffered we just have to peek under the
+        covers of the two possible implementations.
+
+        :returns: nb_bytes, resp. resp is returned unchanged.
+        """
+        if isinstance(resp, response.RangeFile):
+            nb_bytes = len(resp._data)
+        else:
+            nb_bytes = len(resp.getvalue())
+        return nb_bytes, resp
+
+    def _get_nb_bytes_streamed(self, resp):
+        """Return the number of bytes in the response body.
+
+        Since the response is streamed, counting its bytes consumes it, so the
+        body is copied into a temp file and an equivalent RangeFile built on
+        that copy is handed back to the caller.
+
+        :returns: nb_bytes, resp. resp is the original response when it was
+            not consumed, the equivalent RangeFile otherwise.
+        """
+        if not isinstance(resp, response.RangeFile):
+            nb_bytes = len(resp.getvalue())
+            return nb_bytes, resp
+
+        temp = tempfile.TemporaryFile()
+        if resp._size == -1 or resp._boundary is None:
+            # The content is raw bytes
+            data = resp.read()
+            nb_bytes = len(data)
+            temp.write(data)
+            temp.seek(0)
+            new_resp = response.RangeFile(resp._path, temp)
+            new_resp.set_range(resp._start, resp._size)
+        else:
+            # Multiple parts response (reusing the temp file created above
+            # rather than leaking a second one).
+            # RangeFile has already consumed the beginning of the first range
+            # (including the boundary line) for its own setup. We need to put
+            # an equivalent at the beginning of our copy.
+            temp.write('--' + resp._boundary + '\r\n')
+            temp.write(''.join(resp._headers.headers) + '\r\n')
+            # Copy the rest of the body bypassing RangeFile.read() to get the
+            # raw content. Sizes are accumulated over all chunks and start at
+            # 0 so that an empty remainder doesn't leave nb_bytes unbound.
+            nb_bytes = 0
+            data = resp._file.read()
+            while data:
+                nb_bytes += len(data)
+                temp.write(data)
+                data = resp._file.read()
+            temp.seek(0)
+            new_resp = response.RangeFile(resp._path, temp)
+            new_resp.set_boundary(resp._boundary)
+
+        return nb_bytes, new_resp
+
+    def _get_for_decorated(self, relpath, ranges, tail_amount=0):
+        """See HttpTransportBase._get().
+
+        This bound method is installed as the decorated transport's _get (see
+        __init__), so it runs whenever the decorated transport issues a GET,
+        including from its own methods. 'self' is still this collector; the
+        original _get is reached through self._get_of_decorated.
+        """
+        self._start()
+        code, f = self._get_of_decorated(relpath, ranges, tail_amount)
+        latency = self._latency()
+
+        if code in (200, 206):
+            # We collect data on valid responses only (nb_bytes is still an
+            # approximation since it doesn't include the HTTP headers.)
+            nb_bytes, f = self._get_nb_bytes(f)
+        else:
+            nb_bytes = 0
+        self._collect('HttpTransportBase._get', relpath, nb_bytes, latency)
+        return code, f
+
+    def readv(self, relpath, offsets, *args, **kwargs):
+        """See Transport.readv().
+
+        HTTP transports really issue GET requests. We want to collect
+        statistics at both points:
+        - readv to collect bytes requested,
+        - _get to collect latency and bytes transferred.
+
+        So we need to be a bit more invasive by patching the decorated _get
+        method (see _get_for_decorated).
+        """
+        self._collect('HttpTransportBase.readv', relpath)
+        for (pos, data) in self._decorated.readv(relpath, offsets,
+                                                 *args, **kwargs):
+            self._collect('HttpTransportBase.readv/offset', relpath, pos,
+                          len(data))
+            yield pos, data
+
+
+specific_collectors.update(http=HTTPStatsCollector,
+                           # Since we match on the beginning of the string, the
+                           # following is useless:
+                           # https=HTTPStatsCollector,
+                           )
+
+
 class StatsServer(decorator.DecoratorServer):
     """Server for the ReadonlyTransportDecorator for testing with."""
 

=== modified file 'stats.py'
--- a/stats.py	2007-12-05 09:19:43 +0000
+++ b/stats.py	2007-12-07 11:09:23 +0000
@@ -142,6 +142,15 @@
               '%(base)us%(relpath)us%(latency)L')
 
 
+# HTTP specific statistics, collected by decorator.HTTPStatsCollector.
+register_stat('HttpTransportBase.readv', TransportStat,
+              '%(base)us%(relpath)us')
+register_stat('HttpTransportBase.readv/offset', TransportReadvOffset,
+              '%(base)us%(relpath)us%(offset)L%(bytes_requested)L')
+register_stat('HttpTransportBase._get', TransportStat,
+              '%(base)us%(relpath)us%(bytes_read)L%(latency)L')
+
+
 class Stats(object):
 
     _default_name = '.transport_stats_for_bzr'



More information about the bazaar-commits mailing list