~vorlon/ubuntu/saucy/gourmet/trunk

« back to all changes in this revision

Viewing changes to src/lib/threadManager.py

  • Committer: Bazaar Package Importer
  • Author(s): Rolf Leggewie
  • Date: 2008-07-26 13:29:41 UTC
  • Revision ID: james.westby@ubuntu.com-20080726132941-6ldd73qmacrzz0bn
Tags: upstream-0.14.0
Import upstream version 0.14.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# This module is designed to handle all multi-threading processes in
 
2
# Gourmet. Separate threads are limited to doing the following things
 
3
# with respect to the GUI:
 
4
#
 
5
#   1. Start a notification dialog with a progress bar
 
6
#   2. Update the progress bar
 
7
#   3. Finish successfully
 
8
#   4. Stop with an error.
 
9
#
 
10
# If you need to get user input in the middle of your threaded process,
 
11
# you need to redesign so that it works as follows:
 
12
#
 
13
# 1. Run the first half of your process as a thread.
 
14
# 2. Upon completion of your thread, run your dialog to get your user
 
15
#    input
 
16
# 3. Run the second half of your process as a thread.
 
17
#
 
18
# In this module, we define the following base classes...
 
19
#
 
20
# A singleton ThreadingManager that tracks how many threads we have
 
21
# running, and allows a maximum number of threads to be run at any
 
22
# single time.
 
23
#
 
24
# A SuspendableThread base class for creating and running threaded
 
25
# processes.
 
26
#
 
27
#
 
28
from gettext import gettext as _
 
29
import threading, gtk, gobject, time
 
30
gobject.threads_init()
 
31
 
 
32
# _IdleObject etc. based on an example by John Stowers
 
33
# <john.stowers@gmail.com>
 
34
 
 
35
class _IdleObject(gobject.GObject):
 
36
    """
 
37
    Override gobject.GObject to always emit signals in the main thread
 
38
    by emmitting on an idle handler
 
39
    """
 
40
    def __init__(self):
 
41
        gobject.GObject.__init__(self)
 
42
 
 
43
    def emit(self, *args):
 
44
        if args[0]!='progress': print 'emit',args
 
45
        gobject.idle_add(gobject.GObject.emit,self,*args)
 
46
 
 
47
class Terminated (Exception):
    """Exception carrying an arbitrary value.

    The value is stored on the instance as .value, and str() of the
    exception is the repr() of that value.
    """
    def __init__ (self, value):
        self.value = value

    def __str__ (self):
        return '%r' % (self.value,)
 
52
 
 
53
class SuspendableThread (threading.Thread, _IdleObject):
 
54
 
 
55
 
 
56
    """A class for long-running processes that shouldn't interrupt the
 
57
    GUI.
 
58
 
 
59
    runnerClass will handle the actual process. runnerClass cannot
 
60
    touch the GUI. To interact with the GUI, emit a signal.
 
61
    """
 
62
 
 
63
    __gsignals__ = {
 
64
        'completed' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, []),
 
65
        'progress' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE,
 
66
                     [gobject.TYPE_FLOAT, gobject.TYPE_STRING]), #percent complete, progress bar text
 
67
        'error' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [gobject.TYPE_INT, # error number
 
68
                                                                gobject.TYPE_STRING, # error name
 
69
                                                                gobject.TYPE_STRING # stack trace
 
70
                                                                ]),
 
71
        'stopped': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, []), # emitted when we are stopped
 
72
        'pause': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, []), # emitted when we pause
 
73
        'resume': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, []), # emitted when we resume
 
74
        'done': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, []), # emitted when/however we finish
 
75
        }
 
76
 
 
77
    def __init__(self, name=None):
 
78
        self.initialized = False
 
79
        self.name = name
 
80
        self.suspended = False
 
81
        self.terminated = False
 
82
        _IdleObject.__init__(self)
 
83
        threading.Thread.__init__(self, name=self.name)
 
84
 
 
85
    def initialize_thread (self):
 
86
        self.initialized = True
 
87
        self.start()
 
88
 
 
89
    def connect_subthread (self, subthread):
 
