~0x44/nova/bug838466

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/task.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_task,twisted.test.test_cooperator -*-
 
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
Scheduling utility methods and classes.
 
7
 
 
8
@author: Jp Calderone
 
9
"""
 
10
 
 
11
__metaclass__ = type
 
12
 
 
13
import time
 
14
 
 
15
from zope.interface import implements
 
16
 
 
17
from twisted.python import reflect
 
18
from twisted.python.failure import Failure
 
19
 
 
20
from twisted.internet import base, defer
 
21
from twisted.internet.interfaces import IReactorTime
 
22
 
 
23
 
 
24
class LoopingCall:
 
25
    """Call a function repeatedly.
 
26
 
 
27
    If C{f} returns a deferred, rescheduling will not take place until the
 
28
    deferred has fired. The result value is ignored.
 
29
 
 
30
    @ivar f: The function to call.
 
31
    @ivar a: A tuple of arguments to pass the function.
 
32
    @ivar kw: A dictionary of keyword arguments to pass to the function.
 
33
    @ivar clock: A provider of
 
34
        L{twisted.internet.interfaces.IReactorTime}.  The default is
 
35
        L{twisted.internet.reactor}. Feel free to set this to
 
36
        something else, but it probably ought to be set *before*
 
37
        calling L{start}.
 
38
 
 
39
    @type _expectNextCallAt: C{float}
 
40
    @ivar _expectNextCallAt: The time at which this instance most recently
 
41
        scheduled itself to run.
 
42
 
 
43
    @type _realLastTime: C{float}
 
44
    @ivar _realLastTime: When counting skips, the time at which the skip counter
 
45
        was last invoked.
 
46
 
 
47
    @type _runAtStart: C{bool}
 
48
    @ivar _runAtStart: A flag indicating whether the 'now' argument was passed
 
49
        to L{LoopingCall.start}.
 
50
    """
 
51
 
 
52
    call = None
 
53
    running = False
 
54
    deferred = None
 
55
    interval = None
 
56
    _expectNextCallAt = 0.0
 
57
    _runAtStart = False
 
58
    starttime = None
 
59
 
 
60
    def __init__(self, f, *a, **kw):
 
61
        self.f = f
 
62
        self.a = a
 
63
        self.kw = kw
 
64
        from twisted.internet import reactor
 
65
        self.clock = reactor
 
66
 
 
67
 
 
68
    def withCount(cls, countCallable):
 
69
        """
 
70
        An alternate constructor for L{LoopingCall} that makes available the
 
71
        number of calls which should have occurred since it was last invoked.
 
72
 
 
73
        Note that this number is an C{int} value; It represents the discrete
 
74
        number of calls that should have been made.  For example, if you are
 
75
        using a looping call to display an animation with discrete frames, this
 
76
        number would be the number of frames to advance.
 
77
 
 
78
        The count is normally 1, but can be higher. For example, if the reactor
 
79
        is blocked and takes too long to invoke the L{LoopingCall}, a Deferred
 
80
        returned from a previous call is not fired before an interval has
 
81
        elapsed, or if the callable itself blocks for longer than an interval,
 
82
        preventing I{itself} from being called.
 
83
 
 
84
        @param countCallable: A callable that will be invoked each time the
 
85
            resulting LoopingCall is run, with an integer specifying the number
 
86
            of calls that should have been invoked.
 
87
 
 
88
        @type countCallable: 1-argument callable which takes an C{int}
 
89
 
 
90
        @return: An instance of L{LoopingCall} with call counting enabled,
 
91
            which provides the count as the first positional argument.
 
92
 
 
93
        @rtype: L{LoopingCall}
 
94
 
 
95
        @since: 9.0
 
