~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/base.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_internet -*-
 
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
Very basic functionality for a Reactor implementation.
 
7
"""
 
8
 
 
9
import socket # needed only for sync-dns
 
10
from zope.interface import implements, classImplements
 
11
 
 
12
import sys
 
13
import warnings
 
14
from heapq import heappush, heappop, heapify
 
15
 
 
16
import traceback
 
17
 
 
18
from twisted.python.compat import set
 
19
from twisted.python.util import unsignedID
 
20
from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThreads
 
21
from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver
 
22
from twisted.internet.interfaces import IConnector, IDelayedCall
 
23
from twisted.internet import fdesc, main, error, abstract, defer, threads
 
24
from twisted.python import log, failure, reflect
 
25
from twisted.python.runtime import seconds as runtimeSeconds, platform, platformType
 
26
from twisted.internet.defer import Deferred, DeferredList
 
27
from twisted.persisted import styles
 
28
 
 
29
# This import is for side-effects!  Even if you don't see any code using it
 
30
# in this module, don't delete it.
 
31
from twisted.python import threadable
 
32
 
 
33
 
 
34
class DelayedCall(styles.Ephemeral):
 
35
 
 
36
    implements(IDelayedCall)
 
37
    # enable .debug to record creator call stack, and it will be logged if
 
38
    # an exception occurs while the function is being run
 
39
    debug = False
 
40
    _str = None
 
41
 
 
42
    def __init__(self, time, func, args, kw, cancel, reset,
 
43
                 seconds=runtimeSeconds):
 
44
        """
 
45
        @param time: Seconds from the epoch at which to call C{func}.
 
46
        @param func: The callable to call.
 
47
        @param args: The positional arguments to pass to the callable.
 
48
        @param kw: The keyword arguments to pass to the callable.
 
49
        @param cancel: A callable which will be called with this
 
50
            DelayedCall before cancellation.
 
51
        @param reset: A callable which will be called with this
 
52
            DelayedCall after changing this DelayedCall's scheduled
 
53
            execution time. The callable should adjust any necessary
 
54
            scheduling details to ensure this DelayedCall is invoked
 
55
            at the new appropriate time.
 
56
        @param seconds: If provided, a no-argument callable which will be
 
57
            used to determine the current time any time that information is
 
58
            needed.
 
59
        """
 
60
        self.time, self.func, self.args, self.kw = time, func, args, kw
 
61
        self.resetter = reset
 
62
        self.canceller = cancel
 
63
        self.seconds = seconds
 
64
        self.cancelled = self.called = 0
 
65
        self.delayed_time = 0
 
66
        if self.debug:
 
67
            self.creator = traceback.format_stack()[:-2]
 
68
 
 
69
    def getTime(self):
 
70
        """Return the time at which this call will fire
 
71
 
 
72
        @rtype: C{float}
 
73
        @return: The number of seconds after the epoch at which this call is
 
74
        scheduled to be made.
 
75
        """
 
76
        return self.time + self.delayed_time
 
77
 
 
78
    def cancel(self):
 
79
        """Unschedule this call
 
80
 
 
81
        @raise AlreadyCancelled: Raised if this call has already been
 
82
        unscheduled.
 
83
 
 
84
        @raise AlreadyCalled: Raised if this call has already been made.
 
85
        """
 
86
        if self.cancelled:
 
87
            raise error.AlreadyCancelled
 
88
        elif self.called:
 
89
            raise error.AlreadyCalled
 
90
        else:
 
91
            self.canceller(self)
 
92
            self.cancelled = 1
 
93
            if self.debug:
 
94
                self._str = str(self)
 
95
            del self.func, self.args, self.kw
 
96
 
 
97
    def reset(self, secondsFromNow):
 
98
        """Reschedule this call for a different time
 
99
 
 
100
        @type secondsFromNow: C{float}
 
101
        @param secondsFromNow: The number of seconds from the time of the
 
102
        C{reset} call at which this call will be scheduled.
 
103
 
 
104
        @raise AlreadyCancelled: Raised if this call has been cancelled.
 
105
        @raise AlreadyCalled: Raised if this call has already been made.
 
106
        """
 
107
        if self.cancelled:
 
108
            raise error.AlreadyCancelled
 
109
        elif self.called:
 
110
            raise error.AlreadyCalled
 
111
        else:
 
112
            newTime = self.seconds() + secondsFromNow
 
113
            if newTime < self.time:
 
114
                self.delayed_time = 0
 
115
                self.time = newTime
 
116
                self.resetter(self)
 
117
            else:
 
118
                self.delayed_time = newTime - self.time
 
119
 
 
120
    def delay(self, secondsLater):
 
121
        """Reschedule this call for a later time
 
122
 
 
123
        @type secondsLater: C{float}
 
124
        @param secondsLater: The number of seconds after the originally
 
125
        scheduled time for which to reschedule this call.
 
126
 
 
127
        @raise AlreadyCancelled: Raised if this call has been cancelled.
 
128
        @raise AlreadyCalled: Raised if this call has already been made.
 
129
        """
 
130
        if self.cancelled:
 
131
            raise error.AlreadyCancelled
 
132
        elif self.called:
 
133
            raise error.AlreadyCalled
 
134
        else:
 
135
            self.delayed_time += secondsLater
 
136
            if self.delayed_time < 0:
 
137
                self.activate_delay()
 
138
                self.resetter(self)
 
139
 
 
140
    def activate_delay(self):
 
141
        self.time += self.delayed_time
 
142
        self.delayed_time = 0
 
143
 
 
144
    def active(self):
 
145
        """Determine whether this call is still pending
 
146
 
 
147
        @rtype: C{bool}
 
148
        @return: True if this call has not yet been made or cancelled,
 
149
        False otherwise.
 
