~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/internet/base.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: twisted.test.test_internet -*-
2
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
 
# See LICENSE for details.
4
 
 
5
 
 
6
 
"""Very basic functionality for a Reactor implementation.
7
 
 
8
 
API Stability: stable
9
 
 
10
 
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
11
 
"""
12
 
 
13
 
import socket # needed only for sync-dns
14
 
from zope.interface import implements, classImplements
15
 
 
16
 
import imp
17
 
import sys
18
 
import warnings
19
 
import operator
20
 
from heapq import heappush, heappop, heapreplace, heapify
21
 
 
22
 
try:
23
 
    import fcntl
24
 
except ImportError:
25
 
    fcntl = None
26
 
import traceback
27
 
 
28
 
from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorThreads
29
 
from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver
30
 
from twisted.internet.interfaces import IConnector, IDelayedCall
31
 
from twisted.internet import main, error, abstract, defer, threads
32
 
from twisted.python import log, failure, reflect
33
 
from twisted.python.runtime import seconds, platform
34
 
from twisted.internet.defer import Deferred, DeferredList
35
 
from twisted.persisted import styles
36
 
 
37
 
# This import is for side-effects!  Even if you don't see any code using it
38
 
# in this module, don't delete it.
39
 
from twisted.python import threadable
40
 
 
41
 