96
        """
 
97
 
 
98
        def counter():
 
99
            now = self.clock.seconds()
 
100
            lastTime = self._realLastTime
 
101
            if lastTime is None:
 
102
                lastTime = self.starttime
 
103
                if self._runAtStart:
 
104
                    lastTime -= self.interval
 
105
            self._realLastTime = now
 
106
            lastInterval = self._intervalOf(lastTime)
 
107
            thisInterval = self._intervalOf(now)
 
108
            count = thisInterval - lastInterval
 
109
            return countCallable(count)
 
110
 
 
111
        self = cls(counter)
 
112
 
 
113
        self._realLastTime = None
 
114
 
 
115
        return self
 
116
 
 
117
    withCount = classmethod(withCount)
 
118
 
 
119
 
 
120
    def _intervalOf(self, t):
 
121
        """
 
122
        Determine the number of intervals passed as of the given point in
 
123
        time.
 
124
 
 
125
        @param t: The specified time (from the start of the L{LoopingCall}) to
 
126
            be measured in intervals
 
127
 
 
128
        @return: The C{int} number of intervals which have passed as of the
 
129
            given point in time.
 
130
        """
 
131
        elapsedTime = t - self.starttime
 
132
        intervalNum = int(elapsedTime / self.interval)
 
133
        return intervalNum
 
134
 
 
135
 
 
136
    def start(self, interval, now=True):
 
137
        """
 
138
        Start running function every interval seconds.
 
139
 
 
140
        @param interval: The number of seconds between calls.  May be
 
141
        less than one.  Precision will depend on the underlying
 
142
        platform, the available hardware, and the load on the system.
 
143
 
 
144
        @param now: If True, run this call right now.  Otherwise, wait
 
145
        until the interval has elapsed before beginning.
 
146
 
 
147
        @return: A Deferred whose callback will be invoked with
 
148
        C{self} when C{self.stop} is called, or whose errback will be
 
149
        invoked when the function raises an exception or returned a
 
150
        deferred that has its errback invoked.
 
151
        """
 
152
        assert not self.running, ("Tried to start an already running "
 
153
                                  "LoopingCall.")
 
154
        if interval < 0:
 
155
            raise ValueError, "interval must be >= 0"
 
156
        self.running = True
 
157
        d = self.deferred = defer.Deferred()
 
158
        self.starttime = self.clock.seconds()
 
159
        self._expectNextCallAt = self.starttime
 
160
        self.interval = interval
 
161
        self._runAtStart = now
 
162
        if now:
 
163
            self()
 
164
        else:
 
165
            self._reschedule()
 
166
        return d
 
167
 
 
168
    def stop(self):
 
169
        """Stop running function.
 
170
        """
 
171
        assert self.running, ("Tried to stop a LoopingCall that was "
 
172
                              "not running.")
 
173
        self.running = False
 
174
        if self.call is not None:
 
175
            self.call.cancel()
 
176
            self.call = None
 
177
            d, self.deferred = self.deferred, None
 
178
            d.callback(self)
 
179
 
 
180
    def __call__(self):
 
181
        def cb(result):
 
182
            if self.running:
 
183
                self._reschedule()
 
184
            else:
 
185
                d, self.deferred = self.deferred, None
 
186
                d.callback(self)
 
187
 
 
188
        def eb(failure):
 
189
            self.running = False
 
190
            d, self.deferred = self.deferred, None
 
191
            d.errback(failure)
 
192
 
 
193
        self.call = None
 
194
        d = defer.maybeDeferred(self.f, *self.a, **self.kw)
 
195
        d.addCallback(cb)
 
196
        d.addErrback(eb)
 
197
 
 
198
 
 
199
    def _reschedule(self):
 
200
        """
 
201
        Schedule the next iteration of this looping call.
 
