~ubuntu-branches/ubuntu/utopic/tdb/utopic-proposed

« back to all changes in this revision

Viewing changes to buildtools/wafsamba/nothreads.py

  • Committer: Bazaar Package Importer
  • Author(s): Jelmer Vernooij
  • Date: 2010-10-04 21:55:48 UTC
  • mfrom: (1.1.12 upstream) (3.2.5 sid)
  • Revision ID: james.westby@ubuntu.com-20101004215548-43p3vge27fml13jz
Tags: 1.2.7-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
# encoding: utf-8
 
3
# Thomas Nagy, 2005-2008 (ita)
 
4
 
 
5
# this replaces the core of Runner.py in waf with a variant that works
 
6
# on systems with completely broken threading (such as Python 2.5.x on
 
7
# AIX). For simplicity we enable this when JOBS=1, which is triggered
 
8
# by the compatibility makefile used for the waf build. That also ensures
 
9
# this code is tested, as it means it is used in the build farm, and by
 
10
# anyone using 'make' to build Samba with waf
 
11
 
 
12
"Execute the tasks"
 
13
 
 
14
import sys, random, time, threading, traceback, os
 
15
try: from Queue import Queue
 
16
except ImportError: from queue import Queue
 
17
import Build, Utils, Logs, Options
 
18
from Logs import debug, error
 
19
from Constants import *
 
20
 
 
21
# slack used by Parallel.refill_task_list(): finished tasks are collected
# once more than numjobs + GAP tasks are counted as handed out
GAP = 15
 
22
 
 
23
# keep a reference to the stock implementation so the wrapper can delegate to it
run_old = threading.Thread.run

def run(*args, **kwargs):
        """
        Replacement for threading.Thread.run.

        Delegates to the original run(). KeyboardInterrupt and SystemExit
        propagate unchanged; any other exception is passed to sys.excepthook
        instead of escaping from the thread body.
        """
        try:
                run_old(*args, **kwargs)
        except:
                # interpreter-exit requests must not be swallowed
                if isinstance(sys.exc_info()[1], (KeyboardInterrupt, SystemExit)):
                        raise
                sys.excepthook(*sys.exc_info())

# install the wrapper for every Thread created from now on
threading.Thread.run = run
 
32
 
 
33
 
 
34
class TaskConsumer(object):
        "holds only the consumer count; it defines no methods and starts no threads"

        # number of consumers advertised by this module
        consumers = 1
 
36
 
 
37
def process(tsk):
        """
        Execute one task in the calling thread and hand it back to its master.

        tsk -- the task to run; tsk.master must already be set to the scheduler
               object, which provides 'stop', 'out' (a queue) and
               'error_handler'.

        Returns None. The outcome is recorded on the task: 'hasrun' is set to
        SUCCESS, CRASHED or EXCEPTION here, with 'err_code' (non-zero return
        value) or 'err_msg' (stack trace) filled in on failure. On every path
        the task is finally put on tsk.master.out.
        """
        m = tsk.master
        if m.stop:
                # the build is being stopped: return the task without running it
                m.out.put(tsk)
                return

        try:
                tsk.generator.bld.printout(tsk.display())
                if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
                # actual call to task's run() function
                else: ret = tsk.call_run()
        except Exception:
                # 'except Exception:' (no ', e') is valid on every Python version;
                # the bound exception was never used
                tsk.err_msg = Utils.ex_stack()
                tsk.hasrun = EXCEPTION

                # TODO cleanup
                m.error_handler(tsk)
                m.out.put(tsk)
                return

        if ret:
                # a true (non-zero) return value marks the task as crashed
                tsk.err_code = ret
                tsk.hasrun = CRASHED
        else:
                try:
                        tsk.post_run()
                except Utils.WafError:
                        # hasrun is left untouched here -- presumably post_run()
                        # has already recorded the failure; verify against Task
                        pass
                except Exception:
                        tsk.err_msg = Utils.ex_stack()
                        tsk.hasrun = EXCEPTION
                else:
                        tsk.hasrun = SUCCESS
        if tsk.hasrun != SUCCESS:
                m.error_handler(tsk)

        m.out.put(tsk)
 