150
        """
 
151
        return not (self.cancelled or self.called)
 
152
 
 
153
    def __le__(self, other):
 
154
        return self.time <= other.time
 
155
 
 
156
 
 
157
    def __str__(self):
 
158
        if self._str is not None:
 
159
            return self._str
 
160
        if hasattr(self, 'func'):
 
161
            if hasattr(self.func, 'func_name'):
 
162
                func = self.func.func_name
 
163
                if hasattr(self.func, 'im_class'):
 
164
                    func = self.func.im_class.__name__ + '.' + func
 
165
            else:
 
166
                func = reflect.safe_repr(self.func)
 
167
        else:
 
168
            func = None
 
169
 
 
170
        now = self.seconds()
 
171
        L = ["<DelayedCall 0x%x [%ss] called=%s cancelled=%s" % (
 
172
                unsignedID(self), self.time - now, self.called,
 
173
                self.cancelled)]
 
174
        if func is not None:
 
175
            L.extend((" ", func, "("))
 
176
            if self.args:
 
177
                L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
 
178
                if self.kw:
 
179
                    L.append(", ")
 
180
            if self.kw:
 
181
                L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.iteritems()]))
 
182
            L.append(")")
 
183
 
 
184
        if self.debug:
 
185
            L.append("\n\ntraceback at creation: \n\n%s" % ('    '.join(self.creator)))
 
186
        L.append('>')
 
187
 
 
188
        return "".join(L)
 
189
 
 
190
 
 
191
 
 
192
class ThreadedResolver(object):
 
193
    """
 
194
    L{ThreadedResolver} uses a reactor, a threadpool, and
 
195
    L{socket.gethostbyname} to perform name lookups without blocking the
 
196
    reactor thread.  It also supports timeouts indepedently from whatever
 
197
    timeout logic L{socket.gethostbyname} might have.
 
198
 
 
199
    @ivar reactor: The reactor the threadpool of which will be used to call
 
200
        L{socket.gethostbyname} and the I/O thread of which the result will be
 
201
        delivered.
 
202
    """
 
203
    implements(IResolverSimple)
 
204
 
 
205
    def __init__(self, reactor):
 
206
        self.reactor = reactor
 
207
        self._runningQueries = {}
 
208
 
 
209
 
 
210
    def _fail(self, name, err):
 
211
        err = error.DNSLookupError("address %r not found: %s" % (name, err))
 
212
        return failure.Failure(err)
 
213
 
 
214
 
 
215
    def _cleanup(self, name, lookupDeferred):
 
216
        userDeferred, cancelCall = self._runningQueries[lookupDeferred]
 
217
        del self._runningQueries[lookupDeferred]
 
218
        userDeferred.errback(self._fail(name, "timeout error"))
 
219
 
 
220
 
 
221
    def _checkTimeout(self, result, name, lookupDeferred):
 
222
        try:
 
223
            userDeferred, cancelCall = self._runningQueries[lookupDeferred]
 
224
        except KeyError:
 
225
            pass
 
226
        else:
 
227
            del self._runningQueries[lookupDeferred]
 
228
            cancelCall.cancel()
 
229
 
 
230
            if isinstance(result, failure.Failure):
 
231
                userDeferred.errback(self._fail(name, result.getErrorMessage()))
 
232
            else:
 
233
                userDeferred.callback(result)
 
234
 
 
235
 
 
236
    def getHostByName(self, name, timeout = (1, 3, 11, 45)):
 
237
        """
 
238
        See L{twisted.internet.interfaces.IResolverSimple.getHostByName}.
 
239
 
 
240
        Note that the elements of C{timeout} are summed and the result is used
 
241
        as a timeout for the lookup.  Any intermediate timeout or retry logic
 
242
        is left up to the platform via L{socket.gethostbyname}.
 
243
        """
 
244
        if timeout:
 
245
            timeoutDelay = sum(timeout)
 
246
        else:
 
247
            timeoutDelay = 60
 
248
        userDeferred = defer.Deferred()
 
249
        lookupDeferred = threads.deferToThreadPool(
 
250
            self.reactor, self.reactor.getThreadPool(),
 
251
            socket.gethostbyname, name)
 
252
        cancelCall = self.reactor.callLater(
 
253
            timeoutDelay, self._cleanup, name, lookupDeferred)
 
254
        self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
 
255
        lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
 
256
        return userDeferred
 
257
 
 
258
 
 
259
 
 
260
class BlockingResolver:
 
261
    implements(IResolverSimple)
 
262
 
 
263
    def getHostByName(self, name, timeout = (1, 3, 11, 45)):
 
264
        try:
 
265
            address = socket.gethostbyname(name)
 
266
        except socket.error:
 
267
            msg = "address %r not found" % (name,)
 
268
            err = error.DNSLookupError(msg)
 
269
            return defer.fail(err)
 
270
        else:
 
271
            return defer.succeed(address)
 
272
 
 
273
 
 
274
class _ThreePhaseEvent(object):
 
275
    """
 
276
    Collection of callables (with arguments) which can be invoked as a group in
 
277
    a particular order.
 
278
 
 
279
    This provides the underlying implementation for the reactor's system event
 
280
    triggers.  An instance of this class tracks triggers for all phases of a
 
281
    single type of event.
 
282
 
 
283
    @ivar before: A list of the before-phase triggers containing three-tuples
 
284
        of a callable, a tuple of positional arguments, and a dict of keyword
 
285
        arguments
 
286
 
 
287
    @ivar finishedBefore: A list of the before-phase triggers which have
 
288
        already been executed.  This is only populated in the C{'BEFORE'} state.
 
289
 
 
290
    @ivar during: A list of the during-phase triggers containing three-tuples
 
291
        of a callable, a tuple of positional arguments, and a dict of keyword
 
292
        arguments
 
293
 
 
294
    @ivar after: A list of the after-phase triggers containing three-tuples
 
295
        of a callable, a tuple of positional arguments, and a dict of keyword
 
296
        arguments
 
297
 
 
298
    @ivar state: A string indicating what is currently going on with this
 
