3
This module defines the Serial and Parallel classes that execute tasks to
4
complete a build. The Jobs class provides a higher level interface to start,
5
stop, and wait on jobs.
10
# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 The SCons Foundation
12
# Permission is hereby granted, free of charge, to any person obtaining
13
# a copy of this software and associated documentation files (the
14
# "Software"), to deal in the Software without restriction, including
15
# without limitation the rights to use, copy, modify, merge, publish,
16
# distribute, sublicense, and/or sell copies of the Software, and to
17
# permit persons to whom the Software is furnished to do so, subject to
18
# the following conditions:
20
# The above copyright notice and this permission notice shall be included
21
# in all copies or substantial portions of the Software.
23
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
24
# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
25
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32
__revision__ = "src/engine/SCons/Job.py 3603 2008/10/10 05:46:45 scons"
39
# The default stack size (in kilobytes) of the threads used to execute
42
# We use a stack size of 256 kilobytes. The default on some platforms
43
# is too large and prevents us from creating enough threads to fully
44
# parallelized the build. For example, the default stack size on linux
47
explicit_stack_size = None
48
default_stack_size = 256
50
interrupt_msg = 'Build interrupted.'
55
self.interrupted = False
58
self.interrupted = True
61
return self.interrupted
65
"""An instance of this class initializes N jobs, and provides
66
methods for starting, stopping, and waiting on all N jobs.
69
def __init__(self, num, taskmaster):
71
create 'num' jobs using the given taskmaster.
73
If 'num' is 1 or less, then a serial job will be used,
74
otherwise a parallel job with 'num' worker threads will
77
The 'num_jobs' attribute will be set to the actual number of jobs
78
allocated. If more than one job is requested but the Parallel
79
class can't do it, it gets reset to 1. Wrapping interfaces that
80
care should check the value of 'num_jobs' after initialization.
85
stack_size = explicit_stack_size
86
if stack_size is None:
87
stack_size = default_stack_size
90
self.job = Parallel(taskmaster, num, stack_size)
95
self.job = Serial(taskmaster)
98
def run(self, postfunc=lambda: None):
101
postfunc() will be invoked after the jobs has run. It will be
102
invoked even if the jobs are interrupted by a keyboard
103
interrupt (well, in fact by a signal such as either SIGINT,
104
SIGTERM or SIGHUP). The execution of postfunc() is protected
105
against keyboard interrupts and is guaranteed to run to
107
self._setup_sig_handler()
112
self._reset_sig_handler()
114
def were_interrupted(self):
115
"""Returns whether the jobs were interrupted by a signal."""
116
return self.job.interrupted()
118
def _setup_sig_handler(self):
119
"""Setup an interrupt handler so that SCons can shutdown cleanly in
122
a) SIGINT: Keyboard interrupt
123
b) SIGTERM: kill or system shutdown
124
c) SIGHUP: Controlling shell exiting
126
We handle all of these cases by stopping the taskmaster. It
127
turns out that it very difficult to stop the build process
128
by throwing asynchronously an exception such as
129
KeyboardInterrupt. For example, the python Condition
130
variables (threading.Condition) and Queue's do not seem to
131
asynchronous-exception-safe. It would require adding a whole
132
bunch of try/finally block and except KeyboardInterrupt all
135
Note also that we have to be careful to handle the case when
136
SCons forks before executing another process. In that case, we
137
want the child to exit immediately.
139
def handler(signum, stack, self=self, parentpid=os.getpid()):
140
if os.getpid() == parentpid:
141
self.job.taskmaster.stop()
142
self.job.interrupted.set()
146
self.old_sigint = signal.signal(signal.SIGINT, handler)
147
self.old_sigterm = signal.signal(signal.SIGTERM, handler)
149
self.old_sighup = signal.signal(signal.SIGHUP, handler)
150
except AttributeError:
153
def _reset_sig_handler(self):
154
"""Restore the signal handlers to their previous state (before the
155
call to _setup_sig_handler()."""
157
signal.signal(signal.SIGINT, self.old_sigint)
158
signal.signal(signal.SIGTERM, self.old_sigterm)
160
signal.signal(signal.SIGHUP, self.old_sighup)
161
except AttributeError:
165
"""This class is used to execute tasks in series, and is more efficient
166
than Parallel, but is only appropriate for non-parallel builds. Only
167
one instance of this class should be in existence at a time.
169
This class is not thread safe.
172
def __init__(self, taskmaster):
173
"""Create a new serial job given a taskmaster.
175
The taskmaster's next_task() method should return the next task
176
that needs to be executed, or None if there are no more tasks. The
177
taskmaster's executed() method will be called for each task when it
178
is successfully executed or failed() will be called if it failed to
179
execute (e.g. execute() raised an exception)."""
181
self.taskmaster = taskmaster
182
self.interrupted = InterruptState()
185
"""Start the job. This will begin pulling tasks from the taskmaster
186
and executing them, and return when there are no more tasks. If a task
187
fails to execute (i.e. execute() raises an exception), then the job will
191
task = self.taskmaster.next_task()
198
if task.needs_execute():
201
if self.interrupted():
203
raise SCons.Errors.BuildError(
204
task.targets[0], errstr=interrupt_msg)
210
# Let the failed() callback function arrange for the
211
# build to stop if that's appropriate.
217
self.taskmaster.cleanup()
220
# Trap import failure so that everything in the Job module but the
221
# Parallel class (and its dependent classes) will work if the interpreter
222
# doesn't support threads.
229
class Worker(threading.Thread):
230
"""A worker thread waits on a task to be posted to its request queue,
231
dequeues the task, executes it, and posts a tuple including the task
232
and a boolean indicating whether the task executed successfully. """
234
def __init__(self, requestQueue, resultsQueue, interrupted):
235
threading.Thread.__init__(self)
237
self.requestQueue = requestQueue
238
self.resultsQueue = resultsQueue
239
self.interrupted = interrupted
244
task = self.requestQueue.get()
247
# The "None" value is used as a sentinel by
248
# ThreadPool.cleanup(). This indicates that there
249
# are no more tasks, so we should quit.
253
if self.interrupted():
254
raise SCons.Errors.BuildError(
255
task.targets[0], errstr=interrupt_msg)
263
self.resultsQueue.put((task, ok))
266
"""This class is responsible for spawning and managing worker threads."""
268
def __init__(self, num, stack_size, interrupted):
269
"""Create the request and reply queues, and 'num' worker threads.
271
One must specify the stack size of the worker threads. The
272
stack size is specified in kilobytes.
274
self.requestQueue = Queue.Queue(0)
275
self.resultsQueue = Queue.Queue(0)
278
prev_size = threading.stack_size(stack_size*1024)
279
except AttributeError, e:
280
# Only print a warning if the stack size has been
282
if not explicit_stack_size is None:
283
msg = "Setting stack size is unsupported by this version of Python:\n " + \
285
SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
286
except ValueError, e:
287
msg = "Setting stack size failed:\n " + \
289
SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
291
# Create worker threads
294
worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
295
self.workers.append(worker)
297
# Once we drop Python 1.5 we can change the following to:
298
#if 'prev_size' in locals():
299
if 'prev_size' in locals().keys():
300
threading.stack_size(prev_size)
303
"""Put task into request queue."""
304
self.requestQueue.put(task)
307
"""Remove and return a result tuple from the results queue."""
308
return self.resultsQueue.get()
310
def preparation_failed(self, task):
311
self.resultsQueue.put((task, False))
315
Shuts down the thread pool, giving each worker thread a
316
chance to shut down gracefully.
318
# For each worker thread, put a sentinel "None" value
319
# on the requestQueue (indicating that there's no work
320
# to be done) so that each worker thread will get one and
321
# terminate gracefully.
322
for _ in self.workers:
323
self.requestQueue.put(None)
325
# Wait for all of the workers to terminate.
327
# If we don't do this, later Python versions (2.4, 2.5) often
328
# seem to raise exceptions during shutdown. This happens
329
# in requestQueue.get(), as an assertion failure that
330
# requestQueue.not_full is notified while not acquired,
331
# seemingly because the main thread has shut down (or is
332
# in the process of doing so) while the workers are still
333
# trying to pull sentinels off the requestQueue.
335
# Normally these terminations should happen fairly quickly,
336
# but we'll stick a one-second timeout on here just in case
338
for worker in self.workers:
343
"""This class is used to execute tasks in parallel, and is somewhat
344
less efficient than Serial, but is appropriate for parallel builds.
346
This class is thread safe.
349
def __init__(self, taskmaster, num, stack_size):
350
"""Create a new parallel job given a taskmaster.
352
The taskmaster's next_task() method should return the next
353
task that needs to be executed, or None if there are no more
354
tasks. The taskmaster's executed() method will be called
355
for each task when it is successfully executed or failed()
356
will be called if the task failed to execute (i.e. execute()
357
raised an exception).
359
Note: calls to taskmaster are serialized, but calls to
360
execute() on distinct tasks are not serialized, because
361
that is the whole point of parallel jobs: they can execute
362
multiple tasks simultaneously. """
364
self.taskmaster = taskmaster
365
self.interrupted = InterruptState()
366
self.tp = ThreadPool(num, stack_size, self.interrupted)
371
"""Start the job. This will begin pulling tasks from the
372
taskmaster and executing them, and return when there are no
373
more tasks. If a task fails to execute (i.e. execute() raises
374
an exception), then the job will stop."""
379
# Start up as many available tasks as we're
381
while jobs < self.maxjobs:
382
task = self.taskmaster.next_task()
387
# prepare task for execution
394
if task.needs_execute():
402
if not task and not jobs: break
404
# Let any/all completed tasks finish up before we go
405
# back and put the next batch of tasks on the queue.
407
task, ok = self.tp.get()
413
if self.interrupted():
415
raise SCons.Errors.BuildError(
416
task.targets[0], errstr=interrupt_msg)
420
# Let the failed() callback function arrange
421
# for the build to stop if that's appropriate.
426
if self.tp.resultsQueue.empty():
430
self.taskmaster.cleanup()