202
        """
 
203
        if self.interval == 0:
 
204
            self.call = self.clock.callLater(0, self)
 
205
            return
 
206
 
 
207
        currentTime = self.clock.seconds()
 
208
        # Find how long is left until the interval comes around again.
 
209
        untilNextTime = (self._expectNextCallAt - currentTime) % self.interval
 
210
        # Make sure it is in the future, in case more than one interval worth
 
211
        # of time passed since the previous call was made.
 
212
        nextTime = max(
 
213
            self._expectNextCallAt + self.interval, currentTime + untilNextTime)
 
214
        # If the interval falls on the current time exactly, skip it and
 
215
        # schedule the call for the next interval.
 
216
        if nextTime == currentTime:
 
217
            nextTime += self.interval
 
218
        self._expectNextCallAt = nextTime
 
219
        self.call = self.clock.callLater(nextTime - currentTime, self)
 
220
 
 
221
 
 
222
    def __repr__(self):
 
223
        if hasattr(self.f, 'func_name'):
 
224
            func = self.f.func_name
 
225
            if hasattr(self.f, 'im_class'):
 
226
                func = self.f.im_class.__name__ + '.' + func
 
227
        else:
 
228
            func = reflect.safe_repr(self.f)
 
229
 
 
230
        return 'LoopingCall<%r>(%s, *%s, **%s)' % (
 
231
            self.interval, func, reflect.safe_repr(self.a),
 
232
            reflect.safe_repr(self.kw))
 
233
 
 
234
 
 
235
 
 
236
class SchedulerError(Exception):
 
237
    """
 
238
    The operation could not be completed because the scheduler or one of its
 
239
    tasks was in an invalid state.  This exception should not be raised
 
240
    directly, but is a superclass of various scheduler-state-related
 
241
    exceptions.
 
242
    """
 
243
 
 
244
 
 
245
 
 
246
class SchedulerStopped(SchedulerError):
 
247
    """
 
248
    The operation could not complete because the scheduler was stopped in
 
249
    progress or was already stopped.
 
250
    """
 
251
 
 
252
 
 
253
 
 
254
class TaskFinished(SchedulerError):
 
255
    """
 
256
    The operation could not complete because the task was already completed,
 
257
    stopped, encountered an error or otherwise permanently stopped running.
 
258
    """
 
259
 
 
260
 
 
261
 
 
262
class TaskDone(TaskFinished):
 
263
    """
 
264
    The operation could not complete because the task was already completed.
 
265
    """
 
266
 
 
267
 
 
268
 
 
269
class TaskStopped(TaskFinished):
 
270
    """
 
271
    The operation could not complete because the task was stopped.
 
272
    """
 
273
 
 
274
 
 
275
 
 
276
class TaskFailed(TaskFinished):
 
277
    """
 
278
    The operation could not complete because the task died with an unhandled
 
279
    error.
 
280
    """
 
281
 
 
282
 
 
283
 
 
284
class NotPaused(SchedulerError):
 
285
    """
 
286
    This exception is raised when a task is resumed which was not previously
 
287
    paused.
 
288
    """
 
289
 
 
290
 
 
291
 
 
292
class _Timer(object):
 
293
    MAX_SLICE = 0.01
 
294
    def __init__(self):
 
295
        self.end = time.time() + self.MAX_SLICE
 
296
 
 
297
 
 
298
    def __call__(self):
 
299
        return time.time() >= self.end
 
300
 
 
301
 
 
302
 
 
303
_EPSILON = 0.00000001
 
304
def _defaultScheduler(x):
 
305
    from twisted.internet import reactor
 
306
    return reactor.callLater(_EPSILON, x)
 
307
 
 
308
 
 
309
class CooperativeTask(object):
 
310
    """
 
311
    A L{CooperativeTask} is a task object inside a L{Cooperator}, which can be
 
312
    paused, resumed, and stopped.  It can also have its completion (or
 
313
    termination) monitored.
 
314
 
 
315
    @see: L{CooperativeTask.cooperate}
 
316
 
 
317
    @ivar _iterator: the iterator to iterate when this L{CooperativeTask} is
 
318
        asked to do work.
 
319
 
 
320
    @ivar _cooperator: the L{Cooperator} that this L{CooperativeTask}
 
321
        participates in, which is used to re-insert it upon resume.
 
322
 
 
323
    @ivar _deferreds: the list of L{defer.Deferred}s to fire when this task
 
324
        completes, fails, or finishes.
 
325
 
 
326
    @type _deferreds: L{list}
 
