Rev 5656: Add CatchingExceptionThread.set_and_switch() to avoid race conditions. in file:///home/vila/src/bzr/experimental/thread-with-exception/

Vincent Ladeuil v.ladeuil+lp at free.fr
Thu Feb 10 11:49:48 UTC 2011


At file:///home/vila/src/bzr/experimental/thread-with-exception/

------------------------------------------------------------
revno: 5656
revision-id: v.ladeuil+lp at free.fr-20110210114948-hktri36s9tk0kdj0
parent: v.ladeuil+lp at free.fr-20110209091410-cubxgkmrfd0hu577
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: thread-with-exception
timestamp: Thu 2011-02-10 12:49:48 +0100
message:
  Add CatchingExceptionThread.set_and_switch() to avoid race conditions.
-------------- next part --------------
=== modified file 'bzrlib/tests/test_thread.py'
--- a/bzrlib/tests/test_thread.py	2011-02-08 16:26:23 +0000
+++ b/bzrlib/tests/test_thread.py	2011-02-10 11:49:48 +0000
@@ -43,7 +43,7 @@
         tt.start()
         self.assertRaises(MyException, tt.join)
 
-    def test_join_when_no_exception(self):
+    def test_join_around_exception(self):
         resume = threading.Event()
         class MyException(Exception):
             pass
@@ -61,4 +61,105 @@
         resume.set()
         self.assertRaises(MyException, tt.join)
 
-
+    def test_sync_event(self):
+        control = threading.Event()
+        in_thread = threading.Event()
+        class MyException(Exception):
+            pass
+
+        def raise_my_exception():
+            # Wait for the test to tell us to resume
+            control.wait()
+            # Now we can raise
+            raise MyException()
+
+        tt = thread.CatchingExceptionThread(target=raise_my_exception,
+                                            sync_event=in_thread)
+        tt.start()
+        tt.join(timeout=0)
+        self.assertIs(None, tt.exception)
+        self.assertIs(in_thread, tt.sync_event)
+        control.set()
+        self.assertRaises(MyException, tt.join)
+        self.assertEquals(True, tt.sync_event.isSet())
+
+    def test_set_and_switch(self):
+        """Caller can precisely control a thread."""
+        control1 = threading.Event()
+        control2 = threading.Event()
+        control3 = threading.Event()
+
+        class TestThread(thread.CatchingExceptionThread):
+
+            def __init__(self, *args, **kwargs):
+                super(TestThread, self).__init__(*args,
+                                                 target=self.step_by_step,
+                                                 **kwargs)
+                self.current_step = 'starting'
+                self.step1 = threading.Event()
+                self.set_sync_event(self.step1)
+                self.step2 = threading.Event()
+                self.final = threading.Event()
+
+            def step_by_step(self):
+                control1.wait()
+                self.current_step = 'step1'
+                self.set_and_switch(self.step2)
+                control2.wait()
+                self.current_step = 'step2'
+                self.set_and_switch(self.final)
+                control3.wait()
+                self.current_step = 'done'
+
+        tt = TestThread()
+        tt.start()
+        self.assertEquals('starting', tt.current_step)
+        control1.set()
+        tt.step1.wait()
+        self.assertEquals('step1', tt.current_step)
+        control2.set()
+        tt.step2.wait()
+        self.assertEquals('step2', tt.current_step)
+        control3.set()
+        # We don't wait on tt.final
+        tt.join()
+        self.assertEquals('done', tt.current_step)
+
+    def test_exception_while_set_and_switch(self):
+        control1 = threading.Event()
+
+        class MyException(Exception):
+            pass
+
+        class TestThread(thread.CatchingExceptionThread):
+
+            def __init__(self, *args, **kwargs):
+                self.step1 = threading.Event()
+                self.step2 = threading.Event()
+                super(TestThread, self).__init__(*args,
+                                                 target=self.step_by_step,
+                                                  sync_event=self.step1,
+                                                 **kwargs)
+                self.current_step = 'starting'
+                self.set_sync_event(self.step1)
+
+            def step_by_step(self):
+                control1.wait()
+                self.current_step = 'step1'
+                self.set_and_switch(self.step2)
+
+            def set_sync_event(self, event):
+                # We force an exception while trying to set step2
+                if event is self.step2:
+                    raise MyException()
+                super(TestThread, self).set_sync_event(event)
+
+        tt = TestThread()
+        tt.start()
+        self.assertEquals('starting', tt.current_step)
+        control1.set()
+        # We now wait on step1 which will be set when catching the exception
+        tt.step1.wait()
+        self.assertRaises(MyException, tt.pending_exception)
+        self.assertIs(tt.step1, tt.sync_event)
+        self.assertTrue(tt.step1.isSet())

=== modified file 'bzrlib/thread.py'
--- a/bzrlib/thread.py	2011-02-08 16:26:23 +0000
+++ b/bzrlib/thread.py	2011-02-10 11:49:48 +0000
@@ -39,6 +39,7 @@
         self.set_sync_event(sync_event)
         self.exception = None
         self.ignored_exceptions = None # see set_ignored_exceptions
+        self.lock = threading.Lock()
 
     # compatibility thunk for python-2.4 and python-2.5...
     if sys.version_info < (2, 6):
@@ -55,11 +56,49 @@
         Some threads require multiple events and should set the relevant one
         when appropriate.
 
-        Note that the event should be cleared so the caller can wait() on him
-        and be released when the thread set the event.
+        Note that the event should be initially cleared so the caller can
+        wait() on it and be released when the thread sets the event.
+
+        Also note that the thread can use multiple events, setting them as it
+        progresses, while the caller can choose to wait on any of them. What
+        matters is that there is always one event set so that the caller is
+        always released when an exception is caught. Re-using the same event is
+        therefore risky as the thread itself has no idea about which event the
+        caller is waiting on. If the caller has already been released then a
+        cleared event won't guarantee that the caller is still waiting on it.
         """
         self.sync_event = event
 
+    def set_and_switch(self, new):
+        """Set the current ``sync_event`` and switch to a new one.
+
+        Using this method protects against race conditions while setting a new
+        ``sync_event``.
+
+        Note that this allows a caller to wait either on the old or the new
+        event depending on whether it wants fine control over what is happening
+        inside a thread.
+
+        :param new: The event that will become ``sync_event``
+        """
+        cur = self.sync_event
+        self.lock.acquire()
+        try: # Always release the lock
+            try:
+                self.set_sync_event(new)
+                # From now on, any exception will be synced with the new event
+            except:
+                # Unlucky, we couldn't set the new sync event, try restoring a
+                # safe state
+                self.set_sync_event(cur)
+                raise
+            # Setting the current ``sync_event`` will release callers waiting
+            # on it, note that it will also be set in run() if an exception is
+            # raised
+            cur.set()
+        finally:
+            self.lock.release()
+
     def set_ignored_exceptions(self, ignored):
         """Declare which exceptions will be ignored.
 



More information about the bazaar-commits mailing list