Rev 7: Create dbus-broadcast command to serve the Broadcast interface. in file:///home/robertc/source/baz/plugins/dbus/trunk/

Robert Collins robertc at robertcollins.net
Wed Feb 7 00:37:40 GMT 2007


At file:///home/robertc/source/baz/plugins/dbus/trunk/

------------------------------------------------------------
revno: 7
revision-id: robertc at robertcollins.net-20070207003738-jk12g1bhj0kfxaa4
parent: robertc at robertcollins.net-20070206073700-2vi6qnfqfchehdzn
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Wed 2007-02-07 11:37:38 +1100
message:
  Create dbus-broadcast command to serve the Broadcast interface.
modified:
  TODO                           todo-20070206032729-2b5teqiwctqrfgei-3
  __init__.py                    __init__.py-20070206032729-2b5teqiwctqrfgei-4
  activity.py                    activity.py-20070206034725-q208d0jtkshwu0fx-1
  tests/test_activity.py         test_activity.py-20070206035206-bnhzvtm6m6hpgylz-1
=== modified file 'TODO'
--- a/TODO	2007-02-06 07:37:00 +0000
+++ b/TODO	2007-02-07 00:37:38 +0000
@@ -2,9 +2,10 @@
 ----------------------
 
  * simple subscriber that sits in a corner and shows revisions
- * dbus service to act as the broadcast point for revisions
  * .service details for the dbus service.
  * bzr server integration to republish tip revisions from file:/// to public
    URLs when one is available.
  * debug why raising a dbus returned error skips regular error handling and 
    jumps to sys.excepthook [wager: crack from the dbus python bindings]
+ * setup.py for activation files. Are there user installs of such things?
+ * gobject mainloop headaches? threads?

=== modified file '__init__.py'
--- a/__init__.py	2007-02-06 03:30:59 +0000
+++ b/__init__.py	2007-02-07 00:37:38 +0000
@@ -21,6 +21,9 @@
 Currently nothing works, please see TODO.
 """
 
+from bzrlib import commands
+
+from activity import Activity
 from hook import install_hooks
 
 
@@ -31,3 +34,22 @@
 
 install_hooks()
 
+
+class cmd_dbus_broadcast(commands.Command):
+    """A dbus service to reflect revisions to subscribers.
+    
+    This service runs the bzrlib.plugins.dbus.activity.Broadcast service on the
+    session dbus.
+
+    It can be contacted on org.bazaarvcs.plugins.dbus.Broadcast, as
+    /org/bazaarvcs/plugins/dbus/Broadcast with interface
+    org.bazaarvcs.plugins.dbus.Broadcast.
+
+    The method announce_revision(revision_id, url) will cause the signal
+    'Revision' to be raised with two parameters - revision_id and url.
+    """
+
+    def run(self):
+        Activity().serve_broadcast()
+
+commands.register_command(cmd_dbus_broadcast)

=== modified file 'activity.py'
--- a/activity.py	2007-02-06 07:37:00 +0000
+++ b/activity.py	2007-02-07 00:37:38 +0000
@@ -40,6 +40,9 @@
             supply this parameter.
         """
         if bus is None:
+            # lazy import to not pollute bzr during startup.
+            import dbus.mainloop.glib
+            dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
             self.bus = dbus.SessionBus()
         else:
             self.bus = bus
@@ -95,16 +98,22 @@
             reply_handler=handle_reply,
             error_handler=handle_error)
 