299
        object.  One of C{'BASE'} (for when nothing in particular is happening;
 
300
        this is the initial value), C{'BEFORE'} (when the before-phase triggers
 
301
        are in the process of being executed).
 
302
    """
 
303
    def __init__(self):
 
304
        self.before = []
 
305
        self.during = []
 
306
        self.after = []
 
307
        self.state = 'BASE'
 
308
 
 
309
 
 
310
    def addTrigger(self, phase, callable, *args, **kwargs):
 
311
        """
 
312
        Add a trigger to the indicate phase.
 
313
 
 
314
        @param phase: One of C{'before'}, C{'during'}, or C{'after'}.
 
315
 
 
316
        @param callable: An object to be called when this event is triggered.
 
317
        @param *args: Positional arguments to pass to C{callable}.
 
318
        @param **kwargs: Keyword arguments to pass to C{callable}.
 
319
 
 
320
        @return: An opaque handle which may be passed to L{removeTrigger} to
 
321
            reverse the effects of calling this method.
 
322
        """
 
323
        if phase not in ('before', 'during', 'after'):
 
324
            raise KeyError("invalid phase")
 
325
        getattr(self, phase).append((callable, args, kwargs))
 
326
        return phase, callable, args, kwargs
 
327
 
 
328
 
 
329
    def removeTrigger(self, handle):
 
330
        """
 
331
        Remove a previously added trigger callable.
 
332
 
 
333
        @param handle: An object previously returned by L{addTrigger}.  The
 
334
            trigger added by that call will be removed.
 
335
 
 
336
        @raise ValueError: If the trigger associated with C{handle} has already
 
337
            been removed or if C{handle} is not a valid handle.
 
338
        """
 
339
        return getattr(self, 'removeTrigger_' + self.state)(handle)
 
340
 
 
341
 
 
342
    def removeTrigger_BASE(self, handle):
 
343
        """
 
344
        Just try to remove the trigger.
 
345
 
 
346
        @see: removeTrigger
 
347
        """
 
348
        try:
 
349
            phase, callable, args, kwargs = handle
 
350
        except (TypeError, ValueError), e:
 
351
            raise ValueError("invalid trigger handle")
 
352
        else:
 
353
            if phase not in ('before', 'during', 'after'):
 
354
                raise KeyError("invalid phase")
 
355
            getattr(self, phase).remove((callable, args, kwargs))
 
356
 
 
357
 
 
358
    def removeTrigger_BEFORE(self, handle):
 
359
        """
 
360
        Remove the trigger if it has yet to be executed, otherwise emit a
 
361
        warning that in the future an exception will be raised when removing an
 
362
        already-executed trigger.
 
363
 
 
364
        @see: removeTrigger
 
365
        """
 
366
        phase, callable, args, kwargs = handle
 
367
        if phase != 'before':
 
368
            return self.removeTrigger_BASE(handle)
 
369
        if (callable, args, kwargs) in self.finishedBefore:
 
370
            warnings.warn(
 
371
                "Removing already-fired system event triggers will raise an "
 
372
                "exception in a future version of Twisted.",
 
373
                category=DeprecationWarning,
 
374
                stacklevel=3)
 
375
        else:
 
376
            self.removeTrigger_BASE(handle)
 
377
 
 
378
 
 
379
    def fireEvent(self):
 
380
        """
 
381
        Call the triggers added to this event.
 
382
        """
 
383
        self.state = 'BEFORE'
 
384
        self.finishedBefore = []
 
385
        beforeResults = []
 
386
        while self.before:
 
387
            callable, args, kwargs = self.before.pop(0)
 
388
            self.finishedBefore.append((callable, args, kwargs))
 
389
            try:
 
390
                result = callable(*args, **kwargs)
 
391
            except:
 
392
                log.err()
 
393
            else:
 
394
                if isinstance(result, Deferred):
 
395
                    beforeResults.append(result)
 
396
        DeferredList(beforeResults).addCallback(self._continueFiring)
 
397
 
 
398
 
 
399
    def _continueFiring(self, ignored):
 
400
        """
 
401
        Call the during and after phase triggers for this event.
 
402
        """
 
403
        self.state = 'BASE'
 
404
        self.finishedBefore = []
 
405
        for phase in self.during, self.after:
 
406
            while phase:
 
407
                callable, args, kwargs = phase.pop(0)
 
408
                try:
 
409
                    callable(*args, **kwargs)
 
410
                except:
 
411
                    log.err()
 
412
 
 
413
 
 
414
 
 
415
class ReactorBase(object):
 
416
    """
 
417
    Default base class for Reactors.
 
418
 
 
419
    @type _stopped: C{bool}
 
420
    @ivar _stopped: A flag which is true between paired calls to C{reactor.run}
 
421
        and C{reactor.stop}.  This should be replaced with an explicit state
 
422
        machine.
 
423
 
 
424
    @type _justStopped: C{bool}
 
425
    @ivar _justStopped: A flag which is true between the time C{reactor.stop}
 
426
        is called and the time the shutdown system event is fired.  This is
 
427
        used to determine whether that event should be fired after each
 
428
        iteration through the mainloop.  This should be replaced with an
 
429
        explicit state machine.
 
430
 
 
431
    @type _started: C{bool}
 
432
    @ivar _started: A flag which is true from the time C{reactor.run} is called
 
433
        until the time C{reactor.run} returns.  This is used to prevent calls
 
434
        to C{reactor.run} on a running reactor.  This should be replaced with
 
435
        an explicit state machine.
 
436
 
 
437
    @ivar running: See L{IReactorCore.running}
 
