~ellisonbg/ipython/bugfixes0411409

« back to all changes in this revision

Viewing changes to IPython/background_jobs.py

  • Committer: ville
  • Date: 2008-02-16 09:50:47 UTC
  • mto: (0.12.1 ipython_main)
  • mto: This revision was merged to the branch mainline in revision 990.
  • Revision ID: ville@ville-pc-20080216095047-500x6dluki1iz40o
initialization (no svn history)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
"""Manage background (threaded) jobs conveniently from an interactive shell.
 
3
 
 
4
This module provides a BackgroundJobManager class.  This is the main class
 
5
meant for public usage, it implements an object which can create and manage
 
6
new background jobs.
 
7
 
 
8
It also provides the actual job classes managed by these BackgroundJobManager
 
9
objects, see their docstrings below.
 
10
 
 
11
 
 
12
This system was inspired by discussions with B. Granger and the
 
13
BackgroundCommand class described in the book Python Scripting for
 
14
Computational Science, by H. P. Langtangen:
 
15
 
 
16
http://folk.uio.no/hpl/scripting
 
17
 
 
18
(although ultimately no code from this text was used, as IPython's system is a
 
19
separate implementation).
 
20
 
 
21
$Id: background_jobs.py 994 2006-01-08 08:29:44Z fperez $
 
22
"""
 
23
 
 
24
#*****************************************************************************
 
25
#       Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
 
26
#
 
27
#  Distributed under the terms of the BSD License.  The full license is in
 
28
#  the file COPYING, distributed as part of this software.
 
29
#*****************************************************************************
 
30
 
 
31
from IPython import Release
 
32
__author__  = '%s <%s>' % Release.authors['Fernando']
 
33
__license__ = Release.license
 
34
 
 
35
# Code begins
 
36
import sys
 
37
import threading
 
38
 
 
39
from IPython.ultraTB import AutoFormattedTB
 
40
from IPython.genutils import warn,error
 
41
 
 
42
class BackgroundJobManager:
 
43
    """Class to manage a pool of backgrounded threaded jobs.
 
44
 
 
45
    Below, we assume that 'jobs' is a BackgroundJobManager instance.
 
46
    
 
47
    Usage summary (see the method docstrings for details):
 
48
 
 
49
      jobs.new(...) -> start a new job
 
50
      
 
51
      jobs() or jobs.status() -> print status summary of all jobs
 
52
 
 
53
      jobs[N] -> returns job number N.
 
54
 
 
55
      foo = jobs[N].result -> assign to variable foo the result of job N
 
56
 
 
57
      jobs[N].traceback() -> print the traceback of dead job N
 
58
 
 
59
      jobs.remove(N) -> remove (finished) job N
 
60
 
 
61
      jobs.flush_finished() -> remove all finished jobs
 
62
      
 
63
    As a convenience feature, BackgroundJobManager instances provide the
 
64
    utility result and traceback methods which retrieve the corresponding
 
65
    information from the jobs list:
 
66
 
 
67
      jobs.result(N) <--> jobs[N].result
 
68
      jobs.traceback(N) <--> jobs[N].traceback()
 
69
 
 
70
    While this appears minor, it allows you to use tab completion
 
71
    interactively on the job manager instance.
 
72
 
 
73
    In interactive mode, IPython provides the magic fuction %bg for quick
 
74
    creation of backgrounded expression-based jobs. Type bg? for details."""
 
75
 
 
76
    def __init__(self):
 
77
        # Lists for job management
 
78
        self.jobs_run  = []
 
79
        self.jobs_comp = []
 
80
        self.jobs_dead = []
 
81
        # A dict of all jobs, so users can easily access any of them
 
82
        self.jobs_all = {}
 
83
        # For reporting
 
84
        self._comp_report = []
 
85
        self._dead_report = []
 
86
        # Store status codes locally for fast lookups
 
87
        self._s_created   = BackgroundJobBase.stat_created_c
 
88
        self._s_running   = BackgroundJobBase.stat_running_c
 
89
        self._s_completed = BackgroundJobBase.stat_completed_c
 
90
        self._s_dead      = BackgroundJobBase.stat_dead_c
 