-    def serve_agency(self):
-        """Serve an advertising agency. 
-
-        This service contains a list of subscribers, but no state per 
-        subscriber, so it should have minimal footprint.
+    def serve_broadcast(self, when_ready=None):
+        """Run a 'Broadcast' server.
+
+        This is the core logic for 'bzr dbus-broadcast' which will be invoked
+        by dbus activation.
+
+        It starts up gobject mainloop and places a Broadcast object on that.
+        When the loop exits, it returns.
+
+        :param when_ready: An optional callback to be invoked after the server
+            is ready to handle requests.
         """
-        # lazy import to not pollute bzr during startup.
-        import dbus.mainloop.glib
-        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+        broadcaster = Broadcast(self.bus)
         mainloop = gobject.MainLoop()
+        if when_ready:
+            when_ready()
         mainloop.run()
 
 

=== modified file 'tests/test_activity.py'
--- a/tests/test_activity.py	2007-02-06 07:37:00 +0000
+++ b/tests/test_activity.py	2007-02-07 00:37:38 +0000
@@ -23,6 +23,7 @@
 import subprocess
 import tempfile
 import thread
+import time
 import weakref
 
 import dbus
@@ -31,6 +32,8 @@
 import dbus.mainloop.glib
 import gobject
 
+import bzrlib.plugins
+
 # done here to just do it once, and not in the plugin module to avoid doing it
 # by default.
 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
@@ -73,8 +76,43 @@
     return address.strip(), proc
 
 
-class PrivateBus(dbus.Bus):
-    """A PrivateBus created within this application."""
+class BusOnAddress(dbus.Bus):
+    """A Bus created on a specific address."""
+
+    def __new__(cls, address):
+        bus = _dbus_bindings.BusImplementation.__new__(cls, address)
+        bus._bus_type = address
+        # _bus_names is used by dbus.service.BusName!
+        bus._bus_names = weakref.WeakValueDictionary()
+
+        bus._signal_recipients_by_object_path = {}
+        """Map from object path to dict mapping dbus_interface to dict
+        mapping member to list of SignalMatch objects."""
+
+        bus._signal_sender_matches = {}
+        """Map from sender well-known name to list of match rules for all
+        signal handlers that match on sender well-known name."""
+
+        bus._signals_lock = thread.allocate_lock()
+        """Lock used to protect signal data structures if doing two
+        removals at the same time (everything else is atomic, thanks to
+        the GIL)"""
+
+        bus.add_message_filter(bus.__class__._signal_func)
+        bus.set_exit_on_disconnect(False)
+
+        return bus
+
+    def __str__(self):
+        return "%s (%s)" % (self.__class__.__name__, self._bus_type)
+
+    def __repr__(self):
+        return '<%s.%s on %s at %#x>' % (__name__, self.__class__,
+            self._bus_type, id(self))
+
+
+class TemporaryBus(BusOnAddress):
+    """A TemporaryBus created within this application."""
 
     def __new__(cls):
         address, proc = create_daemon()
@@ -110,12 +148,6 @@
         """Ensure the daemon is shutdown."""
         os.kill(self._test_process.pid, signal.SIGINT)
     
-    def __str__(self):
-        return "Private DBUS (%s)" % self._bus_type
-
-    def __repr__(self):
-        return '<%s.%s on %s at %#x>' % (__name__, self.__class__,
-            self._bus_type, id(self))
 
 
 class TestCaseWithDBus(TestCaseWithMemoryTransport):
@@ -124,7 +156,7 @@
         TestCaseWithMemoryTransport.setUp(self)
         # setup a private dbus session so we dont spam 
         # the users desktop!
-        self.bus = PrivateBus()
+        self.bus = TemporaryBus()
         self.addCleanup(self.bus.nuke)
 
 
@@ -204,9 +236,54 @@
 
     def test_server(self):
         """Calling Activity.serve() provides a convient means to serve."""
-        obj = activity.Activity(bus=self.bus)
-        calls = []
-        #obj.serve_agency()
+        # to test the server, we can't easily run it in process due to 
+        # apparent glib/dbus/dbus-python interactions: so we fire it off
+        # externally. As all the server does is serve requests until its
+        # killed or quits, this is ok, if not ideal.
+        current_plugins_path = ':'.join(bzrlib.plugins.__path__)
+        process = self.start_bzr_subprocess(['dbus-broadcast'],
+            skip_if_plan_to_signal=True,
+            env_changes={'DBUS_SESSION_BUS_ADDRESS':self.bus._bus_type,
+                         'BZR_PLUGIN_PATH':current_plugins_path},
+            allow_plugins=True)
+        # subscribe to the server : will fail if it did not startup correctly,
+        # so we spin for up to 5 seconds then abort
+        start = time.time()
+        started = False
+        while not started and time.time() - start < 5:
+            try:
+                dbus_object = self.bus.get_object(activity.Broadcast.DBUS_NAME,
+                    activity.Broadcast.DBUS_PATH)
+            except dbus.DBusException, e:
+                if e.message.startswith(
+                    'org.freedesktop.DBus.Error.ServiceUnknown:'):
+                    # service not available - relinquish cpu
+                    time.sleep(0.001)
+                else:
+                    # some other error
+                    raise
+            else:
+                started = True
+        self.assertTrue(started)
+        mainloop = gobject.MainLoop()
+        # catch revisions
+        revisions = []
+        def catch_revision(revision, url):
+            revisions.append((revision, url))
+            # quit the loop as soon as it idles.
+            gobject.idle_add(mainloop.quit)
+        dbus_object.connect_to_signal("Revision", catch_revision,
+            dbus_interface=activity.Broadcast.DBUS_INTERFACE)
+        # finally, announce something
+        activity.Activity(bus=self.bus).announce_revision('foo', 'bar')
+        mainloop.run()
+        # now, we need to block until the server has exited.
+        # and finally, we can checkout our results.
+        self.assertEqual([('foo', 'bar')], revisions)
+        # FUGLY: there seems to be a race condition where the finish
+        # call hung, and I've not the time to debug it right now.
+        time.sleep(0.05)
+        self.finish_bzr_subprocess(process, 3, send_signal=signal.SIGINT)
 
 
 class TestBroadcast(TestCaseWithDBus):



More information about the bazaar-commits mailing list