1
# -*- coding: utf-8 -*-
2
"""Manage background (threaded) jobs conveniently from an interactive shell.
4
This module provides a BackgroundJobManager class. This is the main class
5
meant for public usage, it implements an object which can create and manage
8
It also provides the actual job classes managed by these BackgroundJobManager
9
objects, see their docstrings below.
12
This system was inspired by discussions with B. Granger and the
13
BackgroundCommand class described in the book Python Scripting for
14
Computational Science, by H. P. Langtangen:
16
http://folk.uio.no/hpl/scripting
18
(although ultimately no code from this text was used, as IPython's system is a
19
separate implementation).
21
$Id: background_jobs.py 994 2006-01-08 08:29:44Z fperez $
24
#*****************************************************************************
25
# Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
27
# Distributed under the terms of the BSD License. The full license is in
28
# the file COPYING, distributed as part of this software.
29
#*****************************************************************************
31
from IPython import Release
32
__author__ = '%s <%s>' % Release.authors['Fernando']
33
__license__ = Release.license
39
from IPython.ultraTB import AutoFormattedTB
40
from IPython.genutils import warn,error
42
class BackgroundJobManager:
43
"""Class to manage a pool of backgrounded threaded jobs.
45
Below, we assume that 'jobs' is a BackgroundJobManager instance.
47
Usage summary (see the method docstrings for details):
49
jobs.new(...) -> start a new job
51
jobs() or jobs.status() -> print status summary of all jobs
53
jobs[N] -> returns job number N.
55
foo = jobs[N].result -> assign to variable foo the result of job N
57
jobs[N].traceback() -> print the traceback of dead job N
59
jobs.remove(N) -> remove (finished) job N
61
jobs.flush_finished() -> remove all finished jobs
63
As a convenience feature, BackgroundJobManager instances provide the
64
utility result and traceback methods which retrieve the corresponding
65
information from the jobs list:
67
jobs.result(N) <--> jobs[N].result
68
jobs.traceback(N) <--> jobs[N].traceback()
70
While this appears minor, it allows you to use tab completion
71
interactively on the job manager instance.
73
In interactive mode, IPython provides the magic fuction %bg for quick
74
creation of backgrounded expression-based jobs. Type bg? for details."""
77
# Lists for job management
81
# A dict of all jobs, so users can easily access any of them
84
self._comp_report = []
85
self._dead_report = []
86
# Store status codes locally for fast lookups
87
self._s_created = BackgroundJobBase.stat_created_c
88
self._s_running = BackgroundJobBase.stat_running_c
89
self._s_completed = BackgroundJobBase.stat_completed_c
90
self._s_dead = BackgroundJobBase.stat_dead_c
92
def new(self,func_or_exp,*args,**kwargs):
93
"""Add a new background job and start it in a separate thread.
95
There are two types of jobs which can be created:
97
1. Jobs based on expressions which can be passed to an eval() call.
98
The expression must be given as a string. For example:
100
job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
102
The given expression is passed to eval(), along with the optional
103
global/local dicts provided. If no dicts are given, they are
104
extracted automatically from the caller's frame.
106
A Python statement is NOT a valid eval() expression. Basically, you
107
can only use as an eval() argument something which can go on the right
108
of an '=' sign and be assigned to a variable.
110
For example,"print 'hello'" is not valid, but '2+3' is.
112
2. Jobs given a function object, optionally passing additional
113
positional arguments:
115
job_manager.new(myfunc,x,y)
117
The function is called with the given arguments.
119
If you need to pass keyword arguments to your function, you must
120
supply them as a dict named kw:
122
job_manager.new(myfunc,x,y,kw=dict(z=1))
124
The reason for this assymmetry is that the new() method needs to
125
maintain access to its own keywords, and this prevents name collisions
126
between arguments to new() and arguments to your own functions.
128
In both cases, the result is stored in the job.result field of the
129
background job object.
134
1. All threads running share the same standard output. Thus, if your
135
background jobs generate output, it will come out on top of whatever
136
you are currently writing. For this reason, background jobs are best
137
used with silent functions which simply return their output.
139
2. Threads also all work within the same global namespace, and this
140
system does not lock interactive variables. So if you send job to the
141
background which operates on a mutable object for a long time, and
142
start modifying that same mutable object interactively (or in another
143
backgrounded job), all sorts of bizarre behaviour will occur.
145
3. If a background job is spending a lot of time inside a C extension
146
module which does not release the Python Global Interpreter Lock
147
(GIL), this will block the IPython prompt. This is simply because the
148
Python interpreter can only switch between threads at Python
149
bytecodes. While the execution is inside C code, the interpreter must
150
simply wait unless the extension module releases the GIL.
152
4. There is no way, due to limitations in the Python threads library,
153
to kill a thread once it has started."""
155
if callable(func_or_exp):
156
kw = kwargs.get('kw',{})
157
job = BackgroundJobFunc(func_or_exp,*args,**kw)
158
elif isinstance(func_or_exp,basestring):
160
frame = sys._getframe(1)
161
glob, loc = frame.f_globals, frame.f_locals
168
'Expression jobs take at most 2 args (globals,locals)'
169
job = BackgroundJobExpr(func_or_exp,glob,loc)
172
jkeys = self.jobs_all.keys()
174
job.num = max(jkeys)+1
177
self.jobs_run.append(job)
178
self.jobs_all[job.num] = job
179
print 'Starting job # %s in a separate thread.' % job.num
183
def __getitem__(self, key):
    """Return the job registered under job number ``key``.

    The lookup goes straight to the ``jobs_all`` dict, so an unknown job
    number propagates the dict's KeyError to the caller.
    """
    registry = self.jobs_all
    return registry[key]
187
"""An alias to self.status(),
189
This allows you to simply call a job manager instance much like the
190
Unix jobs shell command."""
194
def _update_status(self):
195
"""Update the status of the job lists.
197
This method moves finished jobs to one of two lists:
198
- self.jobs_comp: jobs which completed successfully
199
- self.jobs_dead: jobs which finished but died.
201
It also copies those jobs to corresponding _report lists. These lists
202
are used to report jobs completed/dead since the last update, and are
203
then cleared by the reporting function after each call."""
205
run,comp,dead = self._s_running,self._s_completed,self._s_dead
206
jobs_run = self.jobs_run
207
for num in range(len(jobs_run)):
213
self.jobs_comp.append(job)
214
self._comp_report.append(job)
215
jobs_run[num] = False
217
self.jobs_dead.append(job)
218
self._dead_report.append(job)
219
jobs_run[num] = False
220
self.jobs_run = filter(None,self.jobs_run)
222
def _group_report(self,group,name):
223
"""Report summary for a given job group.
225
Return True if the group had any elements."""
228
print '%s jobs:' % name
230
print '%s : %s' % (job.num,job)
234
def _group_flush(self,group,name):
235
"""Flush a given job group
237
Return True if the group had any elements."""
241
plural = {1:''}.setdefault(njobs,'s')
242
print 'Flushing %s %s job%s.' % (njobs,name,plural)
246
def _status_new(self):
    """Report jobs that finished since the previous call, then forget them.

    The job lists are refreshed via _update_status() first. Then the queued
    newly-completed and newly-dead jobs (_comp_report / _dead_report) are
    each printed through _group_report(). Afterwards both queues are emptied
    in place, so the next call only reports jobs that finish later.

    Returns the completed-group result if it is true, otherwise the
    dead-group result. Since _group_report() returns True for a non-empty
    group, the result is true when at least one job was reported.
    """
    self._update_status()
    # Both groups are always reported: collect both results before
    # combining them so the dead-job report is never short-circuited away.
    reported = [
        self._group_report(self._comp_report, 'Completed'),
        self._group_report(self._dead_report,
                           'Dead, call job.traceback() for details'),
    ]
    for queue in (self._comp_report, self._dead_report):
        del queue[:]
    return reported[0] or reported[1]
262
def status(self, verbose=0):
    """Print a summary of every job this manager tracks, grouped by state.

    The job lists are refreshed first. Running, completed and dead jobs are
    then reported, in that order. The queues of newly finished jobs
    (_comp_report / _dead_report) are emptied as well, because every job
    in them also sits in the completed/dead lists just printed.

    ``verbose`` is accepted for interface compatibility but is not used here.
    """
    self._update_status()
    # Look the lists up lazily, after the refresh, because _update_status()
    # rebinds self.jobs_run.
    for attr, label in (('jobs_run', 'Running'),
                        ('jobs_comp', 'Completed'),
                        ('jobs_dead', 'Dead')):
        self._group_report(getattr(self, attr), label)
    # Also flush the report queues, clearing them in place.
    for queue in (self._comp_report, self._dead_report):
        del queue[:]
273
def remove(self,num):
274
"""Remove a finished (completed or dead) job."""
277
job = self.jobs_all[num]
279
error('Job #%s not found' % num)
281
stat_code = job.stat_code
282
if stat_code == self._s_running:
283
error('Job #%s is still running, it can not be removed.' % num)
285
elif stat_code == self._s_completed:
286
self.jobs_comp.remove(job)
287
elif stat_code == self._s_dead:
288
self.jobs_dead.remove(job)
290
def flush_finished(self):
291
"""Flush all jobs finished (completed and dead) from lists.
293
Running jobs are never flushed.
295
It first calls _status_new(), to update info. If any jobs have
296
completed since the last _status_new() call, the flush operation
299
if self._status_new():
300
error('New jobs completed since last '\
301
'_status_new(), aborting flush.')
304
# Remove the finished jobs from the master dict
305
jobs_all = self.jobs_all
306
for job in self.jobs_comp+self.jobs_dead:
307
del(jobs_all[job.num])
309
# Now flush these lists completely
310
fl_comp = self._group_flush(self.jobs_comp,'Completed')
311
fl_dead = self._group_flush(self.jobs_dead,'Dead')
312
if not (fl_comp or fl_dead):
313
print 'No jobs to flush.'
315
def result(self,num):
316
"""result(N) -> return the result of job N."""
318
return self.jobs_all[num].result
320
error('Job #%s not found' % num)
322
def traceback(self,num):
324
self.jobs_all[num].traceback()
326
error('Job #%s not found' % num)
329
class BackgroundJobBase(threading.Thread):
330
"""Base class to build BackgroundJob classes.
332
The derived classes must implement:
334
- Their own __init__, since the one here raises NotImplementedError. The
335
derived constructor must call self._init() at the end, to provide common
338
- A strform attribute used in calls to __str__.
340
- A call() method, which will make the actual execution call and must
341
return a value to be held in the 'result' field of the job object."""
343
# Class constants for status, in string and as numerical codes (when
344
# updating jobs lists, we don't want to do string comparisons). This will
345
# be done at every user prompt, so it has to be as fast as possible
346
stat_created = 'Created'; stat_created_c = 0
347
stat_running = 'Running'; stat_running_c = 1
348
stat_completed = 'Completed'; stat_completed_c = 2
349
stat_dead = 'Dead (Exception), call job.traceback() for details'
353
raise NotImplementedError, \
354
"This class can not be instantiated directly."
357
"""Common initialization for all BackgroundJob objects"""
359
for attr in ['call','strform']:
360
assert hasattr(self,attr), "Missing attribute <%s>" % attr
362
# The num tag can be set by an external job manager
365
self.status = BackgroundJobBase.stat_created
366
self.stat_code = BackgroundJobBase.stat_created_c
367
self.finished = False
368
self.result = '<BackgroundJob has not completed>'
369
# reuse the ipython traceback handler if we can get to it, otherwise
372
self._make_tb = __IPYTHON__.InteractiveTB.text
374
self._make_tb = AutoFormattedTB(mode = 'Context',
375
color_scheme='NoColor',
377
# Hold a formatted traceback if one is generated.
380
threading.Thread.__init__(self)
386
return '<BackgroundJob: %s>' % self.strform
393
self.status = BackgroundJobBase.stat_running
394
self.stat_code = BackgroundJobBase.stat_running_c
395
self.result = self.call()
397
self.status = BackgroundJobBase.stat_dead
398
self.stat_code = BackgroundJobBase.stat_dead_c
400
self.result = ('<BackgroundJob died, call job.traceback() for details>')
401
self._tb = self._make_tb()
403
self.status = BackgroundJobBase.stat_completed
404
self.stat_code = BackgroundJobBase.stat_completed_c
407
class BackgroundJobExpr(BackgroundJobBase):
408
"""Evaluate an expression as a background job (uses a separate thread)."""
410
def __init__(self,expression,glob=None,loc=None):
411
"""Create a new job from a string which can be fed to eval().
413
global/locals dicts can be provided, which will be passed to the eval
416
# fail immediately if the given expression can't be compiled
417
self.code = compile(expression,'<BackgroundJob compilation>','eval')
424
self.expression = self.strform = expression
430
return eval(self.code,self.glob,self.loc)
432
class BackgroundJobFunc(BackgroundJobBase):
433
"""Run a function call as a background job (uses a separate thread)."""
435
def __init__(self,func,*args,**kwargs):
436
"""Create a new job from a callable object.
438
Any positional arguments and keyword args given to this constructor
439
after the initial callable are passed directly to it."""
441
assert callable(func),'first argument must be callable'
451
# The string form will only include the function passed, because
452
# generating string representations of the arguments is a potentially
453
# _very_ expensive operation (e.g. with large arrays).
454
self.strform = str(func)
458
return self.func(*self.args,**self.kwargs)
461
if __name__=='__main__':
465
def sleepfunc(interval=2,*a,**kw):
466
args = dict(interval=interval,
472
def diefunc(interval=2,*a,**kw):
476
def printfunc(interval=1,reps=5):
477
for n in range(reps):
479
print 'In the background...'
481
jobs = BackgroundJobManager()
482
# first job will have # 0
483
jobs.new(sleepfunc,4)
484
jobs.new(sleepfunc,kw={'reps':2})
485
# This makes a job which will die
487
jobs.new('printfunc(1,3)')
489
# after a while, you can get the traceback of a dead job. Run the line
490
# below again interactively until it prints a traceback (check the status
495
# Run this line again until the printed result changes
496
print "The result of job #0 is:",jobs[0].result