327
 
 
328
    @type _cooperator: L{Cooperator}
 
329
 
 
330
    @ivar _pauseCount: the number of times that this L{CooperativeTask} has
 
331
        been paused; if 0, it is running.
 
332
 
 
333
    @type _pauseCount: L{int}
 
334
 
 
335
    @ivar _completionState: The completion-state of this L{CooperativeTask}.
 
336
        C{None} if the task is not yet completed, an instance of L{TaskStopped}
 
337
        if C{stop} was called to stop this task early, of L{TaskFailed} if the
 
338
        application code in the iterator raised an exception which caused it to
 
339
        terminate, and of L{TaskDone} if it terminated normally via raising
 
340
        L{StopIteration}.
 
341
 
 
342
    @type _completionState: L{TaskFinished}
 
343
    """
 
344
 
 
345
    def __init__(self, iterator, cooperator):
 
346
        """
 
347
        A private constructor: to create a new L{CooperativeTask}, see
 
348
        L{Cooperator.cooperate}.
 
349
        """
 
350
        self._iterator = iterator
 
351
        self._cooperator = cooperator
 
352
        self._deferreds = []
 
353
        self._pauseCount = 0
 
354
        self._completionState = None
 
355
        self._completionResult = None
 
356
        cooperator._addTask(self)
 
357
 
 
358
 
 
359
    def whenDone(self):
 
360
        """
 
361
        Get a L{defer.Deferred} notification of when this task is complete.
 
362
 
 
363
        @return: a L{defer.Deferred} that fires with the C{iterator} that this
 
364
            L{CooperativeTask} was created with when the iterator has been
 
365
            exhausted (i.e. its C{next} method has raised L{StopIteration}), or
 
366
            fails with the exception raised by C{next} if it raises some other
 
367
            exception.
 
368
 
 
369
        @rtype: L{defer.Deferred}
 
370
        """
 
371
        d = defer.Deferred()
 
372
        if self._completionState is None:
 
373
            self._deferreds.append(d)
 
374
        else:
 
375
            d.callback(self._completionResult)
 
376
        return d
 
377
 
 
378
 
 
379
    def pause(self):
 
380
        """
 
381
        Pause this L{CooperativeTask}.  Stop doing work until
 
382
        L{CooperativeTask.resume} is called.  If C{pause} is called more than
 
383
        once, C{resume} must be called an equal number of times to resume this
 
384
        task.
 
385
 
 
386
        @raise TaskFinished: if this task has already finished or completed.
 
387
        """
 
388
        self._checkFinish()
 
389
        self._pauseCount += 1
 
390
        if self._pauseCount == 1:
 
391
            self._cooperator._removeTask(self)
 
392
 
 
393
 
 
394
    def resume(self):
 
395
        """
 
396
        Resume processing of a paused L{CooperativeTask}.
 
397
 
 
398
        @raise NotPaused: if this L{CooperativeTask} is not paused.
 
399
        """
 
400
        if self._pauseCount == 0:
 
401
            raise NotPaused()
 
402
        self._pauseCount -= 1
 
403
        if self._pauseCount == 0 and self._completionState is None:
 
404
            self._cooperator._addTask(self)
 
405
 
 
406
 
 
407
    def _completeWith(self, completionState, deferredResult):
 
408
        """
 
409
        @param completionState: a L{TaskFinished} exception or a subclass
 
410
            thereof, indicating what exception should be raised when subsequent
 
411
            operations are performed.
 
412
 
 
413
        @param deferredResult: the result to fire all the deferreds with.
 