class DelayedCall(styles.Ephemeral):
    """
    A call scheduled to run at a later time.

    C{time} is the key the owner sorts by.  C{delayed_time} is an extra
    postponement that has not yet been folded into C{time}; keeping it
    separate lets a call be pushed later without the owner having to
    reorder its schedule immediately (see L{activate_delay}).
    """

    implements(IDelayedCall)
    # enable .debug to record creator call stack, and it will be logged if
    # an exception occurs while the function is being run
    debug = False
    # cached string form, captured at cancel() time when debugging
    _str = None

    def __init__(self, time, func, args, kw, cancel, reset, seconds=None):
        """
        @param time: Seconds from the epoch at which to call C{func}.
        @param func: The callable to call.
        @param args: The positional arguments to pass to the callable.
        @param kw: The keyword arguments to pass to the callable.
        @param cancel: A callable which will be called with this
            DelayedCall before cancellation.
        @param reset: A callable which will be called with this
            DelayedCall after changing this DelayedCall's scheduled
            execution time. The callable should adjust any necessary
            scheduling details to ensure this DelayedCall is invoked
            at the new appropriate time.
        @param seconds: If provided, a no-argument callable which will be
            used to determine the current time any time that information is
            needed.
        """
        self.time, self.func, self.args, self.kw = time, func, args, kw
        self.resetter = reset
        self.canceller = cancel
        self.seconds = seconds
        self.cancelled = self.called = 0
        self.delayed_time = 0
        if self.debug:
            # drop the two innermost frames (this __init__ and its caller)
            self.creator = traceback.format_stack()[:-2]

    def getTime(self):
        """Return the time at which this call will fire

        @rtype: C{float}
        @return: The number of seconds after the epoch at which this call is
        scheduled to be made.
        """
        return self.time + self.delayed_time

    def cancel(self):
        """Unschedule this call

        @raise AlreadyCancelled: Raised if this call has already been
        unscheduled.

        @raise AlreadyCalled: Raised if this call has already been made.
        """
        if self.cancelled:
            raise error.AlreadyCancelled
        elif self.called:
            raise error.AlreadyCalled
        else:
            self.canceller(self)
            self.cancelled = 1
            if self.debug:
                # capture the description now, before func/args/kw go away
                self._str = str(self)
            # release references held by the cancelled call
            del self.func, self.args, self.kw

    def reset(self, secondsFromNow):
        """Reschedule this call for a different time

        @type secondsFromNow: C{float}
        @param secondsFromNow: The number of seconds from the time of the
        C{reset} call at which this call will be scheduled.

        @raise AlreadyCancelled: Raised if this call has been cancelled.
        @raise AlreadyCalled: Raised if this call has already been made.
        """
        if self.cancelled:
            raise error.AlreadyCancelled
        elif self.called:
            raise error.AlreadyCalled
        else:
            if self.seconds is None:
                new_time = seconds() + secondsFromNow
            else:
                new_time = self.seconds() + secondsFromNow
            if new_time < self.time:
                # moving earlier: the sort key changes, so tell the owner
                self.delayed_time = 0
                self.time = new_time
                self.resetter(self)
            else:
                # moving later: only record the postponement
                self.delayed_time = new_time - self.time

    def delay(self, secondsLater):
        """Reschedule this call for a later time

        @type secondsLater: C{float}
        @param secondsLater: The number of seconds after the originally
        scheduled time for which to reschedule this call.

        @raise AlreadyCancelled: Raised if this call has been cancelled.
        @raise AlreadyCalled: Raised if this call has already been made.
        """
        if self.cancelled:
            raise error.AlreadyCancelled
        elif self.called:
            raise error.AlreadyCalled
        else:
            self.delayed_time += secondsLater
            if self.delayed_time < 0:
                # net effect is "sooner": fold it into time and notify owner
                self.activate_delay()
                self.resetter(self)

    def activate_delay(self):
        """Fold any pending postponement into C{time} and clear it."""
        self.time += self.delayed_time
        self.delayed_time = 0

    def active(self):
        """Determine whether this call is still pending

        @rtype: C{bool}
        @return: True if this call has not yet been made or cancelled,
        False otherwise.
        """
        return not (self.cancelled or self.called)

    def __le__(self, other):
        """Order calls by C{time} (C{delayed_time} is not considered)."""
        return self.time <= other.time

    def __lt__(self, other):
        """
        Strict counterpart of L{__le__}.

        C{heapq} in Python 2.6 and later orders items with C{<}, so this
        must be defined for heap operations to compare by C{time}.
        """
        return self.time < other.time

    def __str__(self):
        """
        Return a description of this call: its id, the seconds remaining
        until C{time}, its state flags and, while still available, the
        target function with its arguments.
        """
        if self._str is not None:
            return self._str
        if hasattr(self, 'func'):
            if hasattr(self.func, 'func_name'):
                func = self.func.func_name
                if hasattr(self.func, 'im_class'):
                    func = self.func.im_class.__name__ + '.' + func
            else:
                func = reflect.safe_repr(self.func)
        else:
            # func/args/kw are deleted by cancel()
            func = None

        if self.seconds is None:
            now = seconds()
        else:
            now = self.seconds()
        L = ["<DelayedCall %s [%ss] called=%s cancelled=%s" % (
                id(self), self.time - now, self.called, self.cancelled)]
        if func is not None:
            L.extend((" ", func, "("))
            if self.args:
                L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
                if self.kw:
                    L.append(", ")
            if self.kw:
                L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v))
                                    for (k, v) in self.kw.items()]))
            L.append(")")

        # creator is only recorded when debug was already on at construction
        if self.debug and hasattr(self, 'creator'):
            L.append("\n\ntraceback at creation: \n\n%s" % ('    '.join(self.creator)))
        L.append('>')

        return "".join(L)
199
 
 
200
 
 
201
 
