~ubuntu-branches/ubuntu/trusty/brian/trusty

« back to all changes in this revision

Viewing changes to brian/tools/taskfarm.py

  • Committer: Package Import Robot
  • Author(s): Yaroslav Halchenko
  • Date: 2012-01-02 12:49:11 UTC
  • mfrom: (1.1.3)
  • Revision ID: package-import@ubuntu.com-20120102124911-6r1rmqgt5vr22ro3
Tags: 1.3.1-1
* Fresh upstream release
* Boosted policy compliance to 3.9.2 (no changes)
* Added up_skip_tests_with_paths patch to avoid test failures on custom 
  test scripts with hardcoded paths

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from datamanager import DataManager
 
2
import multiprocessing
 
3
from Queue import Empty as QueueEmpty
 
4
import Tkinter
 
5
from brian.utils.progressreporting import make_text_report
 
6
import inspect
 
7
import time
 
8
import os
 
9
from numpy import ndarray, zeros
 
10
 
 
11
__all__ = ['run_tasks']
 
12
 
 
13
# This is the default task class used if the user provides only a function
 
14
class FunctionTask(object):
 
15
    def __init__(self, func):
 
16
        self.func = func
 
17
    def __call__(self, *args, **kwds):
 
18
        # If the function has a 'report' argument, we pass it the reporter
 
19
        # function that will have been passed in kwds (see task_compute)
 
20
        fc = self.func.func_code
 
21
        if 'report' in fc.co_varnames[:fc.co_argcount] or fc.co_flags&8:
 
22
            return self.func(*args, **kwds)
 
23
        else:
 
24
            return self.func(*args)
 
25
 
 
26
def run_tasks(dataman, task, items, gui=True, poolsize=0,
 
27
              initargs=None, initkwds=None, verbose=None,
 
28
              numitems=None):
 
29
    '''
 
30
    Run a series of tasks using multiple CPUs on a single computer.
 
31
    
 
32
    Initialised with arguments:
 
33
    
 
34
    ``dataman``
 
35
        The :class:`~brian.tools.datamanager.DataManager` object used to store
 
36
        the results in, see below.
 
37
    ``task``
 
38
        The task function or class (see below).
 
39
    ``items``
 
40
        A sequence (e.g. list or iterator) of arguments to be passed to the
 
41
        task.
 
42
    ``gui=True``
 
43
        Whether or not to use a Tkinter based GUI to show progress and terminate
 
44
        the task run.
 
45
    ``poolsize=0``
 
46
        The number of CPUs to use. If the value is 0, use all available CPUs,
 
47
        if it is -1 use all but one CPU, etc.
 
48
    ``initargs``, ``initkwds``
 
49
        If ``task`` is a class, these are the initialisation arguments and
 
50
        keywords for the class.
 
51
    ``verbose=None``
 
52
        Specify True or False to print out every progress message (defaults to
 
53
        False if the GUI is used, or True if not).
 
54
    ``numitems=None``
 
55
        For iterables (rather than fixed length sequences), if you specify the
 
56
        number of items, an estimate of the time remaining will be given.
 
57
        
 
58
    The task (defined by a function or class, see below) will be called on each
 
59
    item in ``items``, and the results saved to ``dataman``. Results are stored
 
60
    in the format ``(key, val)`` where ``key`` is a unique but meaningless
 
61
    identifier. Results can be retrieved using ``dataman.values()`` or (for
 
62
    large data sets that should be iterated over) ``dataman.itervalues()``.
 
63
    
 
64
    The task can either be a function or a class. If it is a function, it will
 
65
    be called for each item in ``items``. If the items are tuples, the function
 
66
    will be called with those tuples as arguments (e.g. if the item is
 
67
    ``(1,2,3)`` the function will be called as ``task(1, 2, 3)``). If the task
 
68
    is a class, it can have an ``__init__`` method that is called once for
 
69
    each process (each CPU) at the beginning of the task run. If the ``__init__``
 
70
    method has a ``process_number`` argument, it will be passed an integer value
 
71
    from 0 to ``numprocesses-1`` giving the number of the process (note, this is
 
72
    not the process ID). The class should
 
73
    define a ``__call__`` method that behaves the same as above for ``task``
 
74
    being a function. In both cases (function or class), if the arguments
 
75
    include a keyword ``report`` then it will be passed a value that can be
 
76
    passed as the ``report`` keyword in Brian's :func:`run` function to give
 
77
    feedback on the simulation as it runs. A ``task`` function can also set
 
78
    ``self.taskname`` as a string that will be displayed on the GUI to give
 
79
    additional information.
 
80
    
 
81
    .. warning::
 
82
        On Windows, make sure that :func:`run_tasks` is only called from
 
83
        within a block such as::
 
84
        
 
85
            if __name__=='__main__':
 
86
                run_tasks(...)
 
87
                
 
88
        Otherwise, the program will go into a recursive loop.
 
89
        
 
90
    Note that this class only allows you to run tasks on a single computer, to
 
91
    distribute work over multiple computers, we suggest using
 
92
    `Playdoh <http://code.google.com/p/playdoh/>`__.
 
93
    '''
 