414
        """
 
415
        self._completionState = completionState
 
416
        self._completionResult = deferredResult
 
417
        if not self._pauseCount:
 
418
            self._cooperator._removeTask(self)
 
419
 
 
420
        # The Deferreds need to be invoked after all this is completed, because
 
421
        # a Deferred may want to manipulate other tasks in a Cooperator.  For
 
422
        # example, if you call "stop()" on a cooperator in a callback on a
 
423
        # Deferred returned from whenDone(), this CooperativeTask must be gone
 
424
        # from the Cooperator by that point so that _completeWith is not
 
425
        # invoked reentrantly; that would cause these Deferreds to blow up with
 
426
        # an AlreadyCalledError, or the _removeTask to fail with a ValueError.
 
427
        for d in self._deferreds:
 
428
            d.callback(deferredResult)
 
429
 
 
430
 
 
431
    def stop(self):
 
432
        """
 
433
        Stop further processing of this task.
 
434
 
 
435
        @raise TaskFinished: if this L{CooperativeTask} has previously
 
436
            completed, via C{stop}, completion, or failure.
 
437
        """
 
438
        self._checkFinish()
 
439
        self._completeWith(TaskStopped(), Failure(TaskStopped()))
 
440
 
 
441
 
 
442
    def _checkFinish(self):
 
443
        """
 
444
        If this task has been stopped, raise the appropriate subclass of
 
445
        L{TaskFinished}.
 
446
        """
 
447
        if self._completionState is not None:
 
448
            raise self._completionState
 
449
 
 
450
 
 
451
    def _oneWorkUnit(self):
 
452
        """
 
453
        Perform one unit of work for this task, retrieving one item from its
 
454
        iterator, stopping if there are no further items in the iterator, and
 
455
        pausing if the result was a L{defer.Deferred}.
 
456
        """
 
457
        try:
 
458
            result = self._iterator.next()
 
459
        except StopIteration:
 
460
            self._completeWith(TaskDone(), self._iterator)
 
461
        except:
 
462
            self._completeWith(TaskFailed(), Failure())
 
463
        else:
 
464
            if isinstance(result, defer.Deferred):
 
465
                self.pause()
 
466
                def failLater(f):
 
467
                    self._completeWith(TaskFailed(), f)
 
468
                result.addCallbacks(lambda result: self.resume(),
 
469
                                    failLater)
 
470
 
 
471
 
 
472
 
 
473
class Cooperator(object):
 
474
    """
 
475
    Cooperative task scheduler.
 
476
    """
 
477
 
 
478
    def __init__(self,
 
479
                 terminationPredicateFactory=_Timer,
 
480
                 scheduler=_defaultScheduler,
 
481
                 started=True):
 
482
        """
 
483
        Create a scheduler-like object to which iterators may be added.
 
484
 
 
485
        @param terminationPredicateFactory: A no-argument callable which will
 
486
        be invoked at the beginning of each step and should return a
 
487
        no-argument callable which will return False when the step should be
 
488
        terminated.  The default factory is time-based and allows iterators to
 
489
        run for 1/100th of a second at a time.
 
490
 
 
491
        @param scheduler: A one-argument callable which takes a no-argument
 
492
        callable and should invoke it at some future point.  This will be used
 
493
        to schedule each step of this Cooperator.
 
494
 
 
495
        @param started: A boolean which indicates whether iterators should be
 
496
        stepped as soon as they are added, or if they will be queued up until
 
497
        L{Cooperator.start} is called.
 
498
        """
 
499
        self._tasks = []
 
500
        self._metarator = iter(())
 
501
        self._terminationPredicateFactory = terminationPredicateFactory
 
502
        self._scheduler = scheduler
 
503
        self._delayedCall = None
 
504
        self._stopped = False
 
505
        self._started = started
 
506
 
 
507
 
 
508
    def coiterate(self, iterator, doneDeferred=None):
 
509
        """
 
510
        Add an iterator to the list of iterators this L{Cooperator} is
 
511
        currently running.
 
512
 
 
513
        @param doneDeferred: If specified, this will be the Deferred used as
 
514
            the completion deferred.  It is suggested that you use the default,
 
515
            which creates a new Deferred for you.
 
516
 
 
517
        @return: a Deferred that will fire when the iterator finishes.
 