class ThreadedResolver:
    """
    Resolver which runs C{socket.gethostbyname} through
    C{threads.deferToThread} and fails the lookup if no answer arrives
    before a timeout.

    C{_runningQueries} maps each in-flight lookup Deferred to a
    C{(userDeferred, cancelCall)} pair; whichever of the lookup result or
    the timeout happens first removes the entry and fires C{userDeferred}.
    """
    implements(IResolverSimple)

    def __init__(self, reactor):
        """
        @param reactor: The object whose C{callLater} is used to schedule
            the timeout.
        """
        self.reactor = reactor
        self._runningQueries = {}

    def _fail(self, name, err):
        """Wrap C{err} in a L{failure.Failure} holding a C{DNSLookupError}."""
        err = error.DNSLookupError("address %r not found: %s" % (name, err))
        return failure.Failure(err)

    def _cleanup(self, name, lookupDeferred):
        """
        Timeout handler: forget the query and errback the user's Deferred.
        """
        userDeferred = self._runningQueries.pop(lookupDeferred)[0]
        userDeferred.errback(self._fail(name, "timeout error"))

    def _checkTimeout(self, result, name, lookupDeferred):
        """
        Lookup-finished handler: if the query has not already timed out,
        cancel the timeout and pass the result (or a C{DNSLookupError}
        failure) on to the user's Deferred.
        """
        try:
            userDeferred, cancelCall = self._runningQueries.pop(lookupDeferred)
        except KeyError:
            # _cleanup already handled this query; drop the late result
            return

        cancelCall.cancel()

        if isinstance(result, failure.Failure):
            userDeferred.errback(self._fail(name, result.getErrorMessage()))
        else:
            userDeferred.callback(result)

    def getHostByName(self, name, timeout = (1, 3, 11, 45)):
        """
        Start a lookup of C{name}.

        @param name: The hostname to resolve.
        @param timeout: A sequence of numbers; their sum is the number of
            seconds to wait before giving up.  If empty or otherwise false,
            60 seconds is used.
        @return: A Deferred which fires with the value returned by
            C{socket.gethostbyname}, or fails with C{DNSLookupError}.
        """
        if timeout:
            timeoutDelay = sum(timeout)
        else:
            timeoutDelay = 60
        userDeferred = defer.Deferred()
        lookupDeferred = threads.deferToThread(socket.gethostbyname, name)
        cancelCall = self.reactor.callLater(
            timeoutDelay, self._cleanup, name, lookupDeferred)
        self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
        lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
        return userDeferred
243
 
 
244
 
class BlockingResolver:
    """
    Resolver which calls C{socket.gethostbyname} directly, blocking the
    caller until the lookup finishes.
    """
    implements(IResolverSimple)

    def getHostByName(self, name, timeout = (1, 3, 11, 45)):
        """
        Resolve C{name} synchronously.

        @param name: The hostname to resolve.
        @param timeout: Accepted but not used by this implementation.
        @return: An already-fired Deferred: the address on success, or a
            C{DNSLookupError} failure if C{socket.error} was raised.
        """
        try:
            resolved = socket.gethostbyname(name)
        except socket.error:
            lookupError = error.DNSLookupError("address %r not found" % (name,))
            return defer.fail(lookupError)
        return defer.succeed(resolved)
256
 
 
257
 