94
    # User can provide task as a class or a function, if its a function we
 
95
    # we use the default FunctionTask
 
96
    if not inspect.isclass(task):
 
97
        f = task
 
98
        initargs = (task,)
 
99
        task = FunctionTask
 
100
    else:
 
101
        f = task.__call__
 
102
    fc = f.func_code
 
103
    if 'report' in fc.co_varnames[:fc.co_argcount] or fc.co_flags&8:
 
104
        will_report = True
 
105
    else:
 
106
        will_report = False
 
107
    if numitems is None and isinstance(items, (list, tuple, ndarray)):
 
108
        numitems = len(items)
 
109
    # This will be used to provide process safe access to the data manager
 
110
    # (so that multiple processes do not attempt to write to the session at
 
111
    # the same time)
 
112
    session = dataman.locking_computer_session()
 
113
    if poolsize<=0:
 
114
        numprocesses = poolsize+multiprocessing.cpu_count()
 
115
    elif poolsize>0:
 
116
        numprocesses = poolsize
 
117
    manager = multiprocessing.Manager()
 
118
    # We have to send the process number to the initializer via this silly
 
119
    # queue because of a limitation of multiprocessing
 
120
    process_number_queue = manager.Queue()
 
121
    for n in range(numprocesses):
 
122
        process_number_queue.put(n)
 
123
    # This will be used to send messages about the status of the run, i.e.
 
124
    # percentage complete
 
125
    message_queue = manager.Queue()
 
126
    if initargs is None:
 
127
        initargs = ()
 
128
    if initkwds is None:
 
129
        initkwds = {}
 
130
    pool = multiprocessing.Pool(processes=numprocesses,
 
131
                                initializer=pool_initializer,
 
132
                                initargs=(process_number_queue, message_queue,
 
133
                                          dataman, session,
 
134
                                          task, initargs, initkwds))
 
135
    results = pool.imap_unordered(task_compute, items)
 
136
    # We use this to map process IDs to task number, so that we can show the
 
137
    # information on the GUI in a consistent fashion
 
138
    pid_to_id = dict((pid, i) for i, pid in enumerate([p.pid for p in pool._pool]))
 
139
    start = time.time()
 
140
    stoprunningsim = [False]
 
141
    def terminate_sim():
 
142
        # We acquire the datamanager session lock so that if a process is in the
 
143
        # middle of writing data, it won't be terminated until its finished,
 
144
        # meaning we can safely terminate the process without worrying about
 
145
        # data loss.
 
146
        session.acquire()
 
147
        pool.terminate()
 
148
        session.release()
 
149
        stoprunningsim[0] = True
 
150
    if gui:
 
151
        if verbose is None:
 
152
            verbose = False
 
153
        controller = GuiTaskController(numprocesses, terminate_sim,
 
154
                                       verbose=verbose, will_report=will_report)
 
155
    else:
 