438
    """
 
439
    implements(IReactorCore, IReactorTime, IReactorPluggableResolver)
 
440
 
 
441
    _stopped = True
 
442
    installed = False
 
443
    usingThreads = False
 
444
    resolver = BlockingResolver()
 
445
 
 
446
    __name__ = "twisted.internet.reactor"
 
447
 
 
448
    def __init__(self):
 
449
        self.threadCallQueue = []
 
450
        self._eventTriggers = {}
 
451
        self._pendingTimedCalls = []
 
452
        self._newTimedCalls = []
 
453
        self._cancellations = 0
 
454
        self.running = False
 
455
        self._started = False
 
456
        self._justStopped = False
 
457
        # reactor internal readers, e.g. the waker.
 
458
        self._internalReaders = set()
 
459
        self.waker = None
 
460
 
 
461
        # Arrange for the running attribute to change to True at the right time
 
462
        # and let a subclass possibly do other things at that time (eg install
 
463
        # signal handlers).
 
464
        self.addSystemEventTrigger(
 
465
            'during', 'startup', self._reallyStartRunning)
 
466
        self.addSystemEventTrigger('during', 'shutdown', self.crash)
 
467
        self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
 
468
 
 
469
        if platform.supportsThreads():
 
470
            self._initThreads()
 
471
 
 
472
    # override in subclasses
 
473
 
 
474
    _lock = None
 
475
 
 
476
    def installWaker(self):
 
477
        raise NotImplementedError(
 
478
            reflect.qual(self.__class__) + " did not implement installWaker")
 
479
 
 
480
    def installResolver(self, resolver):
 
481
        assert IResolverSimple.providedBy(resolver)
 
482
        oldResolver = self.resolver
 
483
        self.resolver = resolver
 
484
        return oldResolver
 
485
 
 
486
    def wakeUp(self):
 
487
        """
 
488
        Wake up the event loop.
 
489
        """
 
490
        if self.waker:
 
491
            self.waker.wakeUp()
 
492
        # if the waker isn't installed, the reactor isn't running, and
 
493
        # therefore doesn't need to be woken up
 
494
 
 
495
    def doIteration(self, delay):
 
496
        """
 
497
        Do one iteration over the readers and writers which have been added.
 
498
        """
 
499
        raise NotImplementedError(
 
500
            reflect.qual(self.__class__) + " did not implement doIteration")
 
501
 
 
502
    def addReader(self, reader):
 
503
        raise NotImplementedError(
 
504
            reflect.qual(self.__class__) + " did not implement addReader")
 
505
 
 
506
    def addWriter(self, writer):
 
507
        raise NotImplementedError(
 
508
            reflect.qual(self.__class__) + " did not implement addWriter")
 
509
 
 
510
    def removeReader(self, reader):
 
511
        raise NotImplementedError(
 
512
            reflect.qual(self.__class__) + " did not implement removeReader")
 
513
 
 
514
    def removeWriter(self, writer):
 
515
        raise NotImplementedError(
 
516
            reflect.qual(self.__class__) + " did not implement removeWriter")
 
517
 
 
518
    def removeAll(self):
 
519
        raise NotImplementedError(
 
520
            reflect.qual(self.__class__) + " did not implement removeAll")
 
521
 
 
522
 
 
523
    def getReaders(self):
 
524
        raise NotImplementedError(
 
525
            reflect.qual(self.__class__) + " did not implement getReaders")
 
526
 
 
527
 
 
528
    def getWriters(self):
 
529
        raise NotImplementedError(
 
530
            reflect.qual(self.__class__) + " did not implement getWriters")
 
531
 
 
532
 
 
533
    def resolve(self, name, timeout = (1, 3, 11, 45)):
 
534
        """Return a Deferred that will resolve a hostname.
 
535
        """
 
536
        if not name:
 
537
            # XXX - This is *less than* '::', and will screw up IPv6 servers
 
538
            return defer.succeed('0.0.0.0')
 
539
        if abstract.isIPAddress(name):
 
540
            return defer.succeed(name)
 
541
        return self.resolver.getHostByName(name, timeout)
 
542
 
 
543
    # Installation.
 
544
 
 
545
    # IReactorCore
 
546
    def stop(self):
 
547
        """
 
548
        See twisted.internet.interfaces.IReactorCore.stop.
 
549
        """
 
550
        if self._stopped:
 
551
            raise error.ReactorNotRunning(
 
552
                "Can't stop reactor that isn't running.")
 
553
        self._stopped = True
 
554
        self._justStopped = True
 
555
 
 
556
 
 
557
    def crash(self):
 
558
        """
 
559
        See twisted.internet.interfaces.IReactorCore.crash.
 
560
 
 
561
        Reset reactor state tracking attributes and re-initialize certain
 
562
        state-transition helpers which were set up in C{__init__} but later
 
563
        destroyed (through use).
 
564
        """
 
565
        self._started = False
 
566
        self.running = False
 
567
        self.addSystemEventTrigger(
 
568
            'during', 'startup', self._reallyStartRunning)
 
569
 
 
570
    def sigInt(self, *args):
 
571
        """Handle a SIGINT interrupt.
 
572
        """
 
573
        log.msg("Received SIGINT, shutting down.")
 
574
        self.callFromThread(self.stop)
 
575
 
 
576
    def sigBreak(self, *args):
 
577
        """Handle a SIGBREAK interrupt.
 
578
        """
 
579
        log.msg("Received SIGBREAK, shutting down.")
 
580
        self.callFromThread(self.stop)
 
581
 
 
582
    def sigTerm(self, *args):
 
583
        """Handle a SIGTERM interrupt.
 
584
        """
 
585
        log.msg("Received SIGTERM, shutting down.")
 
586
        self.callFromThread(self.stop)
 
587
 
 
588
    def disconnectAll(self):
 
589
        """Disconnect every reader, and writer in the system.
 
590
        """
 
591
        selectables = self.removeAll()
 
592
        for reader in selectables:
 
593
            log.callWithLogger(reader,
 
594
                               reader.connectionLost,
 
595
                               failure.Failure(main.CONNECTION_LOST))
 
596
 
 
597
 
 
598
    def iterate(self, delay=0):
 
599
        """See twisted.internet.interfaces.IReactorCore.iterate.
 
600
        """
 
601
        self.runUntilCurrent()
 
602
        self.doIteration(delay)
 
603
 
 
604
 
 
605
    def fireSystemEvent(self, eventType):
 
606
        """See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
 