class ReactorBase(object):
    """Default base class for Reactors.

    Provides system event triggers, timed calls kept on a heap
    (C{_pendingTimedCalls}, fed from C{_newTimedCalls}), hostname
    resolution through a pluggable resolver and, where the platform
    supports threads, a thread call queue and a lazily created thread pool.
    Reader/writer handling (C{doIteration}, C{addReader}, ...) must be
    supplied by subclasses.
    """

    implements(IReactorCore, IReactorTime, IReactorPluggableResolver)

    installed = 0
    usingThreads = 0
    resolver = BlockingResolver()

    __name__ = "twisted.internet.reactor"

    def __init__(self):
        self.threadCallQueue = []
        # eventType -> [before, during, after] lists of (f, args, kw)
        self._eventTriggers = {}
        # heap of scheduled DelayedCalls
        self._pendingTimedCalls = []
        # calls added since the heap was last updated
        self._newTimedCalls = []
        # number of cancelled calls not yet dropped from the lists above
        self._cancellations = 0
        self.running = 0
        self.waker = None

        self.addSystemEventTrigger('during', 'shutdown', self.crash)
        self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)

        if platform.supportsThreads():
            self._initThreads()

    # override in subclasses

    _lock = None

    def installWaker(self):
        raise NotImplementedError()

    def installResolver(self, resolver):
        """
        Replace the current resolver.

        @param resolver: An object providing C{IResolverSimple}.
        @return: The resolver that was previously installed.
        """
        assert IResolverSimple.providedBy(resolver)
        oldResolver = self.resolver
        self.resolver = resolver
        return oldResolver

    def wakeUp(self):
        """Wake up the event loop."""
        if not threadable.isInIOThread():
            if self.waker:
                self.waker.wakeUp()
            # if the waker isn't installed, the reactor isn't running, and
            # therefore doesn't need to be woken up

    def doIteration(self, delay):
        """Do one iteration over the readers and writers we know about."""
        raise NotImplementedError

    def addReader(self, reader):
        raise NotImplementedError

    def addWriter(self, writer):
        raise NotImplementedError

    def removeReader(self, reader):
        raise NotImplementedError

    def removeWriter(self, writer):
        raise NotImplementedError

    def removeAll(self):
        raise NotImplementedError

    def resolve(self, name, timeout = (1, 3, 11, 45)):
        """Return a Deferred that will resolve a hostname.

        An empty name resolves to C{'0.0.0.0'} and a literal IP address
        resolves to itself; anything else is handed to C{self.resolver}.
        """
        if not name:
            # XXX - This is *less than* '::', and will screw up IPv6 servers
            return defer.succeed('0.0.0.0')
        if abstract.isIPAddress(name):
            return defer.succeed(name)
        return self.resolver.getHostByName(name, timeout)

    # Installation.

    # IReactorCore

    def stop(self):
        """See twisted.internet.interfaces.IReactorCore.stop.

        @raise RuntimeError: If the reactor is not running.
        """
        if not self.running:
            raise RuntimeError("can't stop reactor that isn't running")
        self.fireSystemEvent("shutdown")

    def crash(self):
        """See twisted.internet.interfaces.IReactorCore.crash.
        """
        self.running = 0

    def sigInt(self, *args):
        """Handle a SIGINT interrupt.
        """
        log.msg("Received SIGINT, shutting down.")
        self.callFromThread(self.stop)

    def sigBreak(self, *args):
        """Handle a SIGBREAK interrupt.
        """
        log.msg("Received SIGBREAK, shutting down.")
        self.callFromThread(self.stop)

    def sigTerm(self, *args):
        """Handle a SIGTERM interrupt.
        """
        log.msg("Received SIGTERM, shutting down.")
        self.callFromThread(self.stop)

    def disconnectAll(self):
        """Disconnect every reader, and writer in the system.
        """
        selectables = self.removeAll()
        for reader in selectables:
            log.callWithLogger(reader,
                               reader.connectionLost,
                               failure.Failure(main.CONNECTION_LOST))

    def iterate(self, delay=0):
        """See twisted.internet.interfaces.IReactorCore.iterate.
        """
        self.runUntilCurrent()
        self.doIteration(delay)

    def fireSystemEvent(self, eventType):
        """See twisted.internet.interfaces.IReactorCore.fireSystemEvent.

        Runs the "before" triggers immediately.  If any of them returned a
        Deferred, the remaining phases run once all of those have fired;
        otherwise they are scheduled with C{callLater(0, ...)}.
        """
        sysEvtTriggers = self._eventTriggers.get(eventType)
        if not sysEvtTriggers:
            return
        defrList = []
        for f, args, kw in sysEvtTriggers[0]:
            try:
                d = f(*args, **kw)
            except:
                # log and carry on so one failing trigger does not stop
                # the others from running
                log.deferr()
            else:
                if isinstance(d, Deferred):
                    defrList.append(d)
        if defrList:
            DeferredList(defrList).addBoth(self._cbContinueSystemEvent, eventType)
        else:
            self.callLater(0, self._continueSystemEvent, eventType)

    def _cbContinueSystemEvent(self, result, eventType):
        """Deferred callback adapter: discard C{result} and continue."""
        self._continueSystemEvent(eventType)

    def _continueSystemEvent(self, eventType):
        """
        Run the "during" and then the "after" triggers for C{eventType}
        and forget all triggers registered for it.
        """
        sysEvtTriggers = self._eventTriggers.get(eventType)
        if sysEvtTriggers is None:
            # Already handled: the same event was fired again before an
            # earlier continuation ran and removed the triggers.
            return
        for callList in sysEvtTriggers[1], sysEvtTriggers[2]:
            for f, args, kw in callList:
                try:
                    f(*args, **kw)
                except:
                    log.deferr()
        # now that we've called all callbacks, no need to store
        # references to them anymore, in fact this can cause problems.
        del self._eventTriggers[eventType]

    def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw):
        """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.

        @return: A C{(phase, eventType, (f, args, kw))} tuple which can be
            passed to L{removeSystemEventTrigger}.
        """
        assert callable(_f), "%s is not callable" % _f
        if _eventType in self._eventTriggers:
            triglist = self._eventTriggers[_eventType]
        else:
            triglist = [[], [], []]
            self._eventTriggers[_eventType] = triglist
        evtList = triglist[{"before": 0, "during": 1, "after": 2}[_phase]]
        evtList.append((_f, args, kw))
        return (_phase, _eventType, (_f, args, kw))

    def removeSystemEventTrigger(self, triggerID):
        """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
        """
        phase, eventType, item = triggerID
        self._eventTriggers[eventType][{"before": 0,
                                        "during": 1,
                                        "after":  2}[phase]
                                       ].remove(item)

    def callWhenRunning(self, _callable, *args, **kw):
        """See twisted.internet.interfaces.IReactorCore.callWhenRunning.

        Calls immediately (returning C{None}) if already running; otherwise
        registers an "after startup" trigger and returns its ID.
        """
        if self.running:
            _callable(*args, **kw)
        else:
            return self.addSystemEventTrigger('after', 'startup',
                                              _callable, *args, **kw)

    # IReactorTime

    def callLater(self, _seconds, _f, *args, **kw):
        """See twisted.internet.interfaces.IReactorTime.callLater.
        """
        assert callable(_f), "%s is not callable" % _f
        assert sys.maxint >= _seconds >= 0, \
               "%s is not greater than or equal to 0 seconds" % (_seconds,)
        tple = DelayedCall(seconds() + _seconds, _f, args, kw,
                           self._cancelCallLater,
                           self._moveCallLaterSooner)
        # merged into the heap later by _insertNewDelayedCalls
        self._newTimedCalls.append(tple)
        return tple

    def _moveCallLaterSooner(self, tple):
        """
        Restore heap order after C{tple}'s time has been made earlier.
        Does nothing if C{tple} is not in the heap.
        """
        # Linear time find: slow.
        heap = self._pendingTimedCalls
        try:
            pos = heap.index(tple)

            # Move elt up the heap until it rests at the right place.
            elt = heap[pos]
            while pos != 0:
                parent = (pos-1) // 2
                if heap[parent] <= elt:
                    break
                # move parent down
                heap[pos] = heap[parent]
                pos = parent
            heap[pos] = elt
        except ValueError:
            # element was not found in heap - oh well...
            pass

    def _cancelCallLater(self, tple):
        """Count a cancellation; the call itself is discarded lazily."""
        self._cancellations+=1

    def cancelCallLater(self, callID):
        """See twisted.internet.interfaces.IReactorTime.cancelCallLater.
        """
        # DO NOT DELETE THIS - this is documented in Python in a Nutshell, so
        # we can't get rid of it for a long time.
        warnings.warn("reactor.cancelCallLater(callID) is deprecated - use callID.cancel() instead")
        callID.cancel()

    def getDelayedCalls(self):
        """Return all the outstanding delayed calls in the system.
        They are returned in no particular order.
        This method is not efficient -- it is really only meant for
        test cases."""
        return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled]

    def _insertNewDelayedCalls(self):
        """
        Move calls from C{_newTimedCalls} onto the heap, dropping any that
        were cancelled in the meantime.
        """
        for call in self._newTimedCalls:
            if call.cancelled:
                self._cancellations-=1
            else:
                call.activate_delay()
                heappush(self._pendingTimedCalls, call)
        self._newTimedCalls = []

    def timeout(self):
        """
        @return: C{None} if there are no pending timed calls, otherwise the
            non-negative number of seconds until the earliest one's C{time}.
        """
        # insert new delayed calls to make sure to include them in timeout value
        self._insertNewDelayedCalls()

        if not self._pendingTimedCalls:
            return None

        return max(0, self._pendingTimedCalls[0].time - seconds())

    def runUntilCurrent(self):
        """Run all pending timed calls.

        Queued thread calls are run first, then every timed call whose
        time has arrived.
        """
        if self.threadCallQueue:
            # Keep track of how many calls we actually make, as we're
            # making them, in case another call is added to the queue
            # while we're in this loop.
            count = 0
            total = len(self.threadCallQueue)
            for (f, a, kw) in self.threadCallQueue:
                try:
                    f(*a, **kw)
                except:
                    log.err()
                count += 1
                if count == total:
                    break
            del self.threadCallQueue[:count]
            if self.threadCallQueue:
                # more calls arrived meanwhile; wake up to handle them
                if self.waker:
                    self.waker.wakeUp()

        # insert new delayed calls now
        self._insertNewDelayedCalls()

        now = seconds()
        while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
            call = heappop(self._pendingTimedCalls)
            if call.cancelled:
                self._cancellations-=1
                continue

            if call.delayed_time > 0:
                # the call was postponed: apply the delay and reschedule
                call.activate_delay()
                heappush(self._pendingTimedCalls, call)
                continue

            try:
                call.called = 1
                call.func(*call.args, **call.kw)
            except:
                log.deferr()
                if hasattr(call, "creator"):
                    e = "\n"
                    e += " C: previous exception occurred in " + \
                         "a DelayedCall created here:\n"
                    e += " C:"
                    e += "".join(call.creator).rstrip().replace("\n","\n C:")
                    e += "\n"
                    log.msg(e)

        # Purge cancelled calls once there are more than 50 of them and
        # they outnumber half of the heap's entries.
        if (self._cancellations > 50 and
             self._cancellations > len(self._pendingTimedCalls) >> 1):
            self._cancellations = 0
            self._pendingTimedCalls = [x for x in self._pendingTimedCalls
                                       if not x.cancelled]
            heapify(self._pendingTimedCalls)

    # IReactorThreads
    if platform.supportsThreads():
        threadpool = None

        def _initThreads(self):
            self.usingThreads = 1
            self.resolver = ThreadedResolver(self)
            self.installWaker()

        def callFromThread(self, f, *args, **kw):
            """See twisted.internet.interfaces.IReactorThreads.callFromThread.
            """
            assert callable(f), "%s is not callable" % (f,)
            # lists are thread-safe in CPython, but not in Jython
            # this is probably a bug in Jython, but until fixed this code
            # won't work in Jython.
            self.threadCallQueue.append((f, args, kw))
            self.wakeUp()

        def _initThreadPool(self):
            """Create the thread pool and tie its start/stop to the reactor."""
            from twisted.python import threadpool
            self.threadpool = threadpool.ThreadPool(0, 10, 'twisted.internet.reactor')
            self.callWhenRunning(self.threadpool.start)
            self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)

        def callInThread(self, _callable, *args, **kwargs):
            """See twisted.internet.interfaces.IReactorThreads.callInThread.
            """
            if self.threadpool is None:
                self._initThreadPool()
            self.threadpool.callInThread(_callable, *args, **kwargs)

        def suggestThreadPoolSize(self, size):
            """See twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize.
            """
            if size == 0 and not self.threadpool:
                # nothing to shrink, and no reason to create a pool
                return
            if not self.threadpool:
                self._initThreadPool()
            self.threadpool.adjustPoolsize(maxthreads=size)
    else:
        # This is for signal handlers.
        def callFromThread(self, f, *args, **kw):
            assert callable(f), "%s is not callable" % (f,)
            # See comment in the other callFromThread implementation.
            self.threadCallQueue.append((f, args, kw))