90
        '''For subthread subthread, connect to error and pause signals and 
 
91
        and emit as if they were our own.'''
 
92
        subthread.connect('error',lambda st,enum,ename,strace: self.emit('error',enum,ename,strace))
 
93
        subthread.connect('stopped',lambda st: self.emit('stopped'))
 
94
        subthread.connect('pause',lambda st: self.emit('pause'))
 
95
        subthread.connect('resume',lambda st: self.emit('resume'))
 
96
 
 
97
    def run (self):
 
98
        try:
 
99
            print self,'run!'
 
100
            self.do_run()
 
101
            print self,'Done!'
 
102
        except Terminated:
 
103
            print 'stopped!'
 
104
            self.emit('stopped')
 
105
        except:
 
106
            print 'Error!'
 
107
            import traceback
 
108
            self.emit('error',1,
 
109
                      'Error during %s'%self.name,
 
110
                      traceback.format_exc())
 
111
        else:
 
112
            self.emit('completed')
 
113
        self.emit('done')
 
114
 
 
115
    def do_run (self):
 
116
        # Note that sub-classes need to call check_for_sleep
 
117
        # periodically, otherwise pausing & cancelling won't work
 
118
        raise NotImplementedError
 
119
    
 
120
    def suspend (self):
 
121
        self.suspended = True
 
122
 
 
123
    def resume (self):
 
124
        self.suspended = False
 
125
 
 
126
    def terminate (self):
 
127
        self.terminated = True
 
128
        self.emit('stopped')
 
129
 
 
130
    def check_for_sleep (self):
 
131
        """Check whether we have been suspended or terminated.
 
132
        """
 
133
        paused_emitted = False
 
134
        emit_resume = False
 
135
        if self.terminated:
 
136
            raise Terminated('%s terminated'%self.name)
 
137
        if self.suspended:
 
138
            print 'suspended!'
 
139
            self.emit('pause')
 
140
            emit_resume = True
 
141
        while self.suspended:
 
142
            if self.terminated:
 
143
                raise Terminated('%s terminated'%self.name)
 
144
            time.sleep(1)
 
145
        if emit_resume:
 
146
            self.emit('resume')
 
147
 
 
148
    def __repr__ (self):
 
149
        try:
 
150
            return threading.Thread.__repr__(self)
 
151
        except AssertionError:
 
152
            return '<SuspendableThread %s - uninitialized>'%self.name
 