607
        """
 
608
        event = self._eventTriggers.get(eventType)
 
609
        if event is not None:
 
610
            event.fireEvent()
 
611
 
 
612
 
 
613
    def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw):
 
614
        """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
 
615
        """
 
616
        assert callable(_f), "%s is not callable" % _f
 
617
        if _eventType not in self._eventTriggers:
 
618
            self._eventTriggers[_eventType] = _ThreePhaseEvent()
 
619
        return (_eventType, self._eventTriggers[_eventType].addTrigger(
 
620
            _phase, _f, *args, **kw))
 
621
 
 
622
 
 
623
    def removeSystemEventTrigger(self, triggerID):
 
624
        """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
 
625
        """
 
626
        eventType, handle = triggerID
 
627
        self._eventTriggers[eventType].removeTrigger(handle)
 
628
 
 
629
 
 
630
    def callWhenRunning(self, _callable, *args, **kw):
 
631
        """See twisted.internet.interfaces.IReactorCore.callWhenRunning.
 
632
        """
 
633
        if self.running:
 
634
            _callable(*args, **kw)
 
635
        else:
 
636
            return self.addSystemEventTrigger('after', 'startup',
 
637
                                              _callable, *args, **kw)
 
638
 
 
639
    def startRunning(self):
 
640
        """
 
641
        Method called when reactor starts: do some initialization and fire
 
642
        startup events.
 
643
 
 
644
        Don't call this directly, call reactor.run() instead: it should take
 
645
        care of calling this.
 
646
 
 
647
        This method is somewhat misnamed.  The reactor will not necessarily be
 
648
        in the running state by the time this method returns.  The only
 
649
        guarantee is that it will be on its way to the running state.
 
650
        """
 
651
        if self._started:
 
652
            raise error.ReactorAlreadyRunning()
 
653
        self._started = True
 
654
        self._stopped = False
 
655
        threadable.registerAsIOThread()
 
656
        self.fireSystemEvent('startup')
 
657
 
 
658
 
 
659
    def _reallyStartRunning(self):
 
660
        """
 
661
        Method called to transition to the running state.  This should happen
 
662
        in the I{during startup} event trigger phase.
 
663
        """
 
664
        self.running = True
 
665
 
 
666
    # IReactorTime
 
667
 
 
668
    seconds = staticmethod(runtimeSeconds)
 
669
 
 
670
    def callLater(self, _seconds, _f, *args, **kw):
 
671
        """See twisted.internet.interfaces.IReactorTime.callLater.
 
672
        """
 
673
        assert callable(_f), "%s is not callable" % _f
 
674
        assert sys.maxint >= _seconds >= 0, \
 
675
               "%s is not greater than or equal to 0 seconds" % (_seconds,)
 
676
        tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
 
677
                           self._cancelCallLater,
 
678
                           self._moveCallLaterSooner,
 
679
                           seconds=self.seconds)
 
680
        self._newTimedCalls.append(tple)
 
681
        return tple
 
682
 
 
683
    def _moveCallLaterSooner(self, tple):
 
684
        # Linear time find: slow.
 
685
        heap = self._pendingTimedCalls
 
686
        try:
 
687
            pos = heap.index(tple)
 
688
 
 
689
            # Move elt up the heap until it rests at the right place.
 
690
            elt = heap[pos]
 
691
            while pos != 0:
 
692
                parent = (pos-1) // 2
 
693
                if heap[parent] <= elt:
 
694
                    break
 
695
                # move parent down
 
696
                heap[pos] = heap[parent]
 
697
                pos = parent
 
698
            heap[pos] = elt
 
699
        except ValueError:
 
700
            # element was not found in heap - oh well...
 
701
            pass
 
702
 
 
703
    def _cancelCallLater(self, tple):
 
704
        self._cancellations+=1
 
705
 
 
706
    def cancelCallLater(self, callID):
 
707
        """See twisted.internet.interfaces.IReactorTime.cancelCallLater.
 
708
        """
 
709
        # DO NOT DELETE THIS - this is documented in Python in a Nutshell, so we
 
710
        # we can't get rid of it for a long time.
 
711
        warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callID.cancel() instead")
 
712
        callID.cancel()
 
713
 
 
714
    def getDelayedCalls(self):
 
715
        """Return all the outstanding delayed calls in the system.
 
716
        They are returned in no particular order.
 
717
        This method is not efficient -- it is really only meant for
 
718
        test cases."""
 
719
        return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled]
 
720
 
 
721
    def _insertNewDelayedCalls(self):
 
722
        for call in self._newTimedCalls:
 
723
            if call.cancelled:
 
724
                self._cancellations-=1
 
725
            else:
 
726
                call.activate_delay()
 
727
                heappush(self._pendingTimedCalls, call)
 
728
        self._newTimedCalls = []
 
729
 
 
730
    def timeout(self):
 
731
        # insert new delayed calls to make sure to include them in timeout value
 
732
        self._insertNewDelayedCalls()
 
733
 
 
734
        if not self._pendingTimedCalls:
 
735
            return None
 
736
 
 
737
        return max(0, self._pendingTimedCalls[0].time - self.seconds())
 
738
 
 
739
 
 
740
    def runUntilCurrent(self):
 
741
        """Run all pending timed calls.
 