91
 
 
92
    def new(self,func_or_exp,*args,**kwargs):
 
93
        """Add a new background job and start it in a separate thread.
 
94
 
 
95
        There are two types of jobs which can be created:
 
96
 
 
97
        1. Jobs based on expressions which can be passed to an eval() call.
 
98
        The expression must be given as a string.  For example:
 
99
 
 
100
          job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
 
101
 
 
102
        The given expression is passed to eval(), along with the optional
 
103
        global/local dicts provided.  If no dicts are given, they are
 
104
        extracted automatically from the caller's frame.
 
105
        
 
106
        A Python statement is NOT a valid eval() expression.  Basically, you
 
107
        can only use as an eval() argument something which can go on the right
 
108
        of an '=' sign and be assigned to a variable.
 
109
 
 
110
        For example,"print 'hello'" is not valid, but '2+3' is.
 
111
 
 
112
        2. Jobs given a function object, optionally passing additional
 
113
        positional arguments:
 
114
 
 
115
          job_manager.new(myfunc,x,y)
 
116
 
 
117
        The function is called with the given arguments.
 
118
 
 
119
        If you need to pass keyword arguments to your function, you must
 
120
        supply them as a dict named kw:
 
121
 
 
122
          job_manager.new(myfunc,x,y,kw=dict(z=1))
 
123
 
 
124
        The reason for this assymmetry is that the new() method needs to
 
125
        maintain access to its own keywords, and this prevents name collisions
 
126
        between arguments to new() and arguments to your own functions.
 
127
 
 
128
        In both cases, the result is stored in the job.result field of the
 
129
        background job object.
 
130
 
 
131
 
 
132
        Notes and caveats:
 
133
 
 
134
        1. All threads running share the same standard output.  Thus, if your
 
135
        background jobs generate output, it will come out on top of whatever
 
136
        you are currently writing.  For this reason, background jobs are best
 
137
        used with silent functions which simply return their output.
 
138
 
 
139
        2. Threads also all work within the same global namespace, and this
 
140
        system does not lock interactive variables.  So if you send job to the
 
141
        background which operates on a mutable object for a long time, and
 
142
        start modifying that same mutable object interactively (or in another
 
143
        backgrounded job), all sorts of bizarre behaviour will occur.
 
144
 
 
145
        3. If a background job is spending a lot of time inside a C extension
 
146
        module which does not release the Python Global Interpreter Lock
 
147
        (GIL), this will block the IPython prompt.  This is simply because the
 
148
        Python interpreter can only switch between threads at Python
 
149
        bytecodes.  While the execution is inside C code, the interpreter must
 
150
        simply wait unless the extension module releases the GIL.
 
151
 
 
152
        4. There is no way, due to limitations in the Python threads library,
 
153
        to kill a thread once it has started."""
 
154
        
 
155
        if callable(func_or_exp):
 
156
            kw  = kwargs.get('kw',{})
 
157
            job = BackgroundJobFunc(func_or_exp,*args,**kw)
 
158
        elif isinstance(func_or_exp,basestring):
 
159
            if not args:
 
160
                frame = sys._getframe(1)
 
161
                glob, loc = frame.f_globals, frame.f_locals
 
162
            elif len(args)==1:
 
163
                glob = loc = args[0]
 
164
            elif len(args)==2:
 
165
                glob,loc = args
 
166
            else:
 
167
                raise ValueError,\
 
168
                      'Expression jobs take at most 2 args (globals,locals)'
 
169
            job = BackgroundJobExpr(func_or_exp,glob,loc)
 
170
        else:
 
171
            raise
 
172
        jkeys = self.jobs_all.keys()
 
173
        if jkeys:
 
174
            job.num = max(jkeys)+1
 
175
        else:
 
176
            job.num = 0
 
177
        self.jobs_run.append(job)
 
178
        self.jobs_all[job.num] = job
 
179
        print 'Starting job # %s in a separate thread.' % job.num
 
180
        job.start()
 
181
        return job
 
182
 
 
183
    def __getitem__(self,key):
 
184
        return self.jobs_all[key]
 
185
 
 
186
    def __call__(self):
 