153
        
 
154
 
 
155
class ThreadManager:

    """Run SuspendableThreads, at most max_concurrent_threads at once.

    Threads beyond the limit wait in thread_queue and are started (or
    resumed) in the order they were queued as slots become free.

    Only one instance is meant to exist: constructing a second one
    raises the existing instance (see get_thread_manager).
    """

    __single = None

    def __init__ (self, max_concurrent_threads = 2):
        if ThreadManager.__single:
            raise ThreadManager.__single
        # Remember the instance; without this the check above could
        # never fire and every caller got its own manager.
        ThreadManager.__single = self
        self.max_concurrent_threads = max_concurrent_threads
        self.thread_queue = []
        self.count = 0
        self.active_count = 0
        self.threads = []
        # Threads that signalled 'pause' and so gave their slot back.
        self._paused = []
        # Paused threads we resumed ourselves: their slot is already
        # counted, so their pending 'resume' signal must not count it
        # a second time.
        self._slot_reserved = []

    def add_thread (self, thread):
        """Register thread and start it, or queue it if we are full.

        Raises AssertionError if thread is not a SuspendableThread.
        """
        # Explicit check rather than an assert statement, which is
        # stripped when Python runs with -O.
        if not isinstance(thread,SuspendableThread):
            print('Class %s %s is not a SuspendableThread'%(thread,type(thread)))
            raise AssertionError('%r is not a SuspendableThread'%(thread,))
        self.threads.append(thread)
        thread.connect('pause',self.register_thread_paused)
        thread.connect('resume',self.register_thread_resume)
        thread.connect('done',self.register_thread_done)
        if self.active_count < self.max_concurrent_threads:
            self.active_count += 1
            thread.initialize_thread()
        else:
            self.thread_queue.append(thread)

    def register_thread_done (self, thread):
        """'done' handler: forget thread and hand on its slot, if any."""
        print('%s done'%(thread,))
        if thread in self.threads:
            self.threads.remove(thread)
            # A finished thread must never be resumed from the queue.
            while thread in self.thread_queue:
                self.thread_queue.remove(thread)
            was_paused = thread in self._paused
            if was_paused:
                self._paused.remove(thread)
            had_reservation = thread in self._slot_reserved
            if had_reservation:
                self._slot_reserved.remove(thread)
            # A thread that finished while paused already returned its
            # slot, unless we had reserved one for its resumption.
            if had_reservation or not was_paused:
                self.active_count -= 1
            self.start_queued_threads()

    def register_thread_paused (self, thread):
        """'pause' handler: free the thread's slot for queued threads."""
        print('%s paused'%(thread,))
        if thread not in self._paused:
            self._paused.append(thread)
            self.active_count -= 1
        self.start_queued_threads()

    def register_thread_resume (self, thread):
        """'resume' handler: count the thread's slot again if needed."""
        print('%s resume'%(thread,))
        if thread in self._paused:
            self._paused.remove(thread)
            if thread in self._slot_reserved:
                # resume_thread/start_queued_threads counted it already
                self._slot_reserved.remove(thread)
            else:
                self.active_count += 1

    def resume_thread (self, thread):
        """Resume thread now if a slot is available, else queue it."""
        print('resuming thread...')
        # A thread that never reported a pause (or that we already
        # resumed) is still counted and needs no additional slot.
        holds_slot = (thread not in self._paused
                      or thread in self._slot_reserved)
        if holds_slot or self.active_count < self.max_concurrent_threads:
            print('resume right away!')
            self._resume_now(thread)
        else:
            print('add to queue')
            if thread not in self.thread_queue:
                self.thread_queue.append(thread)

    def _resume_now (self, thread):
        """Resume thread, reserving a slot if it had given its own up."""
        if thread in self._paused and thread not in self._slot_reserved:
            self._slot_reserved.append(thread)
            self.active_count += 1
        thread.resume()

    def start_queued_threads (self):
        """Start or resume queued threads while slots are free."""
        print('Check queue')
        while self.active_count < self.max_concurrent_threads and self.thread_queue:
            # pop(0): serve the queue first-in, first-out so that an
            # early thread is not starved by later arrivals.
            thread_to_add = self.thread_queue.pop(0)
            if thread_to_add.initialized:
                self._resume_now(thread_to_add)
            else:
                self.active_count += 1
                thread_to_add.initialize_thread()
 
219
 
 
220
def get_thread_manager ():
 
221
    try:
 
222
        return ThreadManager()
 
223
    except ThreadManager, tm:
 
224
        return tm
 