156
        if verbose is None:
 
157
            verbose = True
 
158
        controller = TextTaskController(numprocesses, terminate_sim, verbose=verbose)
 
159
    for i in range(numprocesses):
 
160
        controller.update_process(i, 0, 0, 'No task information')
 
161
    i = 0
 
162
    controller.update_overall(0, numitems)
 
163
    def empty_message_queue():
 
164
        while not message_queue.empty():
 
165
            try:
 
166
                pid, taskname, elapsed, complete = message_queue.get_nowait()
 
167
                controller.update_process(pid_to_id[pid], elapsed, complete, taskname)
 
168
            except QueueEmpty:
 
169
                break
 
170
        controller.update()
 
171
        
 
172
    while True:
 
173
        try:
 
174
            # This waits 0.1s for a new result, and otherwise raises a
 
175
            # TimeoutError that allows the GUI to update the percentage
 
176
            # complete
 
177
            nextresult = results.next(0.1)
 
178
            empty_message_queue()
 
179
            i = i+1
 
180
            elapsed = time.time()-start
 
181
            complete = 0.0
 
182
            controller.update_overall(i, numitems)
 
183
        except StopIteration:
 
184
            terminate_sim()
 
185
            print 'Finished.'
 
186
            break
 
187
        except (KeyboardInterrupt, SystemExit):
 
188
            terminate_sim()
 
189
            print 'Terminated task processes'
 
190
            raise
 
191
        except multiprocessing.TimeoutError:
 
192
            empty_message_queue()
 
193
            if stoprunningsim[0]:
 
194
                print 'Terminated task processes'
 
195
                break
 
196
    controller.destroy()
 
197
 
 
198
 
 
199
# We store these values as global values, which are initialised by
 
200
# pool_initializer on each process
 
201
task_object = None
 
202
task_dataman = None
 
203
task_session = None
 
204
task_message_queue = None
 
205
 
 
206
def pool_initializer(process_number_queue, message_queue, dataman, session,
 
207
                     task, initargs, initkwds):
 
208
    global task_object, task_dataman, task_session, task_message_queue
 
209
    n = process_number_queue.get()
 
210
    init_method = task.__init__
 
211
    fc = init_method.func_code
 
212
    # Checks if there is a process_number argument explicitly given in the
 
213
    # __init__ method of the task class, the co_flags&8 checks i there is a
 
214
    # **kwds parameter in the definition
 
215
    if 'process_number' in fc.co_varnames[:fc.co_argcount] or fc.co_flags&8:
 
216
        initkwds['process_number'] = n
 
217
    task_object = task(*initargs, **initkwds)
 
218
    task_dataman = dataman
 
219
    task_session = session
 
220
    task_message_queue = message_queue
 
221
 
 
222
def task_reporter(elapsed, complete):
 
223
    # If the task class defines a task name, we can display it with the
 
224
    # percentage complete
 
225
    if hasattr(task_object, 'taskname'):
 
226
        taskname = task_object.taskname
 
227
    else:
 
228
        taskname = 'No task information'
 
229
    # This queue is used by the main loop in run_tasks
 
230
    task_message_queue.put((os.getpid(), taskname, elapsed, complete))
 
231
 
 
232
def task_compute(args):
 
233
    if not isinstance(args, tuple):
 
234
        args = (args,)
 
235
    # We check if the task function has a report argument, and if it does we
 
236
    # pass it task_reporter so that it can integrate with the GUI
 
237
    kwds = {}
 
238
    fc = task_object.__call__.func_code
 
239
    if 'report' in fc.co_varnames[:fc.co_argcount] or fc.co_flags&8:
 
240
        kwds['report'] = task_reporter
 
241
    result = task_object(*args, **kwds)
 
242
    # Save the results, with a unique key, to the locking session of the dataman
 
243
    task_session[task_dataman.make_unique_key()] = result
 
244
 
 
245
class TaskController(object):
 
246
    def __init__(self, processes, terminator, verbose=True):
 
247
        self.verbose = verbose
 