187
        """An alias to self.status(),
 
188
 
 
189
        This allows you to simply call a job manager instance much like the
 
190
        Unix jobs shell command."""
 
191
 
 
192
        return self.status()
 
193
 
 
194
    def _update_status(self):
 
195
        """Update the status of the job lists.
 
196
 
 
197
        This method moves finished jobs to one of two lists:
 
198
          - self.jobs_comp: jobs which completed successfully
 
199
          - self.jobs_dead: jobs which finished but died.
 
200
 
 
201
        It also copies those jobs to corresponding _report lists.  These lists
 
202
        are used to report jobs completed/dead since the last update, and are
 
203
        then cleared by the reporting function after each call."""
 
204
        
 
205
        run,comp,dead = self._s_running,self._s_completed,self._s_dead
 
206
        jobs_run = self.jobs_run
 
207
        for num in range(len(jobs_run)):
 
208
            job  = jobs_run[num]
 
209
            stat = job.stat_code
 
210
            if stat == run:
 
211
                continue
 
212
            elif stat == comp:
 
213
                self.jobs_comp.append(job)
 
214
                self._comp_report.append(job)
 
215
                jobs_run[num] = False
 
216
            elif stat == dead:
 
217
                self.jobs_dead.append(job)
 
218
                self._dead_report.append(job)
 
219
                jobs_run[num] = False
 
220
        self.jobs_run = filter(None,self.jobs_run)
 
221
 
 
222
    def _group_report(self,group,name):
 
223
        """Report summary for a given job group.
 
224
 
 
225
        Return True if the group had any elements."""
 
226
 
 
227
        if group:
 
228
            print '%s jobs:' % name
 
229
            for job in group:
 
230
                print '%s : %s' % (job.num,job)
 
231
            print
 
232
            return True
 
233
 
 
234
    def _group_flush(self,group,name):
 
235
        """Flush a given job group
 
236
 
 
237
        Return True if the group had any elements."""
 
238
 
 
239
        njobs = len(group)
 
240
        if njobs:
 
241
            plural = {1:''}.setdefault(njobs,'s')
 
242
            print 'Flushing %s %s job%s.' % (njobs,name,plural)
 
243
            group[:] = []
 
244
            return True
 
245
        
 
246
    def _status_new(self):
 
247
        """Print the status of newly finished jobs.
 
248
 
 
249
        Return True if any new jobs are reported.
 
250
 
 
251
        This call resets its own state every time, so it only reports jobs
 
252
        which have finished since the last time it was called."""
 
253
 
 
254
        self._update_status()
 
255
        new_comp = self._group_report(self._comp_report,'Completed')
 
256
        new_dead = self._group_report(self._dead_report,
 
257
                                      'Dead, call job.traceback() for details')
 
258
        self._comp_report[:] = []
 
259
        self._dead_report[:] = []
 
260
        return new_comp or new_dead
 
261
                
 
262
    def status(self,verbose=0):
 
263
        """Print a status of all jobs currently being managed."""
 
264
 
 
265
        self._update_status()
 
266
        self._group_report(self.jobs_run,'Running')
 
267
        self._group_report(self.jobs_comp,'Completed')
 
268
        self._group_report(self.jobs_dead,'Dead')
 
269
        # Also flush the report queues
 
270
        self._comp_report[:] = []
 
271
        self._dead_report[:] = []
 
272
 
 
273
    def remove(self,num):
 
274
        """Remove a finished (completed or dead) job."""
 
275
 
 
276
        try:
 
277
            job = self.jobs_all[num]
 
278
        except KeyError:
 
279
            error('Job #%s not found' % num)
 
280
        else:
 
281
            stat_code = job.stat_code
 
282
            if stat_code == self._s_running:
 
283
                error('Job #%s is still running, it can not be removed.' % num)
 
284
                return
 
285
            elif stat_code == self._s_completed:
 
286
                self.jobs_comp.remove(job)
 
287
            elif stat_code == self._s_dead:
 
288
                self.jobs_dead.remove(job)
 
289
 
 
290
    def flush_finished(self):
 
