1
# -*- test-case-name: twisted.test.test_internet -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
"""Very basic functionality for a Reactor implementation.
10
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
13
import socket # needed only for sync-dns
14
from zope.interface import implements, classImplements
20
from heapq import heappush, heappop, heapreplace, heapify
28
from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThreads
29
from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver
30
from twisted.internet.interfaces import IConnector, IDelayedCall
31
from twisted.internet import main, error, abstract, defer, threads
32
from twisted.python import log, failure, reflect
33
from twisted.python.runtime import seconds, platform
34
from twisted.internet.defer import Deferred, DeferredList
35
from twisted.persisted import styles
37
# This import is for side-effects! Even if you don't see any code using it
38
# in this module, don't delete it.
39
from twisted.python import threadable
41
class DelayedCall(styles.Ephemeral):
43
implements(IDelayedCall)
44
# enable .debug to record creator call stack, and it will be logged if
45
# an exception occurs while the function is being run
49
def __init__(self, time, func, args, kw, cancel, reset, seconds=None):
51
@param time: Seconds from the epoch at which to call C{func}.
52
@param func: The callable to call.
53
@param args: The positional arguments to pass to the callable.
54
@param kw: The keyword arguments to pass to the callable.
55
@param cancel: A callable which will be called with this
56
DelayedCall before cancellation.
57
@param reset: A callable which will be called with this
58
DelayedCall after changing this DelayedCall's scheduled
59
execution time. The callable should adjust any necessary
60
scheduling details to ensure this DelayedCall is invoked
61
at the new appropriate time.
62
@param seconds: If provided, a no-argument callable which will be
63
used to determine the current time any time that information is
66
self.time, self.func, self.args, self.kw = time, func, args, kw
68
self.canceller = cancel
69
self.seconds = seconds
70
self.cancelled = self.called = 0
73
self.creator = traceback.format_stack()[:-2]
76
"""Return the time at which this call will fire
79
@return: The number of seconds after the epoch at which this call is
82
return self.time + self.delayed_time
85
"""Unschedule this call
87
@raise AlreadyCancelled: Raised if this call has already been
90
@raise AlreadyCalled: Raised if this call has already been made.
93
raise error.AlreadyCancelled
95
raise error.AlreadyCalled
100
self._str = str(self)
101
del self.func, self.args, self.kw
103
def reset(self, secondsFromNow):
104
"""Reschedule this call for a different time
106
@type secondsFromNow: C{float}
107
@param secondsFromNow: The number of seconds from the time of the
108
C{reset} call at which this call will be scheduled.
110
@raise AlreadyCancelled: Raised if this call has been cancelled.
111
@raise AlreadyCalled: Raised if this call has already been made.
114
raise error.AlreadyCancelled
116
raise error.AlreadyCalled
118
if self.seconds is None:
119
new_time = seconds() + secondsFromNow
121
new_time = self.seconds() + secondsFromNow
122
if new_time < self.time:
123
self.delayed_time = 0
127
self.delayed_time = new_time - self.time
129
def delay(self, secondsLater):
130
"""Reschedule this call for a later time
132
@type secondsLater: C{float}
133
@param secondsLater: The number of seconds after the originally
134
scheduled time for which to reschedule this call.
136
@raise AlreadyCancelled: Raised if this call has been cancelled.
137
@raise AlreadyCalled: Raised if this call has already been made.
140
raise error.AlreadyCancelled
142
raise error.AlreadyCalled
144
self.delayed_time += secondsLater
145
if self.delayed_time < 0:
146
self.activate_delay()
149
def activate_delay(self):
150
self.time += self.delayed_time
151
self.delayed_time = 0
154
"""Determine whether this call is still pending
157
@return: True if this call has not yet been made or cancelled,
160
return not (self.cancelled or self.called)
162
def __le__(self, other):
163
return self.time <= other.time
166
if self._str is not None:
168
if hasattr(self, 'func'):
169
if hasattr(self.func, 'func_name'):
170
func = self.func.func_name
171
if hasattr(self.func, 'im_class'):
172
func = self.func.im_class.__name__ + '.' + func
174
func = reflect.safe_repr(self.func)
178
if self.seconds is None:
182
L = ["<DelayedCall %s [%ss] called=%s cancelled=%s" % (
183
id(self), self.time - now, self.called, self.cancelled)]
185
L.extend((" ", func, "("))
187
L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
191
L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.iteritems()]))
195
L.append("\n\ntraceback at creation: \n\n%s" % (' '.join(self.creator)))
201
class ThreadedResolver:
202
implements(IResolverSimple)
204
def __init__(self, reactor):
205
self.reactor = reactor
206
self._runningQueries = {}
208
def _fail(self, name, err):
209
err = error.DNSLookupError("address %r not found: %s" % (name, err))
210
return failure.Failure(err)
212
def _cleanup(self, name, lookupDeferred):
213
userDeferred, cancelCall = self._runningQueries[lookupDeferred]
214
del self._runningQueries[lookupDeferred]
215
userDeferred.errback(self._fail(name, "timeout error"))
217
def _checkTimeout(self, result, name, lookupDeferred):
219
userDeferred, cancelCall = self._runningQueries[lookupDeferred]
223
del self._runningQueries[lookupDeferred]
226
if isinstance(result, failure.Failure):
227
userDeferred.errback(self._fail(name, result.getErrorMessage()))
229
userDeferred.callback(result)
231
def getHostByName(self, name, timeout = (1, 3, 11, 45)):
233
timeoutDelay = reduce(operator.add, timeout)
236
userDeferred = defer.Deferred()
237
lookupDeferred = threads.deferToThread(socket.gethostbyname, name)
238
cancelCall = self.reactor.callLater(
239
timeoutDelay, self._cleanup, name, lookupDeferred)
240
self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
241
lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
244
class BlockingResolver:
245
implements(IResolverSimple)
247
def getHostByName(self, name, timeout = (1, 3, 11, 45)):
249
address = socket.gethostbyname(name)
251
msg = "address %r not found" % (name,)
252
err = error.DNSLookupError(msg)
253
return defer.fail(err)
255
return defer.succeed(address)
257
class ReactorBase(object):
258
"""Default base class for Reactors.
261
implements(IReactorCore, IReactorTime, IReactorPluggableResolver)
265
resolver = BlockingResolver()
267
__name__ = "twisted.internet.reactor"
270
self.threadCallQueue = []
271
self._eventTriggers = {}
272
self._pendingTimedCalls = []
273
self._newTimedCalls = []
274
self._cancellations = 0
278
self.addSystemEventTrigger('during', 'shutdown', self.crash)
279
self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
281
if platform.supportsThreads():
284
# override in subclasses
288
def installWaker(self):
289
raise NotImplementedError()
291
def installResolver(self, resolver):
292
assert IResolverSimple.providedBy(resolver)
293
oldResolver = self.resolver
294
self.resolver = resolver
298
"""Wake up the event loop."""
299
if not threadable.isInIOThread():
302
# if the waker isn't installed, the reactor isn't running, and
303
# therefore doesn't need to be woken up
305
def doIteration(self, delay):
306
"""Do one iteration over the readers and writers we know about."""
307
raise NotImplementedError
309
def addReader(self, reader):
310
raise NotImplementedError
312
def addWriter(self, writer):
313
raise NotImplementedError
315
def removeReader(self, reader):
316
raise NotImplementedError
318
def removeWriter(self, writer):
319
raise NotImplementedError
322
raise NotImplementedError
324
def resolve(self, name, timeout = (1, 3, 11, 45)):
325
"""Return a Deferred that will resolve a hostname.
328
# XXX - This is *less than* '::', and will screw up IPv6 servers
329
return defer.succeed('0.0.0.0')
330
if abstract.isIPAddress(name):
331
return defer.succeed(name)
332
return self.resolver.getHostByName(name, timeout)
339
"""See twisted.internet.interfaces.IReactorCore.stop.
342
raise RuntimeError, "can't stop reactor that isn't running"
343
self.fireSystemEvent("shutdown")
346
"""See twisted.internet.interfaces.IReactorCore.crash.
350
def sigInt(self, *args):
351
"""Handle a SIGINT interrupt.
353
log.msg("Received SIGINT, shutting down.")
354
self.callFromThread(self.stop)
356
def sigBreak(self, *args):
357
"""Handle a SIGBREAK interrupt.
359
log.msg("Received SIGBREAK, shutting down.")
360
self.callFromThread(self.stop)
362
def sigTerm(self, *args):
363
"""Handle a SIGTERM interrupt.
365
log.msg("Received SIGTERM, shutting down.")
366
self.callFromThread(self.stop)
368
def disconnectAll(self):
369
"""Disconnect every reader, and writer in the system.
371
selectables = self.removeAll()
372
for reader in selectables:
373
log.callWithLogger(reader,
374
reader.connectionLost,
375
failure.Failure(main.CONNECTION_LOST))
378
def iterate(self, delay=0):
379
"""See twisted.internet.interfaces.IReactorCore.iterate.
381
self.runUntilCurrent()
382
self.doIteration(delay)
384
def fireSystemEvent(self, eventType):
385
"""See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
387
sysEvtTriggers = self._eventTriggers.get(eventType)
388
if not sysEvtTriggers:
391
for callable, args, kw in sysEvtTriggers[0]:
393
d = callable(*args, **kw)
397
if isinstance(d, Deferred):
400
DeferredList(defrList).addBoth(self._cbContinueSystemEvent, eventType)
402
self.callLater(0, self._continueSystemEvent, eventType)
405
def _cbContinueSystemEvent(self, result, eventType):
406
self._continueSystemEvent(eventType)
409
def _continueSystemEvent(self, eventType):
410
sysEvtTriggers = self._eventTriggers.get(eventType)
411
for callList in sysEvtTriggers[1], sysEvtTriggers[2]:
412
for callable, args, kw in callList:
414
callable(*args, **kw)
417
# now that we've called all callbacks, no need to store
418
# references to them anymore, in fact this can cause problems.
419
del self._eventTriggers[eventType]
421
def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw):
422
"""See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
424
assert callable(_f), "%s is not callable" % _f
425
if self._eventTriggers.has_key(_eventType):
426
triglist = self._eventTriggers[_eventType]
428
triglist = [[], [], []]
429
self._eventTriggers[_eventType] = triglist
430
evtList = triglist[{"before": 0, "during": 1, "after": 2}[_phase]]
431
evtList.append((_f, args, kw))
432
return (_phase, _eventType, (_f, args, kw))
434
def removeSystemEventTrigger(self, triggerID):
435
"""See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
437
phase, eventType, item = triggerID
438
self._eventTriggers[eventType][{"before": 0,
443
def callWhenRunning(self, _callable, *args, **kw):
444
"""See twisted.internet.interfaces.IReactorCore.callWhenRunning.
447
_callable(*args, **kw)
449
return self.addSystemEventTrigger('after', 'startup',
450
_callable, *args, **kw)
454
def callLater(self, _seconds, _f, *args, **kw):
455
"""See twisted.internet.interfaces.IReactorTime.callLater.
457
assert callable(_f), "%s is not callable" % _f
458
assert sys.maxint >= _seconds >= 0, \
459
"%s is not greater than or equal to 0 seconds" % (_seconds,)
460
tple = DelayedCall(seconds() + _seconds, _f, args, kw,
461
self._cancelCallLater,
462
self._moveCallLaterSooner)
463
self._newTimedCalls.append(tple)
466
def _moveCallLaterSooner(self, tple):
467
# Linear time find: slow.
468
heap = self._pendingTimedCalls
470
pos = heap.index(tple)
472
# Move elt up the heap until it rests at the right place.
475
parent = (pos-1) // 2
476
if heap[parent] <= elt:
479
heap[pos] = heap[parent]
483
# element was not found in heap - oh well...
486
def _cancelCallLater(self, tple):
487
self._cancellations+=1
489
def cancelCallLater(self, callID):
490
"""See twisted.internet.interfaces.IReactorTime.cancelCallLater.
492
# DO NOT DELETE THIS - this is documented in Python in a Nutshell, so we
493
# we can't get rid of it for a long time.
494
warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callID.cancel() instead")
497
def getDelayedCalls(self):
498
"""Return all the outstanding delayed calls in the system.
499
They are returned in no particular order.
500
This method is not efficient -- it is really only meant for
502
return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled]
504
def _insertNewDelayedCalls(self):
505
for call in self._newTimedCalls:
507
self._cancellations-=1
509
call.activate_delay()
510
heappush(self._pendingTimedCalls, call)
511
self._newTimedCalls = []
514
# insert new delayed calls to make sure to include them in timeout value
515
self._insertNewDelayedCalls()
517
if not self._pendingTimedCalls:
520
return max(0, self._pendingTimedCalls[0].time - seconds())
522
def runUntilCurrent(self):
523
"""Run all pending timed calls.
525
if self.threadCallQueue:
526
# Keep track of how many calls we actually make, as we're
527
# making them, in case another call is added to the queue
528
# while we're in this loop.
530
total = len(self.threadCallQueue)
531
for (f, a, kw) in self.threadCallQueue:
539
del self.threadCallQueue[:count]
540
if self.threadCallQueue:
544
# insert new delayed calls now
545
self._insertNewDelayedCalls()
548
while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
549
call = heappop(self._pendingTimedCalls)
551
self._cancellations-=1
554
if call.delayed_time > 0:
555
call.activate_delay()
556
heappush(self._pendingTimedCalls, call)
561
call.func(*call.args, **call.kw)
564
if hasattr(call, "creator"):
566
e += " C: previous exception occurred in " + \
567
"a DelayedCall created here:\n"
569
e += "".join(call.creator).rstrip().replace("\n","\n C:")
574
if (self._cancellations > 50 and
575
self._cancellations > len(self._pendingTimedCalls) >> 1):
576
self._cancellations = 0
577
self._pendingTimedCalls = [x for x in self._pendingTimedCalls
579
heapify(self._pendingTimedCalls)
582
if platform.supportsThreads():
585
def _initThreads(self):
586
self.usingThreads = 1
587
self.resolver = ThreadedResolver(self)
590
def callFromThread(self, f, *args, **kw):
591
"""See twisted.internet.interfaces.IReactorThreads.callFromThread.
593
assert callable(f), "%s is not callable" % (f,)
594
# lists are thread-safe in CPython, but not in Jython
595
# this is probably a bug in Jython, but until fixed this code
596
# won't work in Jython.
597
self.threadCallQueue.append((f, args, kw))
600
def _initThreadPool(self):
601
from twisted.python import threadpool
602
self.threadpool = threadpool.ThreadPool(0, 10, 'twisted.internet.reactor')
603
self.callWhenRunning(self.threadpool.start)
604
self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
606
def callInThread(self, _callable, *args, **kwargs):
607
"""See twisted.internet.interfaces.IReactorThreads.callInThread.
609
if self.threadpool is None:
610
self._initThreadPool()
611
self.threadpool.callInThread(_callable, *args, **kwargs)
613
def suggestThreadPoolSize(self, size):
614
"""See twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize.
616
if size == 0 and not self.threadpool:
618
if not self.threadpool:
619
self._initThreadPool()
620
self.threadpool.adjustPoolsize(maxthreads=size)
622
# This is for signal handlers.
623
def callFromThread(self, f, *args, **kw):
624
assert callable(f), "%s is not callable" % (f,)
625
# See comment in the other callFromThread implementation.
626
self.threadCallQueue.append((f, args, kw))
628
if platform.supportsThreads():
629
classImplements(ReactorBase, IReactorThreads)
632
class BaseConnector(styles.Ephemeral):
633
"""Basic implementation of connector.
635
State can be: "connecting", "connected", "disconnected"
638
implements(IConnector)
643
def __init__(self, factory, timeout, reactor):
644
self.state = "disconnected"
645
self.reactor = reactor
646
self.factory = factory
647
self.timeout = timeout
649
def disconnect(self):
650
"""Disconnect whatever our state is."""
651
if self.state == 'connecting':
652
self.stopConnecting()
653
elif self.state == 'connected':
654
self.transport.loseConnection()
657
"""Start connection to remote server."""
658
if self.state != "disconnected":
659
raise RuntimeError, "can't connect in this state"
661
self.state = "connecting"
662
if not self.factoryStarted:
663
self.factory.doStart()
664
self.factoryStarted = 1
665
self.transport = transport = self._makeTransport()
666
if self.timeout is not None:
667
self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
668
self.factory.startedConnecting(self)
670
def stopConnecting(self):
671
"""Stop attempting to connect."""
672
if self.state != "connecting":
673
raise error.NotConnectingError, "we're not trying to connect"
675
self.state = "disconnected"
676
self.transport.failIfNotConnected(error.UserError())
679
def cancelTimeout(self):
680
if self.timeoutID is not None:
682
self.timeoutID.cancel()
687
def buildProtocol(self, addr):
688
self.state = "connected"
690
return self.factory.buildProtocol(addr)
692
def connectionFailed(self, reason):
694
self.transport = None
695
self.state = "disconnected"
696
self.factory.clientConnectionFailed(self, reason)
697
if self.state == "disconnected":
698
# factory hasn't called our connect() method
699
self.factory.doStop()
700
self.factoryStarted = 0
702
def connectionLost(self, reason):
703
self.state = "disconnected"
704
self.factory.clientConnectionLost(self, reason)
705
if self.state == "disconnected":
706
# factory hasn't called our connect() method
707
self.factory.doStop()
708
self.factoryStarted = 0
710
def getDestination(self):
711
raise NotImplementedError, "implement in subclasses"
714
class BasePort(abstract.FileDescriptor):
715
"""Basic implementation of a ListeningPort.
717
Note: This does not actually implement IListeningPort.
723
def createInternetSocket(self):
724
s = socket.socket(self.addressFamily, self.socketType)
726
if fcntl and hasattr(fcntl, 'FD_CLOEXEC'):
727
old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD)
728
fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
733
"""Raises a RuntimeError"""
734
raise RuntimeError, "doWrite called on a %s" % reflect.qual(self.__class__)