742
        """
 
743
        if self.threadCallQueue:
 
744
            # Keep track of how many calls we actually make, as we're
 
745
            # making them, in case another call is added to the queue
 
746
            # while we're in this loop.
 
747
            count = 0
 
748
            total = len(self.threadCallQueue)
 
749
            for (f, a, kw) in self.threadCallQueue:
 
750
                try:
 
751
                    f(*a, **kw)
 
752
                except:
 
753
                    log.err()
 
754
                count += 1
 
755
                if count == total:
 
756
                    break
 
757
            del self.threadCallQueue[:count]
 
758
            if self.threadCallQueue:
 
759
                self.wakeUp()
 
760
 
 
761
        # insert new delayed calls now
 
762
        self._insertNewDelayedCalls()
 
763
 
 
764
        now = self.seconds()
 
765
        while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
 
766
            call = heappop(self._pendingTimedCalls)
 
767
            if call.cancelled:
 
768
                self._cancellations-=1
 
769
                continue
 
770
 
 
771
            if call.delayed_time > 0:
 
772
                call.activate_delay()
 
773
                heappush(self._pendingTimedCalls, call)
 
774
                continue
 
775
 
 
776
            try:
 
777
                call.called = 1
 
778
                call.func(*call.args, **call.kw)
 
779
            except:
 
780
                log.deferr()
 
781
                if hasattr(call, "creator"):
 
782
                    e = "\n"
 
783
                    e += " C: previous exception occurred in " + \
 
784
                         "a DelayedCall created here:\n"
 
785
                    e += " C:"
 
786
                    e += "".join(call.creator).rstrip().replace("\n","\n C:")
 
787
                    e += "\n"
 
788
                    log.msg(e)
 
789
 
 
790
 
 
791
        if (self._cancellations > 50 and
 
792
             self._cancellations > len(self._pendingTimedCalls) >> 1):
 
793
            self._cancellations = 0
 
794
            self._pendingTimedCalls = [x for x in self._pendingTimedCalls
 
795
                                       if not x.cancelled]
 
796
            heapify(self._pendingTimedCalls)
 
797
 
 
798
        if self._justStopped:
 
799
            self._justStopped = False
 
800
            self.fireSystemEvent("shutdown")
 
801
 
 
802
    # IReactorProcess
 
803
 
 
804
    def _checkProcessArgs(self, args, env):
 
805
        """
 
806
        Check for valid arguments and environment to spawnProcess.
 
807
 
 
808
        @return: A two element tuple giving values to use when creating the
 
809
        process.  The first element of the tuple is a C{list} of C{str}
 
810
        giving the values for argv of the child process.  The second element
 
811
        of the tuple is either C{None} if C{env} was C{None} or a C{dict}
 
812
        mapping C{str} environment keys to C{str} environment values.
 
813
        """
 
814
        # Any unicode string which Python would successfully implicitly
 
815
        # encode to a byte string would have worked before these explicit
 
816
        # checks were added.  Anything which would have failed with a
 
817
        # UnicodeEncodeError during that implicit encoding step would have
 
818
        # raised an exception in the child process and that would have been
 
819
        # a pain in the butt to debug.
 
820
        #
 
821
        # So, we will explicitly attempt the same encoding which Python
 
822
        # would implicitly do later.  If it fails, we will report an error
 
823
        # without ever spawning a child process.  If it succeeds, we'll save
 
824
        # the result so that Python doesn't need to do it implicitly later.
 
825
        #
 
826
        # For any unicode which we can actually encode, we'll also issue a
 
827
        # deprecation warning, because no one should be passing unicode here
 
828
        # anyway.
 
829
        #
 
830
        # -exarkun
 
831
        defaultEncoding = sys.getdefaultencoding()
 
832
 
 
833
        # Common check function
 
834
        def argChecker(arg):
 
835
            """
 
836
            Return either a str or None.  If the given value is not
 
837
            allowable for some reason, None is returned.  Otherwise, a
 
838
            possibly different object which should be used in place of arg
 
839
            is returned.  This forces unicode encoding to happen now, rather
 
840
            than implicitly later.
 
841
            """
 
842
            if isinstance(arg, unicode):
 
843
                try:
 
844
                    arg = arg.encode(defaultEncoding)
 
845
                except UnicodeEncodeError:
 
846
                    return None
 
847
                warnings.warn(
 
848
                    "Argument strings and environment keys/values passed to "
 
849
                    "reactor.spawnProcess should be str, not unicode.",
 
850
                    category=DeprecationWarning,
 
851
                    stacklevel=4)
 
852
            if isinstance(arg, str) and '\0' not in arg:
 
853
                return arg
 
854
            return None
 
855
 
 
856
        # Make a few tests to check input validity
 
857
        if not isinstance(args, (tuple, list)):
 
858
            raise TypeError("Arguments must be a tuple or list")
 
859
 
 
860
        outputArgs = []
 
861
        for arg in args:
 
862
            arg = argChecker(arg)
 
863
            if arg is None:
 
864
                raise TypeError("Arguments contain a non-string value")
 
865
            else:
 
866
                outputArgs.append(arg)
 
867
 
 
868
        outputEnv = None
 
869
        if env is not None:
 
870
            outputEnv = {}
 
871
            for key, val in env.iteritems():
 
872
                key = argChecker(key)
 
873
                if key is None:
 
874
                    raise TypeError("Environment contains a non-string key")
 
875
                val = argChecker(val)
 
876
                if val is None:
 
877
                    raise TypeError("Environment contains a non-string value")
 
878
                outputEnv[key] = val
 
879
        return outputArgs, outputEnv
 
880
 
 
881
    # IReactorThreads
 
882
    if platform.supportsThreads():
 
883
        threadpool = None
 
884
        # ID of the trigger starting the threadpool
 
885
        _threadpoolStartupID = None
 
886
        # ID of the trigger stopping the threadpool
 
887
        threadpoolShutdownID = None
 
888
 
 
889
        def _initThreads(self):
 
890
            self.usingThreads = True
 
891
            self.resolver = ThreadedResolver(self)
 
892
            self.installWaker()
 
893
 
 
894
        def callFromThread(self, f, *args, **kw):
 
895
            """
 
896
            See L{twisted.internet.interfaces.IReactorThreads.callFromThread}.
 
897
            """
 
898
            assert callable(f), "%s is not callable" % (f,)
 
899
            # lists are thread-safe in CPython, but not in Jython
 
900
            # this is probably a bug in Jython, but until fixed this code
 
901
            # won't work in Jython.
 
902
            self.threadCallQueue.append((f, args, kw))
 
903
            self.wakeUp()
 
904
 
 
905
        def _initThreadPool(self):
 
906
            """
 
