1
# -*- test-case-name: twisted.test.test_task,twisted.test.test_cooperator -*-
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Scheduling utility methods and classes.
15
from zope.interface import implements
17
from twisted.python import reflect
18
from twisted.python.failure import Failure
20
from twisted.internet import base, defer
21
from twisted.internet.interfaces import IReactorTime
25
"""Call a function repeatedly.
27
If C{f} returns a deferred, rescheduling will not take place until the
28
deferred has fired. The result value is ignored.
30
@ivar f: The function to call.
31
@ivar a: A tuple of arguments to pass the function.
32
@ivar kw: A dictionary of keyword arguments to pass to the function.
33
@ivar clock: A provider of
34
L{twisted.internet.interfaces.IReactorTime}. The default is
35
L{twisted.internet.reactor}. Feel free to set this to
36
something else, but it probably ought to be set *before*
39
@type _expectNextCallAt: C{float}
40
@ivar _expectNextCallAt: The time at which this instance most recently
41
scheduled itself to run.
43
@type _realLastTime: C{float}
44
@ivar _realLastTime: When counting skips, the time at which the skip counter
47
@type _runAtStart: C{bool}
48
@ivar _runAtStart: A flag indicating whether the 'now' argument was passed
49
to L{LoopingCall.start}.
56
_expectNextCallAt = 0.0
60
def __init__(self, f, *a, **kw):
    """
    Remember the callable and its arguments and pick the default clock.

    @param f: The function to call.
    @param a: Positional arguments to pass to C{f}.
    @param kw: Keyword arguments to pass to C{f}.
    """
    # NOTE(review): the four attribute assignments are restored from the
    # documented C{f}, C{a}, C{kw} and C{clock} instance variables and from
    # how the other methods use them -- confirm against the pristine file.
    self.f = f
    self.a = a
    self.kw = kw
    # Imported at call time rather than at module import time (presumably
    # so that importing this module does not select a reactor).
    from twisted.internet import reactor
    self.clock = reactor
68
def withCount(cls, countCallable):
70
An alternate constructor for L{LoopingCall} that makes available the
71
number of calls which should have occurred since it was last invoked.
73
Note that this number is an C{int} value; It represents the discrete
74
number of calls that should have been made. For example, if you are
75
using a looping call to display an animation with discrete frames, this
76
number would be the number of frames to advance.
78
The count is normally 1, but can be higher. For example, if the reactor
79
is blocked and takes too long to invoke the L{LoopingCall}, a Deferred
80
returned from a previous call is not fired before an interval has
81
elapsed, or if the callable itself blocks for longer than an interval,
82
preventing I{itself} from being called.
84
@param countCallable: A callable that will be invoked each time the
85
resulting LoopingCall is run, with an integer specifying the number
86
of calls that should have been invoked.
88
@type countCallable: 1-argument callable which takes an C{int}
90
@return: An instance of L{LoopingCall} with call counting enabled,
91
which provides the count as the first positional argument.
93
@rtype: L{LoopingCall}
99
now = self.clock.seconds()
100
lastTime = self._realLastTime
102
lastTime = self.starttime
104
lastTime -= self.interval
105
self._realLastTime = now
106
lastInterval = self._intervalOf(lastTime)
107
thisInterval = self._intervalOf(now)
108
count = thisInterval - lastInterval
109
return countCallable(count)
113
self._realLastTime = None
117
withCount = classmethod(withCount)
120
def _intervalOf(self, t):
    """
    Determine the number of intervals passed as of the given point in
    time.

    @param t: The specified time (from the start of the L{LoopingCall}) to
        be measured in intervals

    @return: The C{int} number of intervals which have passed as of the
        given point in time.

    @raise ZeroDivisionError: if C{self.interval} is 0.
    """
    elapsedTime = t - self.starttime
    # int() truncates, so a partially elapsed interval is not counted.
    intervalNum = int(elapsedTime / self.interval)
    return intervalNum
136
def start(self, interval, now=True):
138
Start running function every interval seconds.
140
@param interval: The number of seconds between calls. May be
141
less than one. Precision will depend on the underlying
142
platform, the available hardware, and the load on the system.
144
@param now: If True, run this call right now. Otherwise, wait
145
until the interval has elapsed before beginning.
147
@return: A Deferred whose callback will be invoked with
148
C{self} when C{self.stop} is called, or whose errback will be
149
invoked when the function raises an exception or returned a
150
deferred that has its errback invoked.
152
assert not self.running, ("Tried to start an already running "
155
raise ValueError, "interval must be >= 0"
157
d = self.deferred = defer.Deferred()
158
self.starttime = self.clock.seconds()
159
self._expectNextCallAt = self.starttime
160
self.interval = interval
161
self._runAtStart = now
169
"""Stop running function.
171
assert self.running, ("Tried to stop a LoopingCall that was "
174
if self.call is not None:
177
d, self.deferred = self.deferred, None
185
d, self.deferred = self.deferred, None
190
d, self.deferred = self.deferred, None
194
d = defer.maybeDeferred(self.f, *self.a, **self.kw)
199
def _reschedule(self):
    """
    Schedule the next iteration of this looping call.

    The delayed call returned by C{self.clock.callLater} is stored in
    C{self.call}, and C{self._expectNextCallAt} is updated to the time the
    call was scheduled for (it is left untouched when the interval is 0).
    """
    if self.interval == 0:
        self.call = self.clock.callLater(0, self)
        # Nothing else to compute; the modulo below would divide by zero.
        return

    currentTime = self.clock.seconds()
    # Find how long is left until the interval comes around again.
    untilNextTime = (self._expectNextCallAt - currentTime) % self.interval
    # Make sure it is in the future, in case more than one interval worth
    # of time passed since the previous call was made.
    nextTime = max(
        self._expectNextCallAt + self.interval, currentTime + untilNextTime)
    # If the interval falls on the current time exactly, skip it and
    # schedule the call for the next interval.
    if nextTime == currentTime:
        nextTime += self.interval
    self._expectNextCallAt = nextTime
    self.call = self.clock.callLater(nextTime - currentTime, self)
223
if hasattr(self.f, 'func_name'):
224
func = self.f.func_name
225
if hasattr(self.f, 'im_class'):
226
func = self.f.im_class.__name__ + '.' + func
228
func = reflect.safe_repr(self.f)
230
return 'LoopingCall<%r>(%s, *%s, **%s)' % (
231
self.interval, func, reflect.safe_repr(self.a),
232
reflect.safe_repr(self.kw))
236
class SchedulerError(Exception):
    """
    The operation could not be completed because the scheduler or one of its
    tasks was in an invalid state.  This exception should not be raised
    directly, but is a superclass of various scheduler-state-related
    exceptions.
    """
246
class SchedulerStopped(SchedulerError):
    """
    The operation could not complete because the scheduler was stopped in
    progress or was already stopped.
    """
254
class TaskFinished(SchedulerError):
    """
    The operation could not complete because the task was already completed,
    stopped, encountered an error or otherwise permanently stopped running.
    """
262
class TaskDone(TaskFinished):
    """
    The operation could not complete because the task was already completed.
    """
269
class TaskStopped(TaskFinished):
    """
    The operation could not complete because the task was stopped.
    """
276
class TaskFailed(TaskFinished):
    """
    The operation could not complete because the task died with an unhandled
    error.
    """
284
class NotPaused(SchedulerError):
    """
    This exception is raised when a task is resumed which was not previously
    paused.
    """
292
class _Timer(object):
295
self.end = time.time() + self.MAX_SLICE
299
return time.time() >= self.end
303
_EPSILON = 0.00000001
304
def _defaultScheduler(x):
    """
    Schedule C{x} to be called by the global reactor after C{_EPSILON}
    seconds.

    @param x: The callable to pass to C{reactor.callLater}.

    @return: Whatever C{reactor.callLater} returns for the scheduled call.
    """
    # The reactor is imported at call time rather than at module import time.
    from twisted.internet import reactor
    return reactor.callLater(_EPSILON, x)
309
class CooperativeTask(object):
311
A L{CooperativeTask} is a task object inside a L{Cooperator}, which can be
312
paused, resumed, and stopped. It can also have its completion (or
313
termination) monitored.
315
@see: L{CooperativeTask.cooperate}
317
@ivar _iterator: the iterator to iterate when this L{CooperativeTask} is
320
@ivar _cooperator: the L{Cooperator} that this L{CooperativeTask}
321
participates in, which is used to re-insert it upon resume.
323
@ivar _deferreds: the list of L{defer.Deferred}s to fire when this task
324
completes, fails, or finishes.
326
@type _deferreds: L{list}
328
@type _cooperator: L{Cooperator}
330
@ivar _pauseCount: the number of times that this L{CooperativeTask} has
331
been paused; if 0, it is running.
333
@type _pauseCount: L{int}
335
@ivar _completionState: The completion-state of this L{CooperativeTask}.
336
C{None} if the task is not yet completed, an instance of L{TaskStopped}
337
if C{stop} was called to stop this task early, of L{TaskFailed} if the
338
application code in the iterator raised an exception which caused it to
339
terminate, and of L{TaskDone} if it terminated normally via raising
342
@type _completionState: L{TaskFinished}
345
def __init__(self, iterator, cooperator):
    """
    A private constructor: to create a new L{CooperativeTask}, see
    L{Cooperator.cooperate}.

    @param iterator: the iterator this task will step.
    @param cooperator: the L{Cooperator} this task registers itself with.
    """
    self._iterator = iterator
    self._cooperator = cooperator
    # NOTE(review): the next two initialisations are restored from the
    # documented C{_deferreds} / C{_pauseCount} instance variables and their
    # use elsewhere in this class -- confirm against the pristine file.
    self._deferreds = []
    self._pauseCount = 0
    self._completionState = None
    self._completionResult = None
    # Register last, once every attribute has been initialised.
    cooperator._addTask(self)
361
Get a L{defer.Deferred} notification of when this task is complete.
363
@return: a L{defer.Deferred} that fires with the C{iterator} that this
364
L{CooperativeTask} was created with when the iterator has been
365
exhausted (i.e. its C{next} method has raised L{StopIteration}), or
366
fails with the exception raised by C{next} if it raises some other
369
@rtype: L{defer.Deferred}
372
if self._completionState is None:
373
self._deferreds.append(d)
375
d.callback(self._completionResult)
381
Pause this L{CooperativeTask}. Stop doing work until
382
L{CooperativeTask.resume} is called. If C{pause} is called more than
383
once, C{resume} must be called an equal number of times to resume this
386
@raise TaskFinished: if this task has already finished or completed.
389
self._pauseCount += 1
390
if self._pauseCount == 1:
391
self._cooperator._removeTask(self)
396
Resume processing of a paused L{CooperativeTask}.
398
@raise NotPaused: if this L{CooperativeTask} is not paused.
400
if self._pauseCount == 0:
402
self._pauseCount -= 1
403
if self._pauseCount == 0 and self._completionState is None:
404
self._cooperator._addTask(self)
407
def _completeWith(self, completionState, deferredResult):
    """
    Record the final state of this task, detach it from its cooperator if
    it is not paused, and fire every waiting Deferred.

    @param completionState: a L{TaskFinished} exception or a subclass
        thereof, indicating what exception should be raised when subsequent
        operations are performed.

    @param deferredResult: the result to fire all the deferreds with.
    """
    self._completionState = completionState
    self._completionResult = deferredResult
    # Only ask the cooperator to drop this task when no pause is outstanding.
    if not self._pauseCount:
        self._cooperator._removeTask(self)

    # The Deferreds need to be invoked after all this is completed, because
    # a Deferred may want to manipulate other tasks in a Cooperator.  For
    # example, if you call "stop()" on a cooperator in a callback on a
    # Deferred returned from whenDone(), this CooperativeTask must be gone
    # from the Cooperator by that point so that _completeWith is not
    # invoked reentrantly; that would cause these Deferreds to blow up with
    # an AlreadyCalledError, or the _removeTask to fail with a ValueError.
    for d in self._deferreds:
        d.callback(deferredResult)
433
Stop further processing of this task.
435
@raise TaskFinished: if this L{CooperativeTask} has previously
436
completed, via C{stop}, completion, or failure.
439
self._completeWith(TaskStopped(), Failure(TaskStopped()))
442
def _checkFinish(self):
    """
    If this task has been stopped, raise the appropriate subclass of
    L{TaskFinished}.

    @raise TaskFinished: the stored C{_completionState}, if one has been
        recorded; otherwise nothing is raised and C{None} is returned.
    """
    if self._completionState is not None:
        raise self._completionState
451
def _oneWorkUnit(self):
453
Perform one unit of work for this task, retrieving one item from its
454
iterator, stopping if there are no further items in the iterator, and
455
pausing if the result was a L{defer.Deferred}.
458
result = self._iterator.next()
459
except StopIteration:
460
self._completeWith(TaskDone(), self._iterator)
462
self._completeWith(TaskFailed(), Failure())
464
if isinstance(result, defer.Deferred):
467
self._completeWith(TaskFailed(), f)
468
result.addCallbacks(lambda result: self.resume(),
473
class Cooperator(object):
475
Cooperative task scheduler.
479
terminationPredicateFactory=_Timer,
480
scheduler=_defaultScheduler,
483
Create a scheduler-like object to which iterators may be added.
485
@param terminationPredicateFactory: A no-argument callable which will
486
be invoked at the beginning of each step and should return a
487
no-argument callable which will return True when the step should be
488
terminated. The default factory is time-based and allows iterators to
489
run for 1/100th of a second at a time.
491
@param scheduler: A one-argument callable which takes a no-argument
492
callable and should invoke it at some future point. This will be used
493
to schedule each step of this Cooperator.
495
@param started: A boolean which indicates whether iterators should be
496
stepped as soon as they are added, or if they will be queued up until
497
L{Cooperator.start} is called.
500
self._metarator = iter(())
501
self._terminationPredicateFactory = terminationPredicateFactory
502
self._scheduler = scheduler
503
self._delayedCall = None
504
self._stopped = False
505
self._started = started
508
def coiterate(self, iterator, doneDeferred=None):
    """
    Add an iterator to the list of iterators this L{Cooperator} is
    running.

    @param iterator: the iterator to wrap in a new L{CooperativeTask}.

    @param doneDeferred: If specified, this will be the Deferred used as
        the completion deferred.  It is suggested that you use the default,
        which creates a new Deferred for you.

    @return: a Deferred that will fire when the iterator finishes.
    """
    if doneDeferred is None:
        doneDeferred = defer.Deferred()
    # Forward the task's completion (or failure) to the caller's Deferred.
    CooperativeTask(iterator, self).whenDone().chainDeferred(doneDeferred)
    return doneDeferred
525
def cooperate(self, iterator):
    """
    Start running the given iterator as a long-running cooperative task, by
    calling next() on it as a periodic timed event.

    @param iterator: the iterator to invoke.

    @return: a L{CooperativeTask} object representing this task.
    """
    return CooperativeTask(iterator, self)
537
def _addTask(self, task):
539
Add a L{CooperativeTask} object to this L{Cooperator}.
542
self._tasks.append(task) # XXX silly, I know, but _completeWith
544
task._completeWith(SchedulerStopped(), Failure(SchedulerStopped()))
546
self._tasks.append(task)
550
def _removeTask(self, task):
    """
    Remove a L{CooperativeTask} from this L{Cooperator}.

    @param task: the task to remove from C{self._tasks}.

    @raise ValueError: if C{task} is not in C{self._tasks}.
    """
    self._tasks.remove(task)
557
def _tasksWhileNotStopped(self):
559
Yield all L{CooperativeTask} objects in a loop as long as this
560
L{Cooperator}'s termination condition has not been met.
562
terminator = self._terminationPredicateFactory()
564
for t in self._metarator:
568
self._metarator = iter(self._tasks)
573
Run one scheduler tick.
575
self._delayedCall = None
576
for taskObj in self._tasksWhileNotStopped():
577
taskObj._oneWorkUnit()
581
_mustScheduleOnStart = False
582
def _reschedule(self):
583
if not self._started:
584
self._mustScheduleOnStart = True
586
if self._delayedCall is None and self._tasks:
587
self._delayedCall = self._scheduler(self._tick)
592
Begin scheduling steps.
594
self._stopped = False
596
if self._mustScheduleOnStart:
597
del self._mustScheduleOnStart
603
Stop scheduling steps. Errback the completion Deferreds of all
604
iterators which have been added and forget about them.
607
for taskObj in self._tasks:
608
taskObj._completeWith(SchedulerStopped(),
609
Failure(SchedulerStopped()))
611
if self._delayedCall is not None:
612
self._delayedCall.cancel()
613
self._delayedCall = None
617
_theCooperator = Cooperator()
619
def coiterate(iterator):
    """
    Cooperatively iterate over the given iterator, dividing runtime between it
    and all other iterators which have been passed to this function and not yet
    finished.

    @param iterator: the iterator to hand to the module-level L{Cooperator}.

    @return: whatever that cooperator's C{coiterate} returns for the
        iterator.
    """
    return _theCooperator.coiterate(iterator)
629
def cooperate(iterator):
    """
    Start running the given iterator as a long-running cooperative task, by
    calling next() on it as a periodic timed event.

    This delegates to the module-level L{Cooperator}.

    @param iterator: the iterator to invoke.

    @return: a L{CooperativeTask} object representing this task.
    """
    return _theCooperator.cooperate(iterator)
645
Provide a deterministic, easily-controlled implementation of
646
L{IReactorTime.callLater}. This is commonly useful for writing
647
deterministic unit tests for code which schedules events using this API.
649
implements(IReactorTime)
658
Pretend to be time.time(). This is used internally when an operation
659
such as L{IDelayedCall.reset} needs to determine a time value
660
relative to the current time.
663
@return: The time which should be considered the current time.
668
def callLater(self, when, what, *a, **kw):
670
See L{twisted.internet.interfaces.IReactorTime.callLater}.
672
dc = base.DelayedCall(self.seconds() + when,
677
self.calls.append(dc)
678
self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime()))
681
def getDelayedCalls(self):
683
See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls}
687
def advance(self, amount):
689
Move time on this clock forward by the given amount and run whatever
690
pending calls should be run.
692
@type amount: C{float}
693
@param amount: The number of seconds which to advance this clock's
696
self.rightNow += amount
697
while self.calls and self.calls[0].getTime() <= self.seconds():
698
call = self.calls.pop(0)
700
call.func(*call.args, **call.kw)
703
def pump(self, timings):
    """
    Advance incrementally by the given set of times.

    @type timings: iterable of C{float}
    @param timings: The amounts to pass, one at a time and in order, to
        C{self.advance}.
    """
    for amount in timings:
        self.advance(amount)
713
def deferLater(clock, delay, callable, *args, **kw):
    """
    Call the given function after a certain period of time has passed.

    @type clock: L{IReactorTime} provider
    @param clock: The object which will be used to schedule the delayed
        call.

    @type delay: C{float} or C{int}
    @param delay: The number of seconds to wait before calling the function.

    @param callable: The object to call after the delay.

    @param *args: The positional arguments to pass to C{callable}.

    @param **kw: The keyword arguments to pass to C{callable}.

    @rtype: L{defer.Deferred}

    @return: A deferred that fires with the result of the callable when the
        specified time has elapsed.
    """
    d = defer.Deferred()
    # The clock fires the Deferred with None; the callback discards that
    # value and substitutes the result of the real call.
    d.addCallback(lambda ignored: callable(*args, **kw))
    clock.callLater(delay, d.callback, None)
    return d
747
'SchedulerStopped', 'Cooperator', 'coiterate',