if platform.supportsThreads():
    classImplements(ReactorBase, IReactorThreads)
630
 
 
631
 
 
632
 
class BaseConnector(styles.Ephemeral):
    """Basic implementation of connector.

    State can be: "connecting", "connected", "disconnected"

    Subclasses supply C{_makeTransport} (used by L{connect}) and
    L{getDestination}.
    """

    implements(IConnector)

    # DelayedCall for the connection timeout, if one is pending
    timeoutID = None
    factoryStarted = 0

    def __init__(self, factory, timeout, reactor):
        """
        @param factory: The factory notified of connection events and asked
            to build protocols.
        @param timeout: Seconds to wait for the connection before failing
            it with C{TimeoutError}, or C{None} for no timeout.
        @param reactor: The object whose C{callLater} schedules the timeout.
        """
        self.state = "disconnected"
        self.reactor = reactor
        self.factory = factory
        self.timeout = timeout

    def disconnect(self):
        """Disconnect whatever our state is."""
        if self.state == 'connecting':
            self.stopConnecting()
        elif self.state == 'connected':
            self.transport.loseConnection()

    def connect(self):
        """Start connection to remote server.

        @raise RuntimeError: If the connector is not currently disconnected.
        """
        if self.state != "disconnected":
            raise RuntimeError("can't connect in this state")

        self.state = "connecting"
        if not self.factoryStarted:
            self.factory.doStart()
            self.factoryStarted = 1
        self.transport = transport = self._makeTransport()
        if self.timeout is not None:
            self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
        self.factory.startedConnecting(self)

    def stopConnecting(self):
        """Stop attempting to connect.

        @raise NotConnectingError: If no connection attempt is in progress.
        """
        if self.state != "connecting":
            raise error.NotConnectingError("we're not trying to connect")

        self.state = "disconnected"
        self.transport.failIfNotConnected(error.UserError())
        del self.transport

    def cancelTimeout(self):
        """Cancel the pending connection timeout, if there is one."""
        if self.timeoutID is not None:
            try:
                self.timeoutID.cancel()
            except ValueError:
                pass
            # remove the instance attribute; the class default (None) shows
            # through again
            del self.timeoutID

    def buildProtocol(self, addr):
        """
        Mark the connector connected, cancel the timeout and return the
        result of the factory's C{buildProtocol(addr)}.
        """
        self.state = "connected"
        self.cancelTimeout()
        return self.factory.buildProtocol(addr)

    def connectionFailed(self, reason):
        """
        Record a failed connection attempt and notify the factory; stop the
        factory unless it reconnected from within the notification.
        """
        self.cancelTimeout()
        self.transport = None
        self.state = "disconnected"
        self.factory.clientConnectionFailed(self, reason)
        if self.state == "disconnected":
            # factory hasn't called our connect() method
            self.factory.doStop()
            self.factoryStarted = 0

    def connectionLost(self, reason):
        """
        Record a lost connection and notify the factory; stop the factory
        unless it reconnected from within the notification.
        """
        self.state = "disconnected"
        self.factory.clientConnectionLost(self, reason)
        if self.state == "disconnected":
            # factory hasn't called our connect() method
            self.factory.doStop()
            self.factoryStarted = 0

    def getDestination(self):
        raise NotImplementedError("implement in subclasses")
712
 
 
713
 
 
714
 
class BasePort(abstract.FileDescriptor):
    """Basic implementation of a ListeningPort.

    Note: This does not actually implement IListeningPort.
    """

    # to be set by subclasses; passed to socket.socket()
    addressFamily = None
    socketType = None

    def createInternetSocket(self):
        """
        Create a non-blocking socket of this port's family and type.

        Where C{fcntl} is available and defines C{FD_CLOEXEC}, that flag is
        also set on the socket's file descriptor.

        @return: The new socket object.
        """
        s = socket.socket(self.addressFamily, self.socketType)
        s.setblocking(0)
        if fcntl and hasattr(fcntl, 'FD_CLOEXEC'):
            old = fcntl.fcntl(s.fileno(), fcntl.F_GETFD)
            fcntl.fcntl(s.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
        return s

    def doWrite(self):
        """Raises a RuntimeError"""
        raise RuntimeError("doWrite called on a %s" % reflect.qual(self.__class__))
735
 
 
736
 
 
737
 
# With an empty __all__, a star-import of this module binds no names;
# everything defined here has to be imported explicitly.
__all__ = []