907
            Create the threadpool accessible with callFromThread.
 
908
            """
 
909
            from twisted.python import threadpool
 
910
            self.threadpool = threadpool.ThreadPool(
 
911
                0, 10, 'twisted.internet.reactor')
 
912
            self._threadpoolStartupID = self.callWhenRunning(
 
913
                self.threadpool.start)
 
914
            self.threadpoolShutdownID = self.addSystemEventTrigger(
 
915
                'during', 'shutdown', self._stopThreadPool)
 
916
 
 
917
        def _stopThreadPool(self):
 
918
            """
 
919
            Stop the reactor threadpool.  This method is only valid if there
 
920
            is currently a threadpool (created by L{_initThreadPool}).  It
 
921
            is not intended to be called directly; instead, it will be
 
922
            called by a shutdown trigger created in L{_initThreadPool}.
 
923
            """
 
924
            triggers = [self._threadpoolStartupID, self.threadpoolShutdownID]
 
925
            for trigger in filter(None, triggers):
 
926
                try:
 
927
                    self.removeSystemEventTrigger(trigger)
 
928
                except ValueError:
 
929
                    pass
 
930
            self._threadpoolStartupID = None
 
931
            self.threadpoolShutdownID = None
 
932
            self.threadpool.stop()
 
933
            self.threadpool = None
 
934
 
 
935
 
 
936
        def getThreadPool(self):
 
937
            """
 
938
            See L{twisted.internet.interfaces.IReactorThreads.getThreadPool}.
 
939
            """
 
940
            if self.threadpool is None:
 
941
                self._initThreadPool()
 
942
            return self.threadpool
 
943
 
 
944
 
 
945
        def callInThread(self, _callable, *args, **kwargs):
 
946
            """
 
947
            See L{twisted.internet.interfaces.IReactorThreads.callInThread}.
 
948
            """
 
949
            self.getThreadPool().callInThread(_callable, *args, **kwargs)
 
950
 
 
951
        def suggestThreadPoolSize(self, size):
 
952
            """
 
953
            See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}.
 
954
            """
 
955
            self.getThreadPool().adjustPoolsize(maxthreads=size)
 
956
    else:
 
957
        # This is for signal handlers.
 
958
        def callFromThread(self, f, *args, **kw):
 
959
            assert callable(f), "%s is not callable" % (f,)
 
960
            # See comment in the other callFromThread implementation.
 
961
            self.threadCallQueue.append((f, args, kw))
 
962
 
 
963
if platform.supportsThreads():
 
964
    classImplements(ReactorBase, IReactorThreads)
 
965
 
 
966
 
 
967
class BaseConnector(styles.Ephemeral):
 
968
    """Basic implementation of connector.
 
969
 
 
970
    State can be: "connecting", "connected", "disconnected"
 
971
    """
 
972
 
 
973
    implements(IConnector)
 
974
 
 
975
    timeoutID = None
 
976
    factoryStarted = 0
 
977
 
 
978
    def __init__(self, factory, timeout, reactor):
 
979
        self.state = "disconnected"
 
980
        self.reactor = reactor
 
981
        self.factory = factory
 
982
        self.timeout = timeout
 
983
 
 
984
    def disconnect(self):
 
985
        """Disconnect whatever our state is."""
 
986
        if self.state == 'connecting':
 
987
            self.stopConnecting()
 
988
        elif self.state == 'connected':
 
989
            self.transport.loseConnection()
 
990
 
 
991
    def connect(self):
 
992
        """Start connection to remote server."""
 
993
        if self.state != "disconnected":
 
994
            raise RuntimeError, "can't connect in this state"
 
995
 
 
996
        self.state = "connecting"
 
997
        if not self.factoryStarted:
 
998
            self.factory.doStart()
 
999
            self.factoryStarted = 1
 
1000
        self.transport = transport = self._makeTransport()
 
1001
        if self.timeout is not None:
 
1002
            self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
 
1003
        self.factory.startedConnecting(self)
 
1004
 
 
1005
    def stopConnecting(self):
 
1006
        """Stop attempting to connect."""
 
1007
        if self.state != "connecting":
 
1008
            raise error.NotConnectingError, "we're not trying to connect"
 
1009
 
 
1010
        self.state = "disconnected"
 
1011
        self.transport.failIfNotConnected(error.UserError())
 
1012
        del self.transport
 
1013
 
 
1014
    def cancelTimeout(self):
 
1015
        if self.timeoutID is not None:
 
1016
            try:
 
1017
                self.timeoutID.cancel()
 
1018
            except ValueError:
 
1019
                pass
 
1020
            del self.timeoutID
 
1021
 
 
1022
    def buildProtocol(self, addr):
 
1023
        self.state = "connected"
 
1024
        self.cancelTimeout()
 
1025
        return self.factory.buildProtocol(addr)
 
1026
 
 
1027
    def connectionFailed(self, reason):
 
1028
        self.cancelTimeout()
 
1029
        self.transport = None
 
1030
        self.state = "disconnected"
 
1031
        self.factory.clientConnectionFailed(self, reason)
 
1032
        if self.state == "disconnected":
 
1033
            # factory hasn't called our connect() method
 
1034
            self.factory.doStop()
 
1035
            self.factoryStarted = 0
 
1036
 
 
1037
    def connectionLost(self, reason):
 
1038
        self.state = "disconnected"
 
1039
        self.factory.clientConnectionLost(self, reason)
 
1040
        if self.state == "disconnected":
 
1041
            # factory hasn't called our connect() method
 
1042
            self.factory.doStop()
 
1043
            self.factoryStarted = 0
 
1044
 
 
1045
    def getDestination(self):
 
1046
        raise NotImplementedError(
 
1047
            reflect.qual(self.__class__) + " did not implement "
 
1048
            "getDestination")
 
1049
 
 
1050
 
 
1051
 
 
1052
class BasePort(abstract.FileDescriptor):
 
1053
    """Basic implementation of a ListeningPort.
 