291
        """Flush all jobs finished (completed and dead) from lists.
 
292
 
 
293
        Running jobs are never flushed.
 
294
 
 
295
        It first calls _status_new(), to update info. If any jobs have
 
296
        completed since the last _status_new() call, the flush operation
 
297
        aborts."""
 
298
 
 
299
        if self._status_new():
 
300
            error('New jobs completed since last '\
 
301
                  '_status_new(), aborting flush.')
 
302
            return
 
303
 
 
304
        # Remove the finished jobs from the master dict
 
305
        jobs_all = self.jobs_all
 
306
        for job in self.jobs_comp+self.jobs_dead:
 
307
            del(jobs_all[job.num])
 
308
 
 
309
        # Now flush these lists completely
 
310
        fl_comp = self._group_flush(self.jobs_comp,'Completed')
 
311
        fl_dead = self._group_flush(self.jobs_dead,'Dead')
 
312
        if not (fl_comp or fl_dead):
 
313
            print 'No jobs to flush.'
 
314
 
 
315
    def result(self,num):
 
316
        """result(N) -> return the result of job N."""
 
317
        try:
 
318
            return self.jobs_all[num].result
 
319
        except KeyError:
 
320
            error('Job #%s not found' % num)
 
321
 
 
322
    def traceback(self,num):
 
323
        try:
 
324
            self.jobs_all[num].traceback()
 
325
        except KeyError:
 
326
            error('Job #%s not found' % num)
 
327
 
 
328
 
 
329
class BackgroundJobBase(threading.Thread):
 
330
    """Base class to build BackgroundJob classes.
 
331
 
 
332
    The derived classes must implement:
 
333
 
 
334
    - Their own __init__, since the one here raises NotImplementedError.  The
 
335
    derived constructor must call self._init() at the end, to provide common
 
336
    initialization.
 
337
 
 
338
    - A strform attribute used in calls to __str__.
 
339
 
 
340
    - A call() method, which will make the actual execution call and must
 
341
    return a value to be held in the 'result' field of the job object."""
 
342
 
 
343
    # Class constants for status, in string and as numerical codes (when
 
344
    # updating jobs lists, we don't want to do string comparisons).  This will
 
345
    # be done at every user prompt, so it has to be as fast as possible
 
346
    stat_created   = 'Created'; stat_created_c = 0
 
347
    stat_running   = 'Running'; stat_running_c = 1
 
348
    stat_completed = 'Completed'; stat_completed_c = 2
 
349
    stat_dead      = 'Dead (Exception), call job.traceback() for details'
 
350
    stat_dead_c = -1
 
351
 
 
352
    def __init__(self):
 
353
        raise NotImplementedError, \
 
354
              "This class can not be instantiated directly."
 
355
 
 
356
    def _init(self):
 
357
        """Common initialization for all BackgroundJob objects"""
 
358
        
 
359
        for attr in ['call','strform']:
 
360
            assert hasattr(self,attr), "Missing attribute <%s>" % attr
 
361
        
 
362
        # The num tag can be set by an external job manager
 
363
        self.num = None
 
364
      
 
365
        self.status    = BackgroundJobBase.stat_created
 
366
        self.stat_code = BackgroundJobBase.stat_created_c
 
367
        self.finished  = False
 
368
        self.result    = '<BackgroundJob has not completed>'
 
369
        # reuse the ipython traceback handler if we can get to it, otherwise
 
370
        # make a new one
 
371
        try:
 
372
            self._make_tb = __IPYTHON__.InteractiveTB.text
 
373
        except:
 
374
            self._make_tb = AutoFormattedTB(mode = 'Context',
 
375
                                           color_scheme='NoColor',
 
376
                                           tb_offset = 1).text
 
377
        # Hold a formatted traceback if one is generated.
 
378
        self._tb = None
 
379
        
 
380
        threading.Thread.__init__(self)
 
381
 
 
382
    def __str__(self):
 
383
        return self.strform
 
384
 
 
385
    def __repr__(self):
 
386
        return '<BackgroundJob: %s>' % self.strform
 
387
 
 
388
    def traceback(self):
 
389
        print self._tb
 
390
        
 
391
    def run(self):
 
