1
from datamanager import DataManager
3
from Queue import Empty as QueueEmpty
5
from brian.utils.progressreporting import make_text_report
9
from numpy import ndarray, zeros
11
__all__ = ['run_tasks']
13
# This is the default task class used if the user provides only a function
14
class FunctionTask(object):
15
def __init__(self, func):
17
def __call__(self, *args, **kwds):
18
# If the function has a 'report' argument, we pass it the reporter
19
# function that will have been passed in kwds (see task_compute)
20
fc = self.func.func_code
21
if 'report' in fc.co_varnames[:fc.co_argcount] or fc.co_flags&8:
22
return self.func(*args, **kwds)
24
return self.func(*args)
26
def run_tasks(dataman, task, items, gui=True, poolsize=0,
27
initargs=None, initkwds=None, verbose=None,
30
Run a series of tasks using multiple CPUs on a single computer.
32
Initialised with arguments:
35
The :class:`~brian.tools.datamanager.DataManager` object used to store
36
the results in, see below.
38
The task function or class (see below).
40
A sequence (e.g. list or iterator) of arguments to be passed to the
43
Whether or not to use a Tkinter based GUI to show progress and terminate
46
The number of CPUs to use. If the value is 0, use all available CPUs,
47
if it is -1 use all but one CPU, etc.
48
``initargs``, ``initkwds``
49
If ``task`` is a class, these are the initialisation arguments and
50
keywords for the class.
52
Specify True or False to print out every progress message (defaults to
53
False if the GUI is used, or True if not).
55
For iterables (rather than fixed length sequences), if you specify the
56
number of items, an estimate of the time remaining will be given.
58
The task (defined by a function or class, see below) will be called on each
59
item in ``items``, and the results saved to ``dataman``. Results are stored
60
in the format ``(key, val)`` where ``key`` is a unique but meaningless
61
identifier. Results can be retrieved using ``dataman.values()`` or (for
62
large data sets that should be iterated over) ``dataman.itervalues()``.
64
The task can either be a function or a class. If it is a function, it will
65
be called for each item in ``items``. If the items are tuples, the function
66
will be called with those tuples as arguments (e.g. if the item is
67
``(1,2,3)`` the function will be called as ``task(1, 2, 3)``). If the task
68
is a class, it can have an ``__init__`` method that is called once for
69
each process (each CPU) at the beginning of the task run. If the ``__init__``
70
method has a ``process_number`` argument, it will be passed an integer value
71
from 0 to ``numprocesses-1`` giving the number of the process (note, this is
72
not the process ID). The class should
73
define a ``__call__`` method that behaves the same as above for ``task``
74
being a function. In both cases (function or class), if the arguments
75
include a keyword ``report`` then it will be passed a value that can be
76
passed as the ``report`` keyword in Brian's :func:`run` function to give
77
feedback on the simulation as it runs. A ``task`` function can also set
78
``self.taskname`` as a string that will be displayed on the GUI to give
79
additional information.
82
On Windows, make sure that :func:`run_tasks` is only called from
83
within a block such as::
85
if __name__=='__main__':
88
Otherwise, the program will go into a recursive loop.
90
Note that this class only allows you to run tasks on a single computer, to
91
distribute work over multiple computers, we suggest using
92
`Playdoh <http://code.google.com/p/playdoh/>`__.
94
# User can provide task as a class or a function, if its a function we
95
# we use the default FunctionTask
96
if not inspect.isclass(task):
103
if 'report' in fc.co_varnames[:fc.co_argcount] or fc.co_flags&8:
107
if numitems is None and isinstance(items, (list, tuple, ndarray)):
108
numitems = len(items)
109
# This will be used to provide process safe access to the data manager
110
# (so that multiple processes do not attempt to write to the session at
112
session = dataman.locking_computer_session()
114
numprocesses = poolsize+multiprocessing.cpu_count()
116
numprocesses = poolsize
117
manager = multiprocessing.Manager()
118
# We have to send the process number to the initializer via this silly
119
# queue because of a limitation of multiprocessing
120
process_number_queue = manager.Queue()
121
for n in range(numprocesses):
122
process_number_queue.put(n)
123
# This will be used to send messages about the status of the run, i.e.
124
# percentage complete
125
message_queue = manager.Queue()
130
pool = multiprocessing.Pool(processes=numprocesses,
131
initializer=pool_initializer,
132
initargs=(process_number_queue, message_queue,
134
task, initargs, initkwds))
135
results = pool.imap_unordered(task_compute, items)
136
# We use this to map process IDs to task number, so that we can show the
137
# information on the GUI in a consistent fashion
138
pid_to_id = dict((pid, i) for i, pid in enumerate([p.pid for p in pool._pool]))
140
stoprunningsim = [False]
142
# We acquire the datamanager session lock so that if a process is in the
143
# middle of writing data, it won't be terminated until its finished,
144
# meaning we can safely terminate the process without worrying about
149
stoprunningsim[0] = True
153
controller = GuiTaskController(numprocesses, terminate_sim,
154
verbose=verbose, will_report=will_report)
158
controller = TextTaskController(numprocesses, terminate_sim, verbose=verbose)
159
for i in range(numprocesses):
160
controller.update_process(i, 0, 0, 'No task information')
162
controller.update_overall(0, numitems)
163
def empty_message_queue():
164
while not message_queue.empty():
166
pid, taskname, elapsed, complete = message_queue.get_nowait()
167
controller.update_process(pid_to_id[pid], elapsed, complete, taskname)
174
# This waits 0.1s for a new result, and otherwise raises a
175
# TimeoutError that allows the GUI to update the percentage
177
nextresult = results.next(0.1)
178
empty_message_queue()
180
elapsed = time.time()-start
182
controller.update_overall(i, numitems)
183
except StopIteration:
187
except (KeyboardInterrupt, SystemExit):
189
print 'Terminated task processes'
191
except multiprocessing.TimeoutError:
192
empty_message_queue()
193
if stoprunningsim[0]:
194
print 'Terminated task processes'
199
# We store these values as global values, which are initialised by
200
# pool_initializer on each process
204
task_message_queue = None
206
def pool_initializer(process_number_queue, message_queue, dataman, session,
207
task, initargs, initkwds):
208
global task_object, task_dataman, task_session, task_message_queue
209
n = process_number_queue.get()
210
init_method = task.__init__
211
fc = init_method.func_code
212
# Checks if there is a process_number argument explicitly given in the
213
# __init__ method of the task class, the co_flags&8 checks i there is a
214
# **kwds parameter in the definition
215
if 'process_number' in fc.co_varnames[:fc.co_argcount] or fc.co_flags&8:
216
initkwds['process_number'] = n
217
task_object = task(*initargs, **initkwds)
218
task_dataman = dataman
219
task_session = session
220
task_message_queue = message_queue
222
def task_reporter(elapsed, complete):
223
# If the task class defines a task name, we can display it with the
224
# percentage complete
225
if hasattr(task_object, 'taskname'):
226
taskname = task_object.taskname
228
taskname = 'No task information'
229
# This queue is used by the main loop in run_tasks
230
task_message_queue.put((os.getpid(), taskname, elapsed, complete))
232
def task_compute(args):
233
if not isinstance(args, tuple):
235
# We check if the task function has a report argument, and if it does we
236
# pass it task_reporter so that it can integrate with the GUI
238
fc = task_object.__call__.func_code
239
if 'report' in fc.co_varnames[:fc.co_argcount] or fc.co_flags&8:
240
kwds['report'] = task_reporter
241
result = task_object(*args, **kwds)
242
# Save the results, with a unique key, to the locking session of the dataman
243
task_session[task_dataman.make_unique_key()] = result
245
class TaskController(object):
246
def __init__(self, processes, terminator, verbose=True):
247
self.verbose = verbose
248
self.completion = zeros(processes)
249
self.numitems, self.numdone = None, 0
250
self.start_time = time.time()
251
def update_process(self, i, elapsed, complete, msg):
252
self.completion[i] = complete%1.0
254
print 'Process '+str(i)+': '+make_text_report(elapsed, complete)+': '+msg
255
_, msg = self.get_overall_completion()
257
def get_overall_completion(self):
259
numitems, numdone = self.numitems, self.numdone
260
elapsed = time.time()-self.start_time
261
if numitems is not None:
262
complete = (numdone+sum(self.completion))/numitems
263
txt = 'Overall, '+str(numdone)+' done'
264
if numitems is not None:
265
txt += ' of '+str(numitems)+': '+make_text_report(elapsed, complete)
267
def update_overall(self, numdone, numitems):
268
self.numdone = numdone
269
self.numitems = numitems
270
def recompute_overall(self):
277
class TextTaskController(TaskController):
278
def update_overall(self, numdone, numitems):
279
TaskController.update_overall(self, numdone, numitems)
280
_, msg = self.get_overall_completion()
284
class GuiTaskController(Tkinter.Tk, TaskController):
285
def __init__(self, processes, terminator, width=600, verbose=False,
287
Tkinter.Tk.__init__(self, None)
288
TaskController.__init__(self, processes, terminator, verbose=verbose)
291
button = Tkinter.Button(self, text='Terminate task',
293
button.grid(column=0, row=0)
294
self.pb_width = width
295
self.progressbars = []
297
self.will_report = will_report
300
for i in xrange(numbars):
301
can = Tkinter.Canvas(self, width=width, height=30)
302
can.grid(column=0, row=1+i)
303
can.create_rectangle(0, 0, width, 30, fill='#aaaaaa')
308
r = can.create_rectangle(0, 0, 0, 30, fill=col, width=0)
309
t = can.create_text(width/2, 15, text='')
310
self.progressbars.append((can, r, t))
311
self.title('Task control')
313
def update_process(self, i, elapsed, complete, msg):
314
TaskController.update_process(self, i, elapsed, complete, msg)
316
can, r, t = self.progressbars[i]
317
can.itemconfigure(t, text='Process '+str(i)+': '+make_text_report(elapsed, complete)+': '+msg)
318
can.coords(r, 0, 0, int(self.pb_width*complete), 30)
319
self.recompute_overall()
321
def update_overall(self, numdone, numitems):
322
TaskController.update_overall(self, numdone, numitems)
323
self.recompute_overall()
325
def recompute_overall(self):
326
complete, msg = TaskController.get_overall_completion(self)
327
numitems = self.numitems
328
can, r, t = self.progressbars[-1]
329
can.itemconfigure(t, text=msg)
330
if numitems is not None:
331
can.coords(r, 0, 0, int(self.pb_width*complete), 30)