~nmu-sscheel/gtg/rework-task-editor

« back to all changes in this revision

Viewing changes to GTG/tests/signals_testing.py

  • Committer: Luca Invernizzi
  • Date: 2010-09-08 14:15:11 UTC
  • mfrom: (825.1.227 liblarch_rebased)
  • Revision ID: invernizzi.l@gmail.com-20100908141511-vsctgw74dj1xp0wi
Liblarch is now in trunk.
Note that performances are still bad and it misses DnD and multi-select
support, but its state is better that the current trunk.
Backends are added in this merge too.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import threading
 
2
import gobject
 
3
import time
 
4
import unittest
 
5
 
 
6
from GTG.tools.watchdog import Watchdog
 
7
 
 
8
 
 
9
 
 
10
class SignalCatcher(object):
 
11
    '''
 
12
    A class to test signals
 
13
    '''
 
14
 
 
15
 
 
16
    def __init__(self, unittest,  generator, signal_name,\
 
17
                 should_be_caught = True, how_many_signals = 1, \
 
18
                error_code = "No error code set"):
 
19
        self.signal_catched_event = threading.Event()
 
20
        self.generator = generator
 
21
        self.signal_name = signal_name
 
22
        self.signal_arguments = []
 
23
        self.unittest = unittest
 
24
        self.how_many_signals = how_many_signals
 
25
        self.should_be_caught = should_be_caught
 
26
        self.error_code = error_code
 
27
 
 
28
        def _on_failure():
 
29
            #we need to release the waiting thread
 
30
            self.signal_catched_event.set()
 
31
            self.missed = True
 
32
            #then we notify the error
 
33
            #if the error_code is set to None, we're expecting it to fail.
 
34
            if error_code != None:
 
35
                print "An expected signal wasn't received %s" % str(error_code)
 
36
            self.unittest.assertFalse(should_be_caught)
 
37
 
 
38
        self.watchdog = Watchdog(3, _on_failure)
 
39
 
 
40
    def __enter__(self):
 
41
 
 
42
        def __signal_callback(*args):
 
43
            self.signal_arguments.append(args[1:])
 
44
            if len(self.signal_arguments) >= self.how_many_signals:
 
45
                self.signal_catched_event.set()
 
46
 
 
47
        self.handler = \
 
48
                self.generator.connect(self.signal_name, __signal_callback)
 
49
        self.watchdog.__enter__()
 
50
        return [self.signal_catched_event, self.signal_arguments]
 
51
 
 
52
    def __exit__(self, err_type, value, traceback):
 
53
        self.generator.disconnect(self.handler)
 
54
        if not self.should_be_caught and not hasattr(self, 'missed'):
 
55
            self.assertFalse(True)
 
56
        return not isinstance(value, Exception) and \
 
57
                self.watchdog.__exit__(err_type, value, traceback)
 
58
    
 
59
 
 
60
 
 
61
class GobjectSignalsManager(object):
 
62
    
 
63
 
 
64
    def init_signals(self):
 
65
        '''
 
66
        Initializes the gobject main loop so that signals can be used.
 
67
        This function returns only when the gobject main loop is running
 
68
        '''
 
69
        def gobject_main_loop():
 
70
            gobject.threads_init()
 
71
            self.main_loop = gobject.MainLoop()
 
72
            self.main_loop.run()
 
73
        threading.Thread(target = gobject_main_loop).start()
 
74
        while not hasattr(self, 'main_loop') or \
 
75
              not self.main_loop.is_running():
 
76
            #since running the gobject main loop is a blocking call, we have to
 
77
            #check that it has been started in a polling fashion
 
78
            time.sleep(0.1)
 
79
 
 
80
    def terminate_signals(self):
 
81
        self.main_loop.quit()
 
82