225
 
 
226
class ThreadManagerGui:

    """Dialog showing one progress bar, with Cancel and Pause buttons,
    for each registered thread.

    Only one instance is meant to exist: constructing a second one
    raises the existing instance (see get_thread_manager_gui).
    """

    __single__ = None
    paused_text = ' (' + _('Paused') + ')'

    def __init__ (self, parent=None):
        if ThreadManagerGui.__single__:
            raise ThreadManagerGui.__single__
        else:
            ThreadManagerGui.__single__ = self
        self.tm = get_thread_manager()
        self.threads = {}
        self.dialog = gtk.Dialog(parent=parent,
                                 buttons=(gtk.STOCK_CLOSE,gtk.RESPONSE_CLOSE))
        self.dialog.connect('response',self.close)
        self.dialog.connect('delete-event',self.delete_event_cb)
        self.sw = gtk.ScrolledWindow()
        self.pbtable = gtk.Table()
        self.last_row = 0
        self.sw.add_with_viewport(self.pbtable)
        self.sw.set_policy(gtk.POLICY_NEVER,gtk.POLICY_AUTOMATIC)
        self.sw.show_all()
        self.dialog.vbox.add(self.sw)
        self.to_remove = [] # a list of widgets to remove when we close...

    def response (self, dialog, response):
        """Close the dialog when the response is RESPONSE_CLOSE."""
        if response==gtk.RESPONSE_CLOSE:
            self.close()

    def register_thread_with_dialog (self, description, thread):
        """Add a labelled progress row for thread and connect its
        signals and the row's buttons to our callbacks."""
        pb = gtk.ProgressBar()
        pause_button = gtk.ToggleButton()
        lab = gtk.Label(_('Pause'))
        pause_button.add(lab); pause_button.show_all()
        dlab = gtk.Label(description)
        cancel_button = gtk.Button(stock=gtk.STOCK_CANCEL)
        self.pbtable.attach(dlab,0,3,self.last_row,self.last_row+1)
        self.pbtable.attach(pb,0,1,self.last_row+1,self.last_row+2)
        self.pbtable.attach(cancel_button,1,2,self.last_row+1,self.last_row+2)
        self.pbtable.attach(pause_button,2,3,self.last_row+1,self.last_row+2)
        # Create an object for easy reference to our widgets in callbacks
        class ThreadBox: pass
        threadbox = ThreadBox()
        threadbox.pb = pb
        threadbox.buttons = [pause_button,cancel_button]
        threadbox.label = dlab
        threadbox.pb.show(); threadbox.label.show()
        threadbox.widgets = [threadbox.pb, threadbox.label] + threadbox.buttons
        threadbox.row = self.last_row
        for b in threadbox.buttons: b.show()
        thread.connect('completed',self.thread_done,threadbox)
        thread.connect('error',self.thread_error,threadbox)
        thread.connect('stopped',self.thread_stopped,threadbox)
        thread.connect('pause',self.thread_pause,threadbox)
        thread.connect('resume',self.thread_resume,threadbox)
        thread.connect('progress',self.progress_update,threadbox.pb)
        pause_button.connect('clicked',self.pause_cb,thread)
        cancel_button.connect('clicked',self.cancel_cb,thread)
        # each thread occupies two table rows: label, then bar+buttons
        self.last_row += 2

    def pause_cb (self, b, thread):
        """Pause toggle: suspend when pressed in, resume when released."""
        if b.get_active():
            thread.suspend()
        else:
            self.tm.resume_thread(thread)

    def cancel_cb (self, b, thread):
        """Cancel button: ask the thread to terminate."""
        thread.terminate()

    def _bar_text (self, pb):
        """Return the progress bar's text, or '' if it has none."""
        # get_text() may return None when no text was ever set (e.g. a
        # thread that has not reported progress yet).
        return pb.get_text() or ''

    def _schedule_removal (self, threadbox):
        """Queue threadbox's widgets for removal at close(), once."""
        # Several finishing signals can arrive for one thread; listing
        # the box twice would make close() remove its widgets twice.
        if threadbox not in self.to_remove:
            self.to_remove.append(threadbox)

    def thread_done (self, thread, threadbox):
        """'completed' handler: mark the row as done."""
        print('thread_done cb')
        for b in threadbox.buttons: b.hide()
        self._schedule_removal(threadbox)
        txt = self._bar_text(threadbox.pb)
        threadbox.pb.set_text(txt + ' ('+_('Done')+')')

    def progress_update (self, thread, perc, txt, pb):
        """'progress' handler: a negative perc pulses the bar."""
        if perc >= 0.0:
            pb.set_percentage(perc)
        else:
            pb.pulse()
        pb.set_text(txt)

    def thread_error (self, thread, errno, errname, trace, threadbox):
        """'error' handler: show the error and offer a Details button."""
        print('thread_error cb')
        for b in threadbox.buttons: b.hide()
        threadbox.pb.set_text(_('Error: %s')%errname)
        b = gtk.Button(_('Details'))
        b.connect('clicked',self.show_traceback,errno,errname,trace)
        self.pbtable.attach(b,2,3,threadbox.row+1,threadbox.row+2)
        threadbox.widgets.append(b)
        b.show()
        self._schedule_removal(threadbox)

    def thread_stopped (self, thread, threadbox):
        """'stopped' handler: mark the row as cancelled.

        Like thread_done and thread_error, hide the row's buttons and
        queue the row for removal at close().
        """
        for b in threadbox.buttons: b.hide()
        suffix = ' ('+_('cancelled') + ')'
        txt = self._bar_text(threadbox.pb)
        # 'stopped' can be delivered more than once for one thread;
        # do not stack the marker.
        if not txt.endswith(suffix):
            threadbox.pb.set_text(txt + suffix)
        self._schedule_removal(threadbox)

    def thread_pause (self, thread, threadbox):
        """'pause' handler: append the paused marker to the bar text."""
        txt = self._bar_text(threadbox.pb)
        txt += self.paused_text
        threadbox.pb.set_text(txt)

    def thread_resume (self, thread, threadbox):
        """'resume' handler: strip the paused marker from the bar text."""
        txt = self._bar_text(threadbox.pb)
        # endswith, not find(): find() returns -1 (true) when the
        # marker is absent, which used to chop real text off the end.
        if txt.endswith(self.paused_text):
            txt = txt[:-len(self.paused_text)]
            threadbox.pb.set_text(txt)

    def show (self, *args):
        """Present the dialog."""
        self.dialog.present()

    def delete_event_cb (self, *args):
        """Hide the dialog instead of letting it be destroyed."""
        self.dialog.hide()
        return True

    def close (self, *args):
        """Remove the rows queued in to_remove, then hide the dialog."""
        while self.to_remove:
            box_to_remove = self.to_remove.pop()
            for w in box_to_remove.widgets:
                w.hide()
                self.pbtable.remove(w)
        self.dialog.hide()

    def show_traceback (self, button, errno, errname, traceback):
        """Details button handler: show the error with its traceback."""
        import gourmet.gtk_extras.dialog_extras as de
        de.show_message(label=_('Error'),
                        sublabel=_('Error %s: %s')%(errno,errname),
                        expander=(_('Traceback'),traceback),
                        )
 