74
 
 
75
class Parallel(object):
        """
        Scheduler that runs the tasks one after the other in the calling thread.

        Tasks are fetched from the task manager, checked with runnable_status()
        and executed immediately through process(); finished tasks come back
        through the 'out' queue and are collected with get_out().
        """
        def __init__(self, bld, j=2):
                """
                bld -- build context; only bld.task_manager is used here
                j   -- number of jobs, stored as self.numjobs
                """

                # number of consumers
                self.numjobs = j

                self.manager = bld.task_manager
                self.manager.current_group = 0

                self.total = self.manager.total()

                # tasks waiting to be processed - IMPORTANT
                self.outstanding = []
                self.maxjobs = MAXJOBS

                # tasks that are awaiting for another task to complete
                self.frozen = []

                # tasks returned by the consumers
                self.out = Queue(0)

                self.count = 0 # tasks not in the producer area

                self.processed = 1 # progress indicator

                self.stop = False # error condition to stop the build
                self.error = False # error flag

        def get_next(self):
                "override this method to schedule the tasks in a particular order"
                if not self.outstanding:
                        return None
                return self.outstanding.pop(0)

        def postpone(self, tsk):
                "override this method to schedule the tasks in a particular order"
                # TODO consider using a deque instead
                # the task goes to a random end of the frozen list
                if random.randint(0, 1):
                        self.frozen.insert(0, tsk)
                else:
                        self.frozen.append(tsk)

        def refill_task_list(self):
                "called to set the next group of tasks"

                # too many tasks handed out: collect finished ones first
                while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
                        self.get_out()

                while not self.outstanding:
                        if self.count:
                                self.get_out()

                        if self.frozen:
                                # give the postponed tasks another chance
                                self.outstanding += self.frozen
                                self.frozen = []
                        elif not self.count:
                                # nothing pending at all: ask the manager for more
                                (jobs, tmp) = self.manager.get_next_set()
                                if jobs is not None: self.maxjobs = jobs
                                if tmp: self.outstanding += tmp
                                break

        def get_out(self):
                "the tasks that are put to execute are all collected using get_out"
                ret = self.out.get()
                self.manager.add_finished(ret)
                if not self.stop and getattr(ret, 'more_tasks', None):
                        # a finished task may contribute new tasks to schedule
                        self.outstanding += ret.more_tasks
                        self.total += len(ret.more_tasks)
                self.count -= 1

        def error_handler(self, tsk):
                "by default, errors make the build stop (not thread safe so be careful)"
                if not Options.options.keep:
                        self.stop = True
                self.error = True

        def start(self):
                "execute the tasks"

                while not self.stop:

                        self.refill_task_list()

                        # consider the next task
                        tsk = self.get_next()
                        if not tsk:
                                if self.count:
                                        # tasks may add new ones after they are run
                                        continue
                                else:
                                        # no tasks to run, no tasks running, time to exit
                                        break

                        if tsk.hasrun:
                                # if the task is marked as "run", just skip it
                                self.processed += 1
                                self.manager.add_finished(tsk)
                                continue

                        try:
                                st = tsk.runnable_status()
                        except Exception:
                                self.processed += 1
                                if self.stop and not Options.options.keep:
                                        tsk.hasrun = SKIPPED
                                        self.manager.add_finished(tsk)
                                        continue
                                # record the failure on the task before notifying
                                # the handler, the same order process() uses
                                tsk.hasrun = EXCEPTION
                                tsk.err_msg = Utils.ex_stack()
                                self.error_handler(tsk)
                                self.manager.add_finished(tsk)
                                continue

                        if st == ASK_LATER:
                                self.postpone(tsk)
                        elif st == SKIP_ME:
                                self.processed += 1
                                tsk.hasrun = SKIPPED
                                self.manager.add_finished(tsk)
                        else:
                                # run me: execute the task right away
                                tsk.position = (self.processed, self.total)
                                self.count += 1
                                self.processed += 1
                                tsk.master = self

                                # synchronous call; the task is put on self.out
                                process(tsk)

                # self.count represents the tasks that have been handed to process()
                # collect all the tasks after an error else the message may be incomplete
                while self.error and self.count:
                        self.get_out()

                #print loop
                assert (self.count == 0 or self.stop)
 
214
 
 
215
 
 
216
# enable nothreads: swap the serial implementations defined above into
# waf's Runner module
import Runner
Runner.Parallel = Parallel
Runner.process = process