518
        """
 
519
        if doneDeferred is None:
 
520
            doneDeferred = defer.Deferred()
 
521
        CooperativeTask(iterator, self).whenDone().chainDeferred(doneDeferred)
 
522
        return doneDeferred
 
523
 
 
524
 
 
525
    def cooperate(self, iterator):
 
526
        """
 
527
        Start running the given iterator as a long-running cooperative task, by
 
528
        calling next() on it as a periodic timed event.
 
529
 
 
530
        @param iterator: the iterator to invoke.
 
531
 
 
532
        @return: a L{CooperativeTask} object representing this task.
 
533
        """
 
534
        return CooperativeTask(iterator, self)
 
535
 
 
536
 
 
537
    def _addTask(self, task):
 
538
        """
 
539
        Add a L{CooperativeTask} object to this L{Cooperator}.
 
540
        """
 
541
        if self._stopped:
 
542
            self._tasks.append(task) # XXX silly, I know, but _completeWith
 
543
                                     # does the inverse
 
544
            task._completeWith(SchedulerStopped(), Failure(SchedulerStopped()))
 
545
        else:
 
546
            self._tasks.append(task)
 
547
            self._reschedule()
 
548
 
 
549
 
 
550
    def _removeTask(self, task):
 
551
        """
 
552
        Remove a L{CooperativeTask} from this L{Cooperator}.
 
553
        """
 
554
        self._tasks.remove(task)
 
555
 
 
556
 
 
557
    def _tasksWhileNotStopped(self):
 
558
        """
 
559
        Yield all L{CooperativeTask} objects in a loop as long as this
 
560
        L{Cooperator}'s termination condition has not been met.
 
561
        """
 
562
        terminator = self._terminationPredicateFactory()
 
563
        while self._tasks:
 
564
            for t in self._metarator:
 
565
                yield t
 
566
                if terminator():
 
567
                    return
 
568
            self._metarator = iter(self._tasks)
 
569
 
 
570
 
 
571
    def _tick(self):
 
572
        """
 
573
        Run one scheduler tick.
 
574
        """
 
575
        self._delayedCall = None
 
576
        for taskObj in self._tasksWhileNotStopped():
 
577
            taskObj._oneWorkUnit()
 
578
        self._reschedule()
 
579
 
 
580
 
 
581
    _mustScheduleOnStart = False
 
582
    def _reschedule(self):
 
583
        if not self._started:
 
584
            self._mustScheduleOnStart = True
 
585
            return
 
586
        if self._delayedCall is None and self._tasks:
 
587
            self._delayedCall = self._scheduler(self._tick)
 
588
 
 
589
 
 
590
    def start(self):
 
591
        """
 
592
        Begin scheduling steps.
 
593
        """
 
594
        self._stopped = False
 
595
        self._started = True
 
596
        if self._mustScheduleOnStart:
 
597
            del self._mustScheduleOnStart
 
598
            self._reschedule()
 
599
 
 
600
 
 
601
    def stop(self):
 
602
        """
 
603
        Stop scheduling steps.  Errback the completion Deferreds of all
 
604
        iterators which have been added and forget about them.
 
605
        """
 
606
        self._stopped = True
 
607
        for taskObj in self._tasks:
 
608
            taskObj._completeWith(SchedulerStopped(),
 
609
                                  Failure(SchedulerStopped()))
 
610
        self._tasks = []
 
611
        if self._delayedCall is not None:
 
612
            self._delayedCall.cancel()
 
613
            self._delayedCall = None
 
614
 
 
615
 
 
616
 
 
617
_theCooperator = Cooperator()
 
618
 
 
619
def coiterate(iterator):
 
620
    """
 
621
    Cooperatively iterate over the given iterator, dividing runtime between it
 
622
    and all other iterators which have been passed to this function and not yet
 
623
    exhausted.
 
624
    """
 
625
    return _theCooperator.coiterate(iterator)
 
626
 
 
627
 
 
628
 
 
629
def cooperate(iterator):
 
630
    """
 
631
    Start running the given iterator as a long-running cooperative task, by
 
632
    calling next() on it as a periodic timed event.
 