357
 
 
358
def get_thread_manager_gui ():
 
359
    try:
 
360
        return ThreadManagerGui()
 
361
    except ThreadManagerGui, tmg:
 
362
        print 'Returning single'
 
363
        return tmg
 
364
 
 
365
# Manual demo: run five sample threads through the manager and its GUI.
if __name__ == '__main__':
    import gtk
    class TestThread (SuspendableThread):
        """Counts to 1000, reporting progress at every step."""

        def do_run (self):
            for n in range(1000):
                time.sleep(0.01)
                self.emit('progress',n/1000.0,'%s of 1000'%n)
                self.check_for_sleep()

    class TestError (SuspendableThread):
        """Like TestThread, but fails at step 100."""

        def do_run (self):
            for n in range(1000):
                time.sleep(0.01)
                if n==100: raise AttributeError("This is a phony error")
                self.emit('progress',n/1000.0,'%s of 1000'%n)
                self.check_for_sleep()

    class TestInterminable (SuspendableThread):
        """Runs until cancelled; negative progress pulses the bar."""

        def do_run (self):
            while 1:
                time.sleep(0.1)
                self.emit('progress',-1,'Working interminably')
                self.check_for_sleep()

    tm = get_thread_manager()
    tmg = get_thread_manager_gui()
    demo_threads = []
    for desc,thread in [
        ('Interminable 1',TestInterminable()),
        ('Linear 1',TestThread()),
        ('Linear 2',TestThread()),
        ('Interminable 2',TestInterminable()),
        ('Error 3',TestError())
        ]:
        demo_threads.append(thread)
        tm.add_thread(thread)
        tmg.register_thread_with_dialog(desc,thread)
    def quit (*args):
        # The workers are not daemon threads: ask them to stop, or the
        # interminable ones keep the process alive after the main loop
        # has ended.
        for t in demo_threads: t.terminate()
        gtk.main_quit()
    tmg.dialog.connect('delete-event',quit)
    # delete_event_cb was connected first and returns True, which keeps
    # the 'delete-event' handler above from running; quitting on
    # 'response' makes closing the dialog actually end the demo.
    tmg.dialog.connect('response',quit)
    tmg.show()
    gtk.main()