1054
 
 
1055
    Note: This does not actually implement IListeningPort.
 
1056
    """
 
1057
 
 
1058
    addressFamily = None
 
1059
    socketType = None
 
1060
 
 
1061
    def createInternetSocket(self):
 
1062
        s = socket.socket(self.addressFamily, self.socketType)
 
1063
        s.setblocking(0)
 
1064
        fdesc._setCloseOnExec(s.fileno())
 
1065
        return s
 
1066
 
 
1067
 
 
1068
    def doWrite(self):
 
1069
        """Raises a RuntimeError"""
 
1070
        raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class__)
 
1071
 
 
1072
 
 
1073
 
 
1074
class _SignalReactorMixin:
 
1075
    """
 
1076
    Private mixin to manage signals: it installs signal handlers at start time,
 
1077
    and define run method.
 
1078
 
 
1079
    It can only be used mixed in with L{ReactorBase}, and has to be defined
 
1080
    first in the inheritance (so that method resolution order finds
 
1081
    startRunning first).
 
1082
 
 
1083
    @type _installSignalHandlers: C{bool}
 
1084
    @ivar _installSignalHandlers: A flag which indicates whether any signal
 
1085
        handlers will be installed during startup.  This includes handlers for
 
1086
        SIGCHLD to monitor child processes, and SIGINT, SIGTERM, and SIGBREAK
 
1087
        to stop the reactor.
 
1088
    """
 
1089
 
 
1090
    _installSignalHandlers = False
 
1091
 
 
1092
    def _handleSignals(self):
 
1093
        """
 
1094
        Install the signal handlers for the Twisted event loop.
 
1095
        """
 
1096
        try:
 
1097
            import signal
 
1098
        except ImportError:
 
1099
            log.msg("Warning: signal module unavailable -- "
 
1100
                    "not installing signal handlers.")
 
1101
            return
 
1102
 
 
1103
        if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
 
1104
            # only handle if there isn't already a handler, e.g. for Pdb.
 
1105
            signal.signal(signal.SIGINT, self.sigInt)
 
1106
        signal.signal(signal.SIGTERM, self.sigTerm)
 
1107
 
 
1108
        # Catch Ctrl-Break in windows
 
1109
        if hasattr(signal, "SIGBREAK"):
 
1110
            signal.signal(signal.SIGBREAK, self.sigBreak)
 
1111
 
 
1112
        if platformType == 'posix':
 
1113
            signal.signal(signal.SIGCHLD, self._handleSigchld)
 
1114
            # Also call the signal handler right now, in case we missed any
 
1115
            # signals before we installed it.  This should only happen if
 
1116
            # someone used spawnProcess before calling reactor.run (and the
 
1117
            # process also exited already).
 
1118
            self._handleSigchld(signal.SIGCHLD, None)
 
1119
 
 
1120
 
 
1121
    def _handleSigchld(self, signum, frame, _threadSupport=platform.supportsThreads()):
 
1122
        """
 
1123
        Reap all processes on SIGCHLD.
 
1124
 
 
1125
        This gets called on SIGCHLD. We do no processing inside a signal
 
1126
        handler, as the calls we make here could occur between any two
 
1127
        python bytecode instructions. Deferring processing to the next
 
1128
        eventloop round prevents us from violating the state constraints
 
1129
        of arbitrary classes.
 
1130
        """
 
1131
        from twisted.internet.process import reapAllProcesses
 
1132
        if _threadSupport:
 
1133
            self.callFromThread(reapAllProcesses)
 
1134
        else:
 
1135
            self.callLater(0, reapAllProcesses)
 
1136
 
 
1137
 
 
1138
    def startRunning(self, installSignalHandlers=True):
 
1139
        """
 
1140
        Extend the base implementation in order to remember whether signal
 
1141
        handlers should be installed later.
 
1142
 
 
1143
        @type installSignalHandlers: C{bool}
 
1144
        @param installSignalHandlers: A flag which, if set, indicates that
 
1145
            handlers for a number of (implementation-defined) signals should be
 
1146
            installed during startup.
 
1147
        """
 
1148
        self._installSignalHandlers = installSignalHandlers
 
1149
        ReactorBase.startRunning(self)
 
1150
 
 
1151
 
 
1152
    def _reallyStartRunning(self):
 
1153
        """
 
1154
        Extend the base implementation by also installing signal handlers, if
 
1155
        C{self._installSignalHandlers} is true.
 
1156
        """
 
1157
        ReactorBase._reallyStartRunning(self)
 
1158
        if self._installSignalHandlers:
 
1159
            # Make sure this happens before after-startup events, since the
 
1160
            # expectation of after-startup is that the reactor is fully
 
1161
            # initialized.  Don't do it right away for historical reasons
 
1162
            # (perhaps some before-startup triggers don't want there to be a
 
1163
            # custom SIGCHLD handler so that they can run child processes with
 
1164
            # some blocking api).
 
1165
            self._handleSignals()
 
1166
 
 
1167
 
 
1168
    def run(self, installSignalHandlers=True):
 
1169
        self.startRunning(installSignalHandlers=installSignalHandlers)
 
1170
        self.mainLoop()
 
1171
 
 
1172
 
 
1173
    def mainLoop(self):
 
1174
        while self._started:
 
1175
            try:
 
1176
                while self._started:
 
1177
                    # Advance simulation time in delayed event
 
1178
                    # processors.
 
1179
                    self.runUntilCurrent()
 
1180
                    t2 = self.timeout()
 
1181
                    t = self.running and t2
 
1182
                    self.doIteration(t)
 
1183
            except:
 
1184
                log.msg("Unexpected error in main loop.")
 
1185
                log.err()
 
1186
            else:
 
1187
                log.msg('Main loop terminated.')
 
1188
 
 
1189
 
 
1190
 
 
1191
__all__ = []