392
        try:
 
393
            self.status    = BackgroundJobBase.stat_running
 
394
            self.stat_code = BackgroundJobBase.stat_running_c
 
395
            self.result    = self.call()
 
396
        except:
 
397
            self.status    = BackgroundJobBase.stat_dead
 
398
            self.stat_code = BackgroundJobBase.stat_dead_c
 
399
            self.finished  = None
 
400
            self.result    = ('<BackgroundJob died, call job.traceback() for details>')
 
401
            self._tb       = self._make_tb()
 
402
        else:
 
403
            self.status    = BackgroundJobBase.stat_completed
 
404
            self.stat_code = BackgroundJobBase.stat_completed_c
 
405
            self.finished  = True
 
406
 
 
407
class BackgroundJobExpr(BackgroundJobBase):
 
408
    """Evaluate an expression as a background job (uses a separate thread)."""
 
409
 
 
410
    def __init__(self,expression,glob=None,loc=None):
 
411
        """Create a new job from a string which can be fed to eval().
 
412
 
 
413
        global/locals dicts can be provided, which will be passed to the eval
 
414
        call."""
 
415
 
 
416
        # fail immediately if the given expression can't be compiled
 
417
        self.code = compile(expression,'<BackgroundJob compilation>','eval')
 
418
                
 
419
        if glob is None:
 
420
            glob = {}
 
421
        if loc is None:
 
422
            loc = {}
 
423
            
 
424
        self.expression = self.strform = expression
 
425
        self.glob = glob
 
426
        self.loc = loc
 
427
        self._init()
 
428
        
 
429
    def call(self):
 
430
        return eval(self.code,self.glob,self.loc)
 
431
 
 
432
class BackgroundJobFunc(BackgroundJobBase):
 
433
    """Run a function call as a background job (uses a separate thread)."""
 
434
 
 
435
    def __init__(self,func,*args,**kwargs):
 
436
        """Create a new job from a callable object.
 
437
 
 
438
        Any positional arguments and keyword args given to this constructor
 
439
        after the initial callable are passed directly to it."""
 
440
 
 
441
        assert callable(func),'first argument must be callable'
 
442
        
 
443
        if args is None:
 
444
            args = []
 
445
        if kwargs is None:
 
446
            kwargs = {}
 
447
        
 
448
        self.func = func
 
449
        self.args = args
 
450
        self.kwargs = kwargs
 
451
        # The string form will only include the function passed, because
 
452
        # generating string representations of the arguments is a potentially
 
453
        # _very_ expensive operation (e.g. with large arrays).
 
454
        self.strform = str(func)
 
455
        self._init()
 
456
 
 
457
    def call(self):
 
458
        return self.func(*self.args,**self.kwargs)
 
459
 
 
460
 
 
461
if __name__=='__main__':
 
462
 
 
463
    import time
 
464
 
 
465
    def sleepfunc(interval=2,*a,**kw):
 
466
        args = dict(interval=interval,
 
467
                    args=a,
 
468
                    kwargs=kw)
 
469
        time.sleep(interval)
 
470
        return args
 
471
 
 
472
    def diefunc(interval=2,*a,**kw):
 
473
        time.sleep(interval)
 
474
        die
 
475
 
 
476
    def printfunc(interval=1,reps=5):
 
477
        for n in range(reps):
 
478
            time.sleep(interval)
 
479
            print 'In the background...'
 
480
 
 
481
    jobs = BackgroundJobManager()
 
482
    # first job will have # 0
 
483
    jobs.new(sleepfunc,4)
 
484
    jobs.new(sleepfunc,kw={'reps':2})
 
485
    # This makes a job which will die
 
486
    jobs.new(diefunc,1)
 
487
    jobs.new('printfunc(1,3)')
 
488
 
 
489
    # after a while, you can get the traceback of a dead job.  Run the line
 
490
    # below again interactively until it prints a traceback (check the status
 
491
    # of the job):
 
492
    print jobs[1].status
 
493
    jobs[1].traceback()
 
494
    
 
495
    # Run this line again until the printed result changes
 
496
    print "The result of job #0 is:",jobs[0].result