2
# Twisted, the Framework of Your Internet
3
# Copyright (C) 2001 Matthew W. Lefkowitz
5
# This library is free software; you can redistribute it and/or
6
# modify it under the terms of version 2.1 of the GNU Lesser General Public
7
# License as published by the Free Software Foundation.
9
# This library is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12
# Lesser General Public License for more details.
14
# You should have received a copy of the GNU Lesser General Public
15
# License along with this library; if not, write to the Free Software
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
"""A task scheduler that is integrated with the main event loop.
23
from twisted.python import threadable
24
from twisted.python import log
25
from twisted.python import delay
26
from twisted.python import failure
28
class ThreadedScheduler:
29
"""I am a thread-aware delayed scheduler of for synchronous event loops.
31
This lets threads execute non-thread safe code by adding it to the
32
scheduler. Tasks added to this scheduler will *not* be stored persistently.
34
This is an implementation of the Active Object pattern, and can be used
35
as part of the queueing layer for the Async/Half-Async pattern. The other
36
half the Async/Half-Async pattern is twisted.internet.threadtask.
40
1) POSA2 book - http://www.cs.wustl.edu/~schmidt/POSA/
42
2) Active Object - http://www.cs.wustl.edu/~schmidt/PDF/Act-Obj.pdf
44
3) Async/Half-Async - http://www.cs.wustl.edu/~schmidt/PDF/PLoP-95.pdf
47
__implements__ = delay.IDelayed,
51
self._lock = thread.allocate_lock()
53
def __getstate__(self):
56
def __setstate__(self):
59
def addTask(self, function, *args, **kwargs):
60
"""Schedule a function to be called by the main event-loop thread.
62
The result of the function will not be returned.
64
threadTasks = self.threadTasks
65
hadNoTasks = (threadTasks == {})
66
id = thread.get_ident()
70
if not threadTasks.has_key(id):
71
threadTasks[id] = [(function, args, kwargs)]
73
threadTasks[id].append((function, args, kwargs))
81
"""Either I have work to do immediately, or no work to do at all.
88
def runUntilCurrent(self):
89
threadTasks = self.threadTasks
94
for thread, tasks in threadTasks.items():
95
tasksTodo.append(tasks.pop(0))
96
if tasks: tasksTodo.append(tasks.pop(0))
98
del threadTasks[thread]
102
for func, args, kwargs in tasksTodo:
103
apply(func, args, kwargs)
107
"""I am a non-thread-safe delayed scheduler for synchronous event loops.
110
__implements__ = delay.IDelayed,
115
def addTask(self, function, *args, **kwargs):
116
self.tasks.append((function, args, kwargs))
119
"""Either I have work to do immediately, or no work to do at all.
126
def runUntilCurrent(self):
129
for function, args, kwargs in tasks:
130
apply(function, args, kwargs)
134
global theScheduler, thread, schedule
140
# there may already be a registered scheduler, so we need to get
143
main.removeDelayed(theScheduler)
144
except (NameError, ValueError):
147
theScheduler = ThreadedScheduler()
148
schedule = theScheduler.addTask
149
main.addDelayed(theScheduler)
151
theScheduler = Scheduler()
152
schedule = theScheduler.addTask
153
threadable.whenThreaded(initThreads)
157
"""Run all tasks in the scheduler.
159
while theScheduler.timeout() != None:
160
theScheduler.runUntilCurrent()
164
"""I am a set of steps that get executed.
166
Each "step" is a method to call and some arguments to call it with. I am
167
to be used with the Scheduler class - after each step is called I will
168
readd myself to the scheduler if I still have steps to do.
171
def __init__(self, steps=None, scheduler=theScheduler):
174
# Format for 'steps' is [[func,args,kw],[func,args,kw], ...]
180
self.scheduler = scheduler
182
def addWork(self, callable, *args, **kw):
183
self.steps.append([callable, args, kw])
186
if self.progress < len(self.steps):
187
func, args, kw = self.steps[self.progress]
189
apply(func, args, kw)
191
log.msg( 'Exception in Task' )
195
self.progress = self.progress + 1
196
# if we still have tasks left we add ourselves to the scheduler
197
if self.progress < len(self.steps):
198
self.scheduler.addTask(self)
205
main.addDelayed(theScheduler)