1
# -*- test-case-name: twisted.test.test_internet -*-
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Very basic functionality for a Reactor implementation.
9
import socket # needed only for sync-dns
10
from zope.interface import implements, classImplements
14
from heapq import heappush, heappop, heapify
18
from twisted.python.compat import set
19
from twisted.python.util import unsignedID
20
from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThreads
21
from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver
22
from twisted.internet.interfaces import IConnector, IDelayedCall
23
from twisted.internet import fdesc, main, error, abstract, defer, threads
24
from twisted.python import log, failure, reflect
25
from twisted.python.runtime import seconds as runtimeSeconds, platform, platformType
26
from twisted.internet.defer import Deferred, DeferredList
27
from twisted.persisted import styles
29
# This import is for side-effects! Even if you don't see any code using it
30
# in this module, don't delete it.
31
from twisted.python import threadable
34
class DelayedCall(styles.Ephemeral):
36
implements(IDelayedCall)
37
# enable .debug to record creator call stack, and it will be logged if
38
# an exception occurs while the function is being run
42
def __init__(self, time, func, args, kw, cancel, reset,
43
seconds=runtimeSeconds):
45
@param time: Seconds from the epoch at which to call C{func}.
46
@param func: The callable to call.
47
@param args: The positional arguments to pass to the callable.
48
@param kw: The keyword arguments to pass to the callable.
49
@param cancel: A callable which will be called with this
50
DelayedCall before cancellation.
51
@param reset: A callable which will be called with this
52
DelayedCall after changing this DelayedCall's scheduled
53
execution time. The callable should adjust any necessary
54
scheduling details to ensure this DelayedCall is invoked
55
at the new appropriate time.
56
@param seconds: If provided, a no-argument callable which will be
57
used to determine the current time any time that information is
60
self.time, self.func, self.args, self.kw = time, func, args, kw
62
self.canceller = cancel
63
self.seconds = seconds
64
self.cancelled = self.called = 0
67
self.creator = traceback.format_stack()[:-2]
70
"""Return the time at which this call will fire
73
@return: The number of seconds after the epoch at which this call is
76
return self.time + self.delayed_time
79
"""Unschedule this call
81
@raise AlreadyCancelled: Raised if this call has already been
84
@raise AlreadyCalled: Raised if this call has already been made.
87
raise error.AlreadyCancelled
89
raise error.AlreadyCalled
95
del self.func, self.args, self.kw
97
def reset(self, secondsFromNow):
98
"""Reschedule this call for a different time
100
@type secondsFromNow: C{float}
101
@param secondsFromNow: The number of seconds from the time of the
102
C{reset} call at which this call will be scheduled.
104
@raise AlreadyCancelled: Raised if this call has been cancelled.
105
@raise AlreadyCalled: Raised if this call has already been made.
108
raise error.AlreadyCancelled
110
raise error.AlreadyCalled
112
newTime = self.seconds() + secondsFromNow
113
if newTime < self.time:
114
self.delayed_time = 0
118
self.delayed_time = newTime - self.time
120
def delay(self, secondsLater):
121
"""Reschedule this call for a later time
123
@type secondsLater: C{float}
124
@param secondsLater: The number of seconds after the originally
125
scheduled time for which to reschedule this call.
127
@raise AlreadyCancelled: Raised if this call has been cancelled.
128
@raise AlreadyCalled: Raised if this call has already been made.
131
raise error.AlreadyCancelled
133
raise error.AlreadyCalled
135
self.delayed_time += secondsLater
136
if self.delayed_time < 0:
137
self.activate_delay()
140
def activate_delay(self):
141
self.time += self.delayed_time
142
self.delayed_time = 0
145
"""Determine whether this call is still pending
148
@return: True if this call has not yet been made or cancelled,
151
return not (self.cancelled or self.called)
153
def __le__(self, other):
154
return self.time <= other.time
158
if self._str is not None:
160
if hasattr(self, 'func'):
161
if hasattr(self.func, 'func_name'):
162
func = self.func.func_name
163
if hasattr(self.func, 'im_class'):
164
func = self.func.im_class.__name__ + '.' + func
166
func = reflect.safe_repr(self.func)
171
L = ["<DelayedCall 0x%x [%ss] called=%s cancelled=%s" % (
172
unsignedID(self), self.time - now, self.called,
175
L.extend((" ", func, "("))
177
L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
181
L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.iteritems()]))
185
L.append("\n\ntraceback at creation: \n\n%s" % (' '.join(self.creator)))
192
class ThreadedResolver(object):
194
L{ThreadedResolver} uses a reactor, a threadpool, and
195
L{socket.gethostbyname} to perform name lookups without blocking the
196
reactor thread. It also supports timeouts indepedently from whatever
197
timeout logic L{socket.gethostbyname} might have.
199
@ivar reactor: The reactor the threadpool of which will be used to call
200
L{socket.gethostbyname} and the I/O thread of which the result will be
203
implements(IResolverSimple)
205
def __init__(self, reactor):
206
self.reactor = reactor
207
self._runningQueries = {}
210
def _fail(self, name, err):
211
err = error.DNSLookupError("address %r not found: %s" % (name, err))
212
return failure.Failure(err)
215
def _cleanup(self, name, lookupDeferred):
216
userDeferred, cancelCall = self._runningQueries[lookupDeferred]
217
del self._runningQueries[lookupDeferred]
218
userDeferred.errback(self._fail(name, "timeout error"))
221
def _checkTimeout(self, result, name, lookupDeferred):
223
userDeferred, cancelCall = self._runningQueries[lookupDeferred]
227
del self._runningQueries[lookupDeferred]
230
if isinstance(result, failure.Failure):
231
userDeferred.errback(self._fail(name, result.getErrorMessage()))
233
userDeferred.callback(result)
236
def getHostByName(self, name, timeout = (1, 3, 11, 45)):
238
See L{twisted.internet.interfaces.IResolverSimple.getHostByName}.
240
Note that the elements of C{timeout} are summed and the result is used
241
as a timeout for the lookup. Any intermediate timeout or retry logic
242
is left up to the platform via L{socket.gethostbyname}.
245
timeoutDelay = sum(timeout)
248
userDeferred = defer.Deferred()
249
lookupDeferred = threads.deferToThreadPool(
250
self.reactor, self.reactor.getThreadPool(),
251
socket.gethostbyname, name)
252
cancelCall = self.reactor.callLater(
253
timeoutDelay, self._cleanup, name, lookupDeferred)
254
self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
255
lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
260
class BlockingResolver:
261
implements(IResolverSimple)
263
def getHostByName(self, name, timeout = (1, 3, 11, 45)):
265
address = socket.gethostbyname(name)
267
msg = "address %r not found" % (name,)
268
err = error.DNSLookupError(msg)
269
return defer.fail(err)
271
return defer.succeed(address)
274
class _ThreePhaseEvent(object):
276
Collection of callables (with arguments) which can be invoked as a group in
279
This provides the underlying implementation for the reactor's system event
280
triggers. An instance of this class tracks triggers for all phases of a
281
single type of event.
283
@ivar before: A list of the before-phase triggers containing three-tuples
284
of a callable, a tuple of positional arguments, and a dict of keyword
287
@ivar finishedBefore: A list of the before-phase triggers which have
288
already been executed. This is only populated in the C{'BEFORE'} state.
290
@ivar during: A list of the during-phase triggers containing three-tuples
291
of a callable, a tuple of positional arguments, and a dict of keyword
294
@ivar after: A list of the after-phase triggers containing three-tuples
295
of a callable, a tuple of positional arguments, and a dict of keyword
298
@ivar state: A string indicating what is currently going on with this
299
object. One of C{'BASE'} (for when nothing in particular is happening;
300
this is the initial value), C{'BEFORE'} (when the before-phase triggers
301
are in the process of being executed).
310
def addTrigger(self, phase, callable, *args, **kwargs):
312
Add a trigger to the indicate phase.
314
@param phase: One of C{'before'}, C{'during'}, or C{'after'}.
316
@param callable: An object to be called when this event is triggered.
317
@param *args: Positional arguments to pass to C{callable}.
318
@param **kwargs: Keyword arguments to pass to C{callable}.
320
@return: An opaque handle which may be passed to L{removeTrigger} to
321
reverse the effects of calling this method.
323
if phase not in ('before', 'during', 'after'):
324
raise KeyError("invalid phase")
325
getattr(self, phase).append((callable, args, kwargs))
326
return phase, callable, args, kwargs
329
def removeTrigger(self, handle):
331
Remove a previously added trigger callable.
333
@param handle: An object previously returned by L{addTrigger}. The
334
trigger added by that call will be removed.
336
@raise ValueError: If the trigger associated with C{handle} has already
337
been removed or if C{handle} is not a valid handle.
339
return getattr(self, 'removeTrigger_' + self.state)(handle)
342
def removeTrigger_BASE(self, handle):
344
Just try to remove the trigger.
349
phase, callable, args, kwargs = handle
350
except (TypeError, ValueError), e:
351
raise ValueError("invalid trigger handle")
353
if phase not in ('before', 'during', 'after'):
354
raise KeyError("invalid phase")
355
getattr(self, phase).remove((callable, args, kwargs))
358
def removeTrigger_BEFORE(self, handle):
360
Remove the trigger if it has yet to be executed, otherwise emit a
361
warning that in the future an exception will be raised when removing an
362
already-executed trigger.
366
phase, callable, args, kwargs = handle
367
if phase != 'before':
368
return self.removeTrigger_BASE(handle)
369
if (callable, args, kwargs) in self.finishedBefore:
371
"Removing already-fired system event triggers will raise an "
372
"exception in a future version of Twisted.",
373
category=DeprecationWarning,
376
self.removeTrigger_BASE(handle)
381
Call the triggers added to this event.
383
self.state = 'BEFORE'
384
self.finishedBefore = []
387
callable, args, kwargs = self.before.pop(0)
388
self.finishedBefore.append((callable, args, kwargs))
390
result = callable(*args, **kwargs)
394
if isinstance(result, Deferred):
395
beforeResults.append(result)
396
DeferredList(beforeResults).addCallback(self._continueFiring)
399
def _continueFiring(self, ignored):
401
Call the during and after phase triggers for this event.
404
self.finishedBefore = []
405
for phase in self.during, self.after:
407
callable, args, kwargs = phase.pop(0)
409
callable(*args, **kwargs)
415
class ReactorBase(object):
417
Default base class for Reactors.
419
@type _stopped: C{bool}
420
@ivar _stopped: A flag which is true between paired calls to C{reactor.run}
421
and C{reactor.stop}. This should be replaced with an explicit state
424
@type _justStopped: C{bool}
425
@ivar _justStopped: A flag which is true between the time C{reactor.stop}
426
is called and the time the shutdown system event is fired. This is
427
used to determine whether that event should be fired after each
428
iteration through the mainloop. This should be replaced with an
429
explicit state machine.
431
@type _started: C{bool}
432
@ivar _started: A flag which is true from the time C{reactor.run} is called
433
until the time C{reactor.run} returns. This is used to prevent calls
434
to C{reactor.run} on a running reactor. This should be replaced with
435
an explicit state machine.
437
@ivar running: See L{IReactorCore.running}
439
implements(IReactorCore, IReactorTime, IReactorPluggableResolver)
444
resolver = BlockingResolver()
446
__name__ = "twisted.internet.reactor"
449
self.threadCallQueue = []
450
self._eventTriggers = {}
451
self._pendingTimedCalls = []
452
self._newTimedCalls = []
453
self._cancellations = 0
455
self._started = False
456
self._justStopped = False
457
# reactor internal readers, e.g. the waker.
458
self._internalReaders = set()
461
# Arrange for the running attribute to change to True at the right time
462
# and let a subclass possibly do other things at that time (eg install
464
self.addSystemEventTrigger(
465
'during', 'startup', self._reallyStartRunning)
466
self.addSystemEventTrigger('during', 'shutdown', self.crash)
467
self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
469
if platform.supportsThreads():
472
# override in subclasses
476
def installWaker(self):
477
raise NotImplementedError(
478
reflect.qual(self.__class__) + " did not implement installWaker")
480
def installResolver(self, resolver):
481
assert IResolverSimple.providedBy(resolver)
482
oldResolver = self.resolver
483
self.resolver = resolver
488
Wake up the event loop.
492
# if the waker isn't installed, the reactor isn't running, and
493
# therefore doesn't need to be woken up
495
def doIteration(self, delay):
497
Do one iteration over the readers and writers which have been added.
499
raise NotImplementedError(
500
reflect.qual(self.__class__) + " did not implement doIteration")
502
def addReader(self, reader):
503
raise NotImplementedError(
504
reflect.qual(self.__class__) + " did not implement addReader")
506
def addWriter(self, writer):
507
raise NotImplementedError(
508
reflect.qual(self.__class__) + " did not implement addWriter")
510
def removeReader(self, reader):
511
raise NotImplementedError(
512
reflect.qual(self.__class__) + " did not implement removeReader")
514
def removeWriter(self, writer):
515
raise NotImplementedError(
516
reflect.qual(self.__class__) + " did not implement removeWriter")
519
raise NotImplementedError(
520
reflect.qual(self.__class__) + " did not implement removeAll")
523
def getReaders(self):
524
raise NotImplementedError(
525
reflect.qual(self.__class__) + " did not implement getReaders")
528
def getWriters(self):
529
raise NotImplementedError(
530
reflect.qual(self.__class__) + " did not implement getWriters")
533
def resolve(self, name, timeout = (1, 3, 11, 45)):
534
"""Return a Deferred that will resolve a hostname.
537
# XXX - This is *less than* '::', and will screw up IPv6 servers
538
return defer.succeed('0.0.0.0')
539
if abstract.isIPAddress(name):
540
return defer.succeed(name)
541
return self.resolver.getHostByName(name, timeout)
548
See twisted.internet.interfaces.IReactorCore.stop.
551
raise error.ReactorNotRunning(
552
"Can't stop reactor that isn't running.")
554
self._justStopped = True
559
See twisted.internet.interfaces.IReactorCore.crash.
561
Reset reactor state tracking attributes and re-initialize certain
562
state-transition helpers which were set up in C{__init__} but later
563
destroyed (through use).
565
self._started = False
567
self.addSystemEventTrigger(
568
'during', 'startup', self._reallyStartRunning)
570
def sigInt(self, *args):
571
"""Handle a SIGINT interrupt.
573
log.msg("Received SIGINT, shutting down.")
574
self.callFromThread(self.stop)
576
def sigBreak(self, *args):
577
"""Handle a SIGBREAK interrupt.
579
log.msg("Received SIGBREAK, shutting down.")
580
self.callFromThread(self.stop)
582
def sigTerm(self, *args):
583
"""Handle a SIGTERM interrupt.
585
log.msg("Received SIGTERM, shutting down.")
586
self.callFromThread(self.stop)
588
def disconnectAll(self):
589
"""Disconnect every reader, and writer in the system.
591
selectables = self.removeAll()
592
for reader in selectables:
593
log.callWithLogger(reader,
594
reader.connectionLost,
595
failure.Failure(main.CONNECTION_LOST))
598
def iterate(self, delay=0):
599
"""See twisted.internet.interfaces.IReactorCore.iterate.
601
self.runUntilCurrent()
602
self.doIteration(delay)
605
def fireSystemEvent(self, eventType):
606
"""See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
608
event = self._eventTriggers.get(eventType)
609
if event is not None:
613
def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw):
614
"""See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
616
assert callable(_f), "%s is not callable" % _f
617
if _eventType not in self._eventTriggers:
618
self._eventTriggers[_eventType] = _ThreePhaseEvent()
619
return (_eventType, self._eventTriggers[_eventType].addTrigger(
620
_phase, _f, *args, **kw))
623
def removeSystemEventTrigger(self, triggerID):
624
"""See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
626
eventType, handle = triggerID
627
self._eventTriggers[eventType].removeTrigger(handle)
630
def callWhenRunning(self, _callable, *args, **kw):
631
"""See twisted.internet.interfaces.IReactorCore.callWhenRunning.
634
_callable(*args, **kw)
636
return self.addSystemEventTrigger('after', 'startup',
637
_callable, *args, **kw)
639
def startRunning(self):
641
Method called when reactor starts: do some initialization and fire
644
Don't call this directly, call reactor.run() instead: it should take
645
care of calling this.
647
This method is somewhat misnamed. The reactor will not necessarily be
648
in the running state by the time this method returns. The only
649
guarantee is that it will be on its way to the running state.
652
raise error.ReactorAlreadyRunning()
654
self._stopped = False
655
threadable.registerAsIOThread()
656
self.fireSystemEvent('startup')
659
def _reallyStartRunning(self):
661
Method called to transition to the running state. This should happen
662
in the I{during startup} event trigger phase.
668
seconds = staticmethod(runtimeSeconds)
670
def callLater(self, _seconds, _f, *args, **kw):
671
"""See twisted.internet.interfaces.IReactorTime.callLater.
673
assert callable(_f), "%s is not callable" % _f
674
assert sys.maxint >= _seconds >= 0, \
675
"%s is not greater than or equal to 0 seconds" % (_seconds,)
676
tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
677
self._cancelCallLater,
678
self._moveCallLaterSooner,
679
seconds=self.seconds)
680
self._newTimedCalls.append(tple)
683
def _moveCallLaterSooner(self, tple):
684
# Linear time find: slow.
685
heap = self._pendingTimedCalls
687
pos = heap.index(tple)
689
# Move elt up the heap until it rests at the right place.
692
parent = (pos-1) // 2
693
if heap[parent] <= elt:
696
heap[pos] = heap[parent]
700
# element was not found in heap - oh well...
703
def _cancelCallLater(self, tple):
704
self._cancellations+=1
706
def cancelCallLater(self, callID):
707
"""See twisted.internet.interfaces.IReactorTime.cancelCallLater.
709
# DO NOT DELETE THIS - this is documented in Python in a Nutshell, so we
710
# we can't get rid of it for a long time.
711
warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callID.cancel() instead")
714
def getDelayedCalls(self):
715
"""Return all the outstanding delayed calls in the system.
716
They are returned in no particular order.
717
This method is not efficient -- it is really only meant for
719
return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled]
721
def _insertNewDelayedCalls(self):
722
for call in self._newTimedCalls:
724
self._cancellations-=1
726
call.activate_delay()
727
heappush(self._pendingTimedCalls, call)
728
self._newTimedCalls = []
731
# insert new delayed calls to make sure to include them in timeout value
732
self._insertNewDelayedCalls()
734
if not self._pendingTimedCalls:
737
return max(0, self._pendingTimedCalls[0].time - self.seconds())
740
def runUntilCurrent(self):
741
"""Run all pending timed calls.
743
if self.threadCallQueue:
744
# Keep track of how many calls we actually make, as we're
745
# making them, in case another call is added to the queue
746
# while we're in this loop.
748
total = len(self.threadCallQueue)
749
for (f, a, kw) in self.threadCallQueue:
757
del self.threadCallQueue[:count]
758
if self.threadCallQueue:
761
# insert new delayed calls now
762
self._insertNewDelayedCalls()
765
while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
766
call = heappop(self._pendingTimedCalls)
768
self._cancellations-=1
771
if call.delayed_time > 0:
772
call.activate_delay()
773
heappush(self._pendingTimedCalls, call)
778
call.func(*call.args, **call.kw)
781
if hasattr(call, "creator"):
783
e += " C: previous exception occurred in " + \
784
"a DelayedCall created here:\n"
786
e += "".join(call.creator).rstrip().replace("\n","\n C:")
791
if (self._cancellations > 50 and
792
self._cancellations > len(self._pendingTimedCalls) >> 1):
793
self._cancellations = 0
794
self._pendingTimedCalls = [x for x in self._pendingTimedCalls
796
heapify(self._pendingTimedCalls)
798
if self._justStopped:
799
self._justStopped = False
800
self.fireSystemEvent("shutdown")
804
def _checkProcessArgs(self, args, env):
806
Check for valid arguments and environment to spawnProcess.
808
@return: A two element tuple giving values to use when creating the
809
process. The first element of the tuple is a C{list} of C{str}
810
giving the values for argv of the child process. The second element
811
of the tuple is either C{None} if C{env} was C{None} or a C{dict}
812
mapping C{str} environment keys to C{str} environment values.
814
# Any unicode string which Python would successfully implicitly
815
# encode to a byte string would have worked before these explicit
816
# checks were added. Anything which would have failed with a
817
# UnicodeEncodeError during that implicit encoding step would have
818
# raised an exception in the child process and that would have been
819
# a pain in the butt to debug.
821
# So, we will explicitly attempt the same encoding which Python
822
# would implicitly do later. If it fails, we will report an error
823
# without ever spawning a child process. If it succeeds, we'll save
824
# the result so that Python doesn't need to do it implicitly later.
826
# For any unicode which we can actually encode, we'll also issue a
827
# deprecation warning, because no one should be passing unicode here
831
defaultEncoding = sys.getdefaultencoding()
833
# Common check function
836
Return either a str or None. If the given value is not
837
allowable for some reason, None is returned. Otherwise, a
838
possibly different object which should be used in place of arg
839
is returned. This forces unicode encoding to happen now, rather
840
than implicitly later.
842
if isinstance(arg, unicode):
844
arg = arg.encode(defaultEncoding)
845
except UnicodeEncodeError:
848
"Argument strings and environment keys/values passed to "
849
"reactor.spawnProcess should be str, not unicode.",
850
category=DeprecationWarning,
852
if isinstance(arg, str) and '\0' not in arg:
856
# Make a few tests to check input validity
857
if not isinstance(args, (tuple, list)):
858
raise TypeError("Arguments must be a tuple or list")
862
arg = argChecker(arg)
864
raise TypeError("Arguments contain a non-string value")
866
outputArgs.append(arg)
871
for key, val in env.iteritems():
872
key = argChecker(key)
874
raise TypeError("Environment contains a non-string key")
875
val = argChecker(val)
877
raise TypeError("Environment contains a non-string value")
879
return outputArgs, outputEnv
882
if platform.supportsThreads():
884
# ID of the trigger starting the threadpool
885
_threadpoolStartupID = None
886
# ID of the trigger stopping the threadpool
887
threadpoolShutdownID = None
889
def _initThreads(self):
890
self.usingThreads = True
891
self.resolver = ThreadedResolver(self)
894
def callFromThread(self, f, *args, **kw):
896
See L{twisted.internet.interfaces.IReactorThreads.callFromThread}.
898
assert callable(f), "%s is not callable" % (f,)
899
# lists are thread-safe in CPython, but not in Jython
900
# this is probably a bug in Jython, but until fixed this code
901
# won't work in Jython.
902
self.threadCallQueue.append((f, args, kw))
905
def _initThreadPool(self):
907
Create the threadpool accessible with callFromThread.
909
from twisted.python import threadpool
910
self.threadpool = threadpool.ThreadPool(
911
0, 10, 'twisted.internet.reactor')
912
self._threadpoolStartupID = self.callWhenRunning(
913
self.threadpool.start)
914
self.threadpoolShutdownID = self.addSystemEventTrigger(
915
'during', 'shutdown', self._stopThreadPool)
917
def _stopThreadPool(self):
919
Stop the reactor threadpool. This method is only valid if there
920
is currently a threadpool (created by L{_initThreadPool}). It
921
is not intended to be called directly; instead, it will be
922
called by a shutdown trigger created in L{_initThreadPool}.
924
triggers = [self._threadpoolStartupID, self.threadpoolShutdownID]
925
for trigger in filter(None, triggers):
927
self.removeSystemEventTrigger(trigger)
930
self._threadpoolStartupID = None
931
self.threadpoolShutdownID = None
932
self.threadpool.stop()
933
self.threadpool = None
936
def getThreadPool(self):
938
See L{twisted.internet.interfaces.IReactorThreads.getThreadPool}.
940
if self.threadpool is None:
941
self._initThreadPool()
942
return self.threadpool
945
def callInThread(self, _callable, *args, **kwargs):
947
See L{twisted.internet.interfaces.IReactorThreads.callInThread}.
949
self.getThreadPool().callInThread(_callable, *args, **kwargs)
951
def suggestThreadPoolSize(self, size):
953
See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}.
955
self.getThreadPool().adjustPoolsize(maxthreads=size)
957
# This is for signal handlers.
958
def callFromThread(self, f, *args, **kw):
959
assert callable(f), "%s is not callable" % (f,)
960
# See comment in the other callFromThread implementation.
961
self.threadCallQueue.append((f, args, kw))
963
if platform.supportsThreads():
964
classImplements(ReactorBase, IReactorThreads)
967
class BaseConnector(styles.Ephemeral):
968
"""Basic implementation of connector.
970
State can be: "connecting", "connected", "disconnected"
973
implements(IConnector)
978
def __init__(self, factory, timeout, reactor):
979
self.state = "disconnected"
980
self.reactor = reactor
981
self.factory = factory
982
self.timeout = timeout
984
def disconnect(self):
985
"""Disconnect whatever our state is."""
986
if self.state == 'connecting':
987
self.stopConnecting()
988
elif self.state == 'connected':
989
self.transport.loseConnection()
992
"""Start connection to remote server."""
993
if self.state != "disconnected":
994
raise RuntimeError, "can't connect in this state"
996
self.state = "connecting"
997
if not self.factoryStarted:
998
self.factory.doStart()
999
self.factoryStarted = 1
1000
self.transport = transport = self._makeTransport()
1001
if self.timeout is not None:
1002
self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
1003
self.factory.startedConnecting(self)
1005
def stopConnecting(self):
1006
"""Stop attempting to connect."""
1007
if self.state != "connecting":
1008
raise error.NotConnectingError, "we're not trying to connect"
1010
self.state = "disconnected"
1011
self.transport.failIfNotConnected(error.UserError())
1014
def cancelTimeout(self):
1015
if self.timeoutID is not None:
1017
self.timeoutID.cancel()
1022
def buildProtocol(self, addr):
1023
self.state = "connected"
1024
self.cancelTimeout()
1025
return self.factory.buildProtocol(addr)
1027
def connectionFailed(self, reason):
1028
self.cancelTimeout()
1029
self.transport = None
1030
self.state = "disconnected"
1031
self.factory.clientConnectionFailed(self, reason)
1032
if self.state == "disconnected":
1033
# factory hasn't called our connect() method
1034
self.factory.doStop()
1035
self.factoryStarted = 0
1037
def connectionLost(self, reason):
1038
self.state = "disconnected"
1039
self.factory.clientConnectionLost(self, reason)
1040
if self.state == "disconnected":
1041
# factory hasn't called our connect() method
1042
self.factory.doStop()
1043
self.factoryStarted = 0
1045
def getDestination(self):
1046
raise NotImplementedError(
1047
reflect.qual(self.__class__) + " did not implement "
1052
class BasePort(abstract.FileDescriptor):
1053
"""Basic implementation of a ListeningPort.
1055
Note: This does not actually implement IListeningPort.
1058
addressFamily = None
1061
def createInternetSocket(self):
1062
s = socket.socket(self.addressFamily, self.socketType)
1064
fdesc._setCloseOnExec(s.fileno())
1069
"""Raises a RuntimeError"""
1070
raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class__)
1074
class _SignalReactorMixin:
1076
Private mixin to manage signals: it installs signal handlers at start time,
1077
and define run method.
1079
It can only be used mixed in with L{ReactorBase}, and has to be defined
1080
first in the inheritance (so that method resolution order finds
1081
startRunning first).
1083
@type _installSignalHandlers: C{bool}
1084
@ivar _installSignalHandlers: A flag which indicates whether any signal
1085
handlers will be installed during startup. This includes handlers for
1086
SIGCHLD to monitor child processes, and SIGINT, SIGTERM, and SIGBREAK
1087
to stop the reactor.
1090
_installSignalHandlers = False
1092
def _handleSignals(self):
1094
Install the signal handlers for the Twisted event loop.
1099
log.msg("Warning: signal module unavailable -- "
1100
"not installing signal handlers.")
1103
if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
1104
# only handle if there isn't already a handler, e.g. for Pdb.
1105
signal.signal(signal.SIGINT, self.sigInt)
1106
signal.signal(signal.SIGTERM, self.sigTerm)
1108
# Catch Ctrl-Break in windows
1109
if hasattr(signal, "SIGBREAK"):
1110
signal.signal(signal.SIGBREAK, self.sigBreak)
1112
if platformType == 'posix':
1113
signal.signal(signal.SIGCHLD, self._handleSigchld)
1114
# Also call the signal handler right now, in case we missed any
1115
# signals before we installed it. This should only happen if
1116
# someone used spawnProcess before calling reactor.run (and the
1117
# process also exited already).
1118
self._handleSigchld(signal.SIGCHLD, None)
1121
def _handleSigchld(self, signum, frame, _threadSupport=platform.supportsThreads()):
1123
Reap all processes on SIGCHLD.
1125
This gets called on SIGCHLD. We do no processing inside a signal
1126
handler, as the calls we make here could occur between any two
1127
python bytecode instructions. Deferring processing to the next
1128
eventloop round prevents us from violating the state constraints
1129
of arbitrary classes.
1131
from twisted.internet.process import reapAllProcesses
1133
self.callFromThread(reapAllProcesses)
1135
self.callLater(0, reapAllProcesses)
1138
def startRunning(self, installSignalHandlers=True):
1140
Extend the base implementation in order to remember whether signal
1141
handlers should be installed later.
1143
@type installSignalHandlers: C{bool}
1144
@param installSignalHandlers: A flag which, if set, indicates that
1145
handlers for a number of (implementation-defined) signals should be
1146
installed during startup.
1148
self._installSignalHandlers = installSignalHandlers
1149
ReactorBase.startRunning(self)
1152
def _reallyStartRunning(self):
1154
Extend the base implementation by also installing signal handlers, if
1155
C{self._installSignalHandlers} is true.
1157
ReactorBase._reallyStartRunning(self)
1158
if self._installSignalHandlers:
1159
# Make sure this happens before after-startup events, since the
1160
# expectation of after-startup is that the reactor is fully
1161
# initialized. Don't do it right away for historical reasons
1162
# (perhaps some before-startup triggers don't want there to be a
1163
# custom SIGCHLD handler so that they can run child processes with
1164
# some blocking api).
1165
self._handleSignals()
1168
def run(self, installSignalHandlers=True):
1169
self.startRunning(installSignalHandlers=installSignalHandlers)
1174
while self._started:
1176
while self._started:
1177
# Advance simulation time in delayed event
1179
self.runUntilCurrent()
1181
t = self.running and t2
1184
log.msg("Unexpected error in main loop.")
1187
log.msg('Main loop terminated.')