3
# Thomas Nagy, 2005-2008 (ita)
5
# this replaces the core of Runner.py in waf with a variant that works
6
# on systems with completely broken threading (such as Python 2.5.x on
7
# AIX). For simplicity we enable this when JOBS=1, which is triggered
8
# by the compatibility makefile used for the waf build. That also ensures
9
# this code is tested, as it means it is used in the build farm, and by
10
# anyone using 'make' to build Samba with waf
14
import sys, random, time, threading, traceback, os
15
try: from Queue import Queue
16
except ImportError: from queue import Queue
17
import Build, Utils, Logs, Options
18
from Logs import debug, error
19
from Constants import *
23
# Monkey-patch threading.Thread.run: keep a reference to the stock
# implementation, define a wrapper that delegates to it, and install the
# wrapper in its place.
run_old = threading.Thread.run
24
def run(*args, **kwargs):
26
# delegate to the original Thread.run with the arguments unchanged
run_old(*args, **kwargs)
27
# NOTE(review): no matching `try:` is visible for this handler and its body
# is not visible either (the embedded numbering jumps 24 -> 26 and 27 -> 30).
# Restore the missing lines before relying on this block.
except (KeyboardInterrupt, SystemExit):
30
# pass the active exception triple to sys.excepthook; presumably this sits in
# a separate catch-all handler whose header is missing -- TODO confirm
sys.excepthook(*sys.exc_info())
31
# from here on every Thread uses the wrapper above as its run()
threading.Thread.run = run
34
# NOTE(review): only the class header is visible; the class body is missing
# (the embedded numbering jumps 34 -> 44).
class TaskConsumer(object):
44
# NOTE(review): the statements below operate on `tsk`, but no enclosing `def`
# or `try:` line is visible. Presumably they are the body of the `process`
# function that the end of the file assigns to Runner.process -- TODO confirm.
# print the task's display() text via tsk.generator.bld.printout
tsk.generator.bld.printout(tsk.display())
45
# a truthy class-level `stat` attribute is called with the task and its
# result is used as `ret`
if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
46
# actual call to task's run() function
47
else: ret = tsk.call_run()
49
# store whatever Utils.ex_stack() returns as the task's error message and
# mark the task with the EXCEPTION status; the `except` header that
# presumably guards these two lines is not visible
tsk.err_msg = Utils.ex_stack()
50
tsk.hasrun = EXCEPTION
63
# NOTE(review): the body of this handler and the header of the handler that
# the next two assignments belong to are missing (numbering jumps 63 -> 66)
except Utils.WafError:
66
tsk.err_msg = Utils.ex_stack()
67
tsk.hasrun = EXCEPTION
70
# reaction to any status other than SUCCESS; the consequent is not visible
if tsk.hasrun != SUCCESS:
75
# NOTE(review): the two lines of text under the class header are presumably
# the class docstring whose triple-quote delimiters were lost (the embedded
# numbering jumps 75 -> 77 and 78 -> 80) -- TODO restore the delimiters.
class Parallel(object):
77
keep the consumer threads busy, and avoid consuming cpu cycles
78
when no more tasks can be added (end of the build, etc)
80
# Initialise the scheduling state from the build object `bld`.
# `j` defaults to 2; no use of it is visible in these lines (presumably it
# sets self.numjobs, which refill_task_list reads -- TODO confirm).
def __init__(self, bld, j=2):
85
# take the task manager from the build and reset its current group to 0
self.manager = bld.task_manager
86
self.manager.current_group = 0
88
# value reported by manager.total(); later stored as the second element of
# tsk.position
self.total = self.manager.total()
90
# tasks waiting to be processed - IMPORTANT
# NOTE(review): the assignment this comment describes is not visible
# (presumably self.outstanding, which other methods read) -- TODO confirm
92
# MAXJOBS is not defined in the visible lines; presumably it comes from the
# star import of Constants
self.maxjobs = MAXJOBS
94
# tasks that are waiting for another task to complete
# NOTE(review): assignment not visible (presumably self.frozen) -- TODO confirm
97
# tasks returned by the consumers
# NOTE(review): assignment not visible -- TODO confirm which attribute this is
100
self.count = 0 # tasks not in the producer area
102
self.processed = 1 # progress indicator
104
self.stop = False # error condition to stop the build
105
self.error = False # error flag
# NOTE(review): the `def` line for this method is not visible (the embedded
# numbering jumps 105 -> 108); presumably it is get_next(self), which the main
# loop calls as self.get_next() -- TODO confirm.
108
"override this method to schedule the tasks in a particular order"
109
# empty-list case; the statement it guards is not visible
if not self.outstanding:
111
# remove and return the first entry of self.outstanding
return self.outstanding.pop(0)
113
# Put `tsk` onto self.frozen; random.randint(0, 1) decides whether it is
# inserted at the front.
def postpone(self, tsk):
114
"override this method to schedule the tasks in a particular order"
115
# TODO consider using a deque instead
116
# randint(0, 1) returns 0 or 1, so this branch is taken about half the time
if random.randint(0, 1):
117
self.frozen.insert(0, tsk)
119
# NOTE(review): the embedded numbering jumps 117 -> 119; an `else:` presumably
# belongs here, otherwise the task would be inserted and appended -- TODO confirm
self.frozen.append(tsk)
121
# Refill self.outstanding from self.frozen and from the manager's next set of
# tasks. NOTE(review): the bodies of both `while` loops are only partly
# visible (gaps in the embedded numbering), so the exact nesting of the
# statements below cannot be read from here.
def refill_task_list(self):
122
"called to set the next group of tasks"
124
# loops while self.count exceeds numjobs + GAP or has reached maxjobs; the
# loop body is not visible, and GAP is not defined in the visible lines
while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
127
# keep going until there is at least one outstanding task
while not self.outstanding:
132
# add the postponed (frozen) tasks to the outstanding list
self.outstanding += self.frozen
135
# the manager returns a pair: a job limit and a collection of tasks
(jobs, tmp) = self.manager.get_next_set()
136
# a non-None job limit replaces self.maxjobs
if jobs != None: self.maxjobs = jobs
137
# a non-empty task collection is added to the outstanding list
if tmp: self.outstanding += tmp
# NOTE(review): the `def` line and the statement that binds `ret` are not
# visible (the embedded numbering jumps 137 -> 141 and 141 -> 143); the
# docstring suggests this method is get_out -- TODO confirm.
141
"the tasks that are put to execute are all collected using get_out"
143
# report the collected task to the manager as finished
self.manager.add_finished(ret)
144
# unless the build is stopping, a truthy `more_tasks` attribute on the task
# is added to the outstanding list and counted in the total
if not self.stop and getattr(ret, 'more_tasks', None):
145
self.outstanding += ret.more_tasks
146
self.total += len(ret.more_tasks)
149
# Error hook taking the task `tsk`; the main loop calls it as
# self.error_handler(tsk).
def error_handler(self, tsk):
150
"by default, errors make the build stop (not thread safe so be careful)"
151
# applies only when Options.options.keep is falsy; the guarded statements are
# not visible (presumably they set the stop flag, per the docstring -- TODO confirm)
if not Options.options.keep:
160
# NOTE(review): these are the visible statements of the main scheduling loop.
# The method header, the loop header and most branch conditions are missing
# (see the gaps in the embedded numbering), so the comments below describe
# single statements only, not the control flow between them.
self.refill_task_list()
162
# consider the next task
163
tsk = self.get_next()
166
# tasks may add new ones after they are run
169
# no tasks to run, no tasks running, time to exit
173
# if the task is marked as "run", just skip it
175
self.manager.add_finished(tsk)
179
# ask the task for its runnable status; the code that uses `st` is not visible
st = tsk.runnable_status()
182
# stop flag set and the `keep` option off; presumably guards the
# add_finished() call that follows -- TODO confirm
if self.stop and not Options.options.keep:
184
self.manager.add_finished(tsk)
186
# failure path: call the error hook, report the task to the manager, then
# mark it EXCEPTION and store the result of Utils.ex_stack() as its message
self.error_handler(tsk)
187
self.manager.add_finished(tsk)
188
tsk.hasrun = EXCEPTION
189
tsk.err_msg = Utils.ex_stack()
197
self.manager.add_finished(tsk)
199
# run me: put the task in ready queue
200
# position is the pair (processed counter, total)
tsk.position = (self.processed, self.total)
207
# self.count represents the tasks that have been made available to the consumer threads
208
# collect all the tasks after an error else the message may be incomplete
209
# loop body not visible
while self.error and self.count:
213
# on exit either nothing is left in flight or the build was stopped
assert (self.count == 0 or self.stop)
218
# Install this module's `process` and `Parallel` on the Runner module.
# NOTE(review): neither an import of `Runner` nor the `def process` line is
# visible in this excerpt -- confirm both exist in the full file.
Runner.process = process
219
Runner.Parallel = Parallel