248
        self.completion = zeros(processes)
 
249
        self.numitems, self.numdone = None, 0
 
250
        self.start_time = time.time()
 
251
    def update_process(self, i, elapsed, complete, msg):
 
252
        self.completion[i] = complete%1.0
 
253
        if self.verbose:
 
254
            print 'Process '+str(i)+': '+make_text_report(elapsed, complete)+': '+msg
 
255
            _, msg = self.get_overall_completion()
 
256
            print msg
 
257
    def get_overall_completion(self):
 
258
        complete = 0.0
 
259
        numitems, numdone = self.numitems, self.numdone
 
260
        elapsed = time.time()-self.start_time
 
261
        if numitems is not None:
 
262
            complete = (numdone+sum(self.completion))/numitems
 
263
        txt = 'Overall, '+str(numdone)+' done'
 
264
        if numitems is not None:
 
265
            txt += ' of '+str(numitems)+': '+make_text_report(elapsed, complete)
 
266
        return complete, txt
 
267
    def update_overall(self, numdone, numitems):
 
268
        self.numdone = numdone
 
269
        self.numitems = numitems
 
270
    def recompute_overall(self):
 
271
        pass
 
272
    def update(self):
 
273
        pass
 
274
    def destroy(self):
 
275
        pass
 
276
 
 
277
class TextTaskController(TaskController):
 
278
    def update_overall(self, numdone, numitems):
 
279
        TaskController.update_overall(self, numdone, numitems)
 
280
        _, msg = self.get_overall_completion()
 
281
        print msg
 
282
 
 
283
# task control GUI
 
284
class GuiTaskController(Tkinter.Tk, TaskController):
 
285
    def __init__(self, processes, terminator, width=600, verbose=False,
 
286
                 will_report=True):
 
287
        Tkinter.Tk.__init__(self, None)
 
288
        TaskController.__init__(self, processes, terminator, verbose=verbose)
 
289
        self.parent = None
 
290
        self.grid()
 
291
        button = Tkinter.Button(self, text='Terminate task',
 
292
                                command=terminator)
 
293
        button.grid(column=0, row=0)
 
294
        self.pb_width = width
 
295
        self.progressbars = []
 
296
        numbars = 1
 
297
        self.will_report = will_report
 
298
        if will_report:
 
299
            numbars += processes
 
300
        for i in xrange(numbars):
 
301
            can = Tkinter.Canvas(self, width=width, height=30)
 
302
            can.grid(column=0, row=1+i)
 
303
            can.create_rectangle(0, 0, width, 30, fill='#aaaaaa')
 
304
            if i<numbars-1:
 
305
                col = '#ffaaaa'
 
306
            else:
 
307
                col = '#aaaaff'
 
308
            r = can.create_rectangle(0, 0, 0, 30, fill=col, width=0)
 
309
            t = can.create_text(width/2, 15, text='')
 
310
            self.progressbars.append((can, r, t))
 
311
        self.title('Task control')
 
312
        
 
313
    def update_process(self, i, elapsed, complete, msg):
 
314
        TaskController.update_process(self, i, elapsed, complete, msg)
 
315
        if self.will_report:
 
316
            can, r, t = self.progressbars[i]
 
317
            can.itemconfigure(t, text='Process '+str(i)+': '+make_text_report(elapsed, complete)+': '+msg)
 
318
            can.coords(r, 0, 0, int(self.pb_width*complete), 30)
 
319
        self.recompute_overall()
 
320
        
 
321
    def update_overall(self, numdone, numitems):
 
322
        TaskController.update_overall(self, numdone, numitems)
 
323
        self.recompute_overall()
 
324
    
 
325
    def recompute_overall(self):
 
326
        complete, msg = TaskController.get_overall_completion(self)
 
327
        numitems = self.numitems
 
328
        can, r, t = self.progressbars[-1]
 
329
        can.itemconfigure(t, text=msg)
 
330
        if numitems is not None:
 
331
            can.coords(r, 0, 0, int(self.pb_width*complete), 30)
 
332
        self.update()