633
 
 
634
    @param iterator: the iterator to invoke.
 
635
 
 
636
    @return: a L{CooperativeTask} object representing this task.
 
637
    """
 
638
    return _theCooperator.cooperate(iterator)
 
639
 
 
640
 
 
641
 
 
642
 
 
643
class Clock:
 
644
    """
 
645
    Provide a deterministic, easily-controlled implementation of
 
646
    L{IReactorTime.callLater}.  This is commonly useful for writing
 
647
    deterministic unit tests for code which schedules events using this API.
 
648
    """
 
649
    implements(IReactorTime)
 
650
 
 
651
    rightNow = 0.0
 
652
 
 
653
    def __init__(self):
 
654
        self.calls = []
 
655
 
 
656
    def seconds(self):
 
657
        """
 
658
        Pretend to be time.time().  This is used internally when an operation
 
659
        such as L{IDelayedCall.reset} needs to determine a a time value
 
660
        relative to the current time.
 
661
 
 
662
        @rtype: C{float}
 
663
        @return: The time which should be considered the current time.
 
664
        """
 
665
        return self.rightNow
 
666
 
 
667
 
 
668
    def callLater(self, when, what, *a, **kw):
 
669
        """
 
670
        See L{twisted.internet.interfaces.IReactorTime.callLater}.
 
671
        """
 
672
        dc = base.DelayedCall(self.seconds() + when,
 
673
                               what, a, kw,
 
674
                               self.calls.remove,
 
675
                               lambda c: None,
 
676
                               self.seconds)
 
677
        self.calls.append(dc)
 
678
        self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime()))
 
679
        return dc
 
680
 
 
681
    def getDelayedCalls(self):
 
682
        """
 
683
        See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls}
 
684
        """
 
685
        return self.calls
 
686
 
 
687
    def advance(self, amount):
 
688
        """
 
689
        Move time on this clock forward by the given amount and run whatever
 
690
        pending calls should be run.
 
691
 
 
692
        @type amount: C{float}
 
693
        @param amount: The number of seconds which to advance this clock's
 
694
        time.
 
695
        """
 
696
        self.rightNow += amount
 
697
        while self.calls and self.calls[0].getTime() <= self.seconds():
 
698
            call = self.calls.pop(0)
 
699
            call.called = 1
 
700
            call.func(*call.args, **call.kw)
 
701
 
 
702
 
 
703
    def pump(self, timings):
 
704
        """
 
705
        Advance incrementally by the given set of times.
 
706
 
 
707
        @type timings: iterable of C{float}
 
708
        """
 
709
        for amount in timings:
 
710
            self.advance(amount)
 
711
 
 
712
 
 
713
def deferLater(clock, delay, callable, *args, **kw):
 
714
    """
 
715
    Call the given function after a certain period of time has passed.
 
716
 
 
717
    @type clock: L{IReactorTime} provider
 
718
    @param clock: The object which will be used to schedule the delayed
 
719
        call.
 
720
 
 
721
    @type delay: C{float} or C{int}
 
722
    @param delay: The number of seconds to wait before calling the function.
 
723
 
 
724
    @param callable: The object to call after the delay.
 
725
 
 
726
    @param *args: The positional arguments to pass to C{callable}.
 
727
 
 
728
    @param **kw: The keyword arguments to pass to C{callable}.
 
729
 
 
730
    @rtype: L{defer.Deferred}
 
731
 
 
732
    @return: A deferred that fires with the result of the callable when the
 
733
        specified time has elapsed.
 
734
    """
 
735
    d = defer.Deferred()
 
736
    d.addCallback(lambda ignored: callable(*args, **kw))
 
737
    clock.callLater(delay, d.callback, None)
 
738
    return d
 
739
 
 
740
 
 
741
 
 
742
__all__ = [
 
743
    'LoopingCall',
 
744
 
 
745
    'Clock',
 
746
 
 
747
    'SchedulerStopped', 'Cooperator', 'coiterate',
 
748
 
 
749
    'deferLater',
 
750
    ]