1
# -*- test-case-name: twisted.test.test_factories -*-
3
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
4
# See LICENSE for details.
8
Standard implementations of Twisted protocol-related interfaces.
10
Start here if you are looking to write a new protocol implementation for
11
Twisted. The Protocol class contains some introductory material.
13
Maintainer: Itamar Shtull-Trauring
17
from zope.interface import implements
20
from twisted.python import log, failure, components
21
from twisted.internet import interfaces, error, defer
25
"""This is a factory which produces protocols.
27
By default, buildProtocol will create a protocol of the class given in
31
implements(interfaces.IProtocolFactory)
33
# put a subclass of Protocol here:
40
"""Make sure startFactory is called.
42
Users should not call this function themselves!
46
log.msg("Starting factory %r" % self)
48
self.numPorts = self.numPorts + 1
51
"""Make sure stopFactory is called.
53
Users should not call this function themselves!
55
if self.numPorts == 0:
56
# this shouldn't happen, but does sometimes and this is better
57
# than blowing up in assert as we did previously.
59
self.numPorts = self.numPorts - 1
62
log.msg("Stopping factory %r" % self)
65
def startFactory(self):
66
"""This will be called before I begin listening on a Port or Connector.
68
It will only be called once, even if the factory is connected
71
This can be used to perform 'unserialization' tasks that
72
are best put off until things are actually running, such
73
as connecting to a database, opening files, etcetera.
76
def stopFactory(self):
77
"""This will be called before I stop listening on all Ports/Connectors.
79
This can be overridden to perform 'shutdown' tasks such as disconnecting
80
database connections, closing files, etc.
82
It will be called, for example, before an application shuts down,
83
if it was connected to a port. User code should not call this function
87
def buildProtocol(self, addr):
88
"""Create an instance of a subclass of Protocol.
90
The returned instance will handle input on an incoming server
91
connection, and an attribute \"factory\" pointing to the creating
94
Override this method to alter how Protocol instances get created.
96
@param addr: an object implementing L{twisted.internet.interfaces.IAddress}
103
class ClientFactory(Factory):
104
"""A Protocol factory for clients.
106
This can be used together with the various connectXXX methods in
110
def startedConnecting(self, connector):
111
"""Called when a connection has been started.
113
You can call connector.stopConnecting() to stop the connection attempt.
115
@param connector: a Connector object.
118
def clientConnectionFailed(self, connector, reason):
119
"""Called when a connection has failed to connect.
121
It may be useful to call connector.connect() - this will reconnect.
123
@type reason: L{twisted.python.failure.Failure}
126
def clientConnectionLost(self, connector, reason):
127
"""Called when an established connection is lost.
129
It may be useful to call connector.connect() - this will reconnect.
131
@type reason: L{twisted.python.failure.Failure}
135
class _InstanceFactory(ClientFactory):
136
"""Factory used by ClientCreator."""
140
def __init__(self, reactor, instance, deferred):
    """
    Remember the collaborators this factory needs.

    @param reactor: object whose C{callLater} is used by this factory to
        schedule firing of C{deferred}.
    @param instance: the already-built protocol instance this factory
        hands to C{deferred} on success.
    @param deferred: the Deferred that is fired with C{instance} or with
        the failure reason.
    """
    # The three attributes are independent of each other; nothing else is
    # initialised here.
    self.deferred = deferred
    self.instance = instance
    self.reactor = reactor
146
return "<ClientCreator factory: %r>" % (self.instance, )
148
def buildProtocol(self, addr):
149
self.reactor.callLater(0, self.deferred.callback, self.instance)
153
def clientConnectionFailed(self, connector, reason):
154
self.reactor.callLater(0, self.deferred.errback, reason)
159
"""Client connections that do not require a factory.
161
The various connect* methods create a protocol instance using the given
162
protocol class and arguments, and connect it, returning a Deferred of the
163
resulting protocol instance.
165
Useful for cases when we don't really need a factory. Mainly this
166
is when there is no shared state between protocol instances, and no need
170
def __init__(self, reactor, protocolClass, *args, **kwargs):
171
self.reactor = reactor
172
self.protocolClass = protocolClass
176
def connectTCP(self, host, port, timeout=30, bindAddress=None):
177
"""Connect to remote host, return Deferred of resulting protocol instance."""
179
f = _InstanceFactory(self.reactor, self.protocolClass(*self.args, **self.kwargs), d)
180
self.reactor.connectTCP(host, port, f, timeout=timeout, bindAddress=bindAddress)
183
def connectUNIX(self, address, timeout = 30, checkPID=0):
184
"""Connect to Unix socket, return Deferred of resulting protocol instance."""
186
f = _InstanceFactory(self.reactor, self.protocolClass(*self.args, **self.kwargs), d)
187
self.reactor.connectUNIX(address, f, timeout = timeout, checkPID=checkPID)
190
def connectSSL(self, host, port, contextFactory, timeout=30, bindAddress=None):
191
"""Connect to SSL server, return Deferred of resulting protocol instance."""
193
f = _InstanceFactory(self.reactor, self.protocolClass(*self.args, **self.kwargs), d)
194
self.reactor.connectSSL(host, port, f, contextFactory, timeout=timeout, bindAddress=bindAddress)
198
class ReconnectingClientFactory(ClientFactory):
200
Factory which auto-reconnects clients with an exponential back-off.
202
Note that clients should call my resetDelay method after they have
203
connected successfully.
205
@ivar maxDelay: Maximum number of seconds between connection attempts.
206
@ivar initialDelay: Delay for the first reconnection attempt.
207
@ivar factor: a multiplicative factor by which the delay grows
208
@ivar jitter: percentage of randomness to introduce into the delay length
209
to prevent stampeding.
210
@ivar clock: the clock used to schedule reconnection. It's mainly useful to
211
be parametrized in tests. If the factory is serialized, this attribute
212
will not be serialized, and the default value (the reactor) will be
213
restored when deserialized.
217
# Note: These highly sensitive factors have been precisely measured by
218
# the National Institute of Standards and Technology. Take extreme care
219
# in altering them, or you may damage your Internet!
220
# (Seriously: <http://physics.nist.gov/cuu/Constants/index.html>)
221
factor = 2.7182818284590451 # (math.e)
222
# Phi = 1.6180339887498948 # (Phi is acceptable for use as a
223
# factor if e is too large for your application.)
224
jitter = 0.11962656472 # molar Planck constant times c, joule meter/mole
236
def clientConnectionFailed(self, connector, reason):
237
if self.continueTrying:
238
self.connector = connector
242
def clientConnectionLost(self, connector, unused_reason):
243
if self.continueTrying:
244
self.connector = connector
248
def retry(self, connector=None):
250
Have this connector connect again, after a suitable delay.
252
if not self.continueTrying:
254
log.msg("Abandoning %s on explicit request" % (connector,))
257
if connector is None:
258
if self.connector is None:
259
raise ValueError("no connector to retry")
261
connector = self.connector
264
if self.maxRetries is not None and (self.retries > self.maxRetries):
266
log.msg("Abandoning %s after %d retries." %
267
(connector, self.retries))
270
self.delay = min(self.delay * self.factor, self.maxDelay)
272
self.delay = random.normalvariate(self.delay,
273
self.delay * self.jitter)
276
log.msg("%s will retry in %d seconds" % (connector, self.delay,))
281
if self.clock is None:
282
from twisted.internet import reactor
284
self._callID = self.clock.callLater(self.delay, reconnector)
287
def stopTrying(self):
289
Put a stop to any attempt to reconnect in progress.
291
# ??? Is this function really stopFactory?
293
self._callID.cancel()
296
# Hopefully this doesn't just make clientConnectionFailed
299
self.connector.stopConnecting()
300
except error.NotConnectingError:
302
self.continueTrying = 0
305
def resetDelay(self):
307
Call this method after a successful connection: it resets the delay and
310
self.delay = self.initialDelay
313
self.continueTrying = 1
316
def __getstate__(self):
318
Remove all of the state which is mutated by connection attempts and
319
failures, returning just the state which describes how reconnections
320
should be attempted. This will make the unserialized instance
321
behave just as this one did when it was first instantiated.
323
state = self.__dict__.copy()
324
for key in ['connector', 'retries', 'delay',
325
'continueTrying', '_callID', 'clock']:
332
class ServerFactory(Factory):
333
"""Subclass this to indicate that your protocol.Factory is only usable for servers.
338
"""This is the abstract superclass of all protocols.
340
If you are going to write a new protocol for Twisted, start here. The
341
docstrings of this class explain how you can get started. Any protocol
342
implementation, either client or server, should be a subclass of me.
344
My API is quite simple. Implement dataReceived(data) to handle both
345
event-based and synchronous input; output can be sent through the
346
'transport' attribute, which is to be an instance that implements
347
L{twisted.internet.interfaces.ITransport}.
349
Some subclasses exist already to help you write common types of protocols:
350
see the L{twisted.protocols.basic} module for a few of them.
356
def makeConnection(self, transport):
357
"""Make a connection to a transport and a server.
359
This sets the 'transport' attribute of this Protocol, and calls the
360
connectionMade() callback.
363
self.transport = transport
364
self.connectionMade()
366
def connectionMade(self):
367
"""Called when a connection is made.
369
This may be considered the initializer of the protocol, because
370
it is called when the connection is completed. For clients,
371
this is called once the connection to the server has been
372
established; for servers, this is called after an accept() call
373
stops blocking and a socket has been received. If you need to
374
send any greeting or initial message, do it here.
377
# Module-level Failure wrapping error.ConnectionDone; it is used as the
# default ``reason`` argument of connectionLost().  cleanFailure() is
# called once here, at import time, on this shared instance.
connectionDone = failure.Failure(error.ConnectionDone())
connectionDone.cleanFailure()
381
class Protocol(BaseProtocol):
383
implements(interfaces.IProtocol)
385
def dataReceived(self, data):
386
"""Called whenever data is received.
388
Use this method to translate to a higher-level message. Usually, some
389
callback will be made upon the receipt of each complete protocol
392
@param data: a string of indeterminate length. Please keep in mind
393
that you will probably need to buffer some data, as partial
394
(or multiple) protocol messages may be received! I recommend
395
that unit tests for protocols call through to this method with
396
differing chunk sizes, down to one byte at a time.
399
def connectionLost(self, reason=connectionDone):
400
"""Called when the connection is shut down.
402
Clear any circular references here, and any external references
403
to this Protocol. The connection has been closed.
405
@type reason: L{twisted.python.failure.Failure}
409
class ProtocolToConsumerAdapter(components.Adapter):
410
implements(interfaces.IConsumer)
412
def write(self, data):
    """
    Hand C{data} to the wrapped object by calling its C{dataReceived}.

    @param data: the data written to this consumer; passed through
        unchanged.
    """
    wrapped = self.original
    wrapped.dataReceived(data)
415
def registerProducer(self, producer, streaming):
418
def unregisterProducer(self):
421
# Register ProtocolToConsumerAdapter as the adapter from IProtocol to
# IConsumer.
components.registerAdapter(
    ProtocolToConsumerAdapter,
    interfaces.IProtocol,
    interfaces.IConsumer)
424
class ConsumerToProtocolAdapter(components.Adapter):
425
implements(interfaces.IProtocol)
427
def dataReceived(self, data):
    """
    Pass received C{data} to the wrapped object by calling its C{write}.

    @param data: the data received; passed through unchanged.
    """
    wrapped = self.original
    wrapped.write(data)
430
def connectionLost(self, reason):
433
def makeConnection(self, transport):
436
def connectionMade(self):
439
# Register ConsumerToProtocolAdapter as the adapter from IConsumer to
# IProtocol.
components.registerAdapter(
    ConsumerToProtocolAdapter,
    interfaces.IConsumer,
    interfaces.IProtocol)
442
class ProcessProtocol(BaseProtocol):
444
Base process protocol implementation which does simple dispatching for
445
stdin, stdout, and stderr file descriptors.
447
implements(interfaces.IProcessProtocol)
449
def childDataReceived(self, childFD, data):
451
self.outReceived(data)
453
self.errReceived(data)
456
def outReceived(self, data):
458
Some data was received from stdout.
462
def errReceived(self, data):
464
Some data was received from stderr.
468
def childConnectionLost(self, childFD):
470
self.inConnectionLost()
472
self.outConnectionLost()
474
self.errConnectionLost()
477
def inConnectionLost(self):
479
This will be called when stdin is closed.
483
def outConnectionLost(self):
485
This will be called when stdout is closed.
489
def errConnectionLost(self):
491
This will be called when stderr is closed.
495
def processExited(self, reason):
497
This will be called when the subprocess exits.
499
@type reason: L{twisted.python.failure.Failure}
503
def processEnded(self, reason):
505
This will be called when the subprocess is finished.
507
@type reason: L{twisted.python.failure.Failure}
512
class AbstractDatagramProtocol:
514
Abstract protocol for datagram-oriented transports, e.g. IP, ICMP, ARP, UDP.
521
def __getstate__(self):
522
d = self.__dict__.copy()
523
d['transport'] = None
527
"""Make sure startProtocol is called.
529
This will be called by makeConnection(), users should not call it.
531
if not self.numPorts:
533
log.msg("Starting protocol %s" % self)
535
self.numPorts = self.numPorts + 1
538
"""Make sure stopProtocol is called.
540
This will be called by the port, users should not call it.
542
assert self.numPorts > 0
543
self.numPorts = self.numPorts - 1
544
self.transport = None
545
if not self.numPorts:
547
log.msg("Stopping protocol %s" % self)
550
def startProtocol(self):
551
"""Called when a transport is connected to this protocol.
553
Will only be called once, even if multiple ports are connected.
556
def stopProtocol(self):
557
"""Called when the transport is disconnected.
559
Will only be called once, after all ports are disconnected.
562
def makeConnection(self, transport):
563
"""Make a connection to a transport and a server.
565
This sets the 'transport' attribute of this DatagramProtocol, and calls the
568
assert self.transport == None
569
self.transport = transport
572
def datagramReceived(self, datagram, addr):
573
"""Called when a datagram is received.
575
@param datagram: the string received from the transport.
576
@param addr: tuple of source of datagram.
580
class DatagramProtocol(AbstractDatagramProtocol):
582
Protocol for datagram-oriented transport, e.g. UDP.
584
@type transport: C{NoneType} or
585
L{IUDPTransport<twisted.internet.interfaces.IUDPTransport>} provider
586
@ivar transport: The transport with which this protocol is associated,
587
if it is associated with one.
590
def connectionRefused(self):
591
"""Called due to error from write in connected mode.
593
Note this is a result of ICMP message generated by *previous*
598
class ConnectedDatagramProtocol(DatagramProtocol):
599
"""Protocol for connected datagram-oriented transport.
601
No longer necessary for UDP.
604
def datagramReceived(self, datagram):
605
"""Called when a datagram is received.
607
@param datagram: the string received from the transport.
610
def connectionFailed(self, failure):
611
"""Called if connecting failed.
613
Usually this will be due to a DNS lookup failure.
619
"""A wrapper around a file-like object to make it behave as a Transport.
621
This doesn't actually stream the file to the attached protocol,
622
and is thus useful mainly as a utility for debugging protocols.
625
implements(interfaces.ITransport)
630
streamingProducer = 0
632
def __init__(self, file):
635
def write(self, data):
637
self.file.write(data)
639
self.handleException()
640
# self._checkProducer()
642
def _checkProducer(self):
643
# Cheating; this is called at "idle" times to allow producers to be
644
# found and dealt with
646
self.producer.resumeProducing()
648
def registerProducer(self, producer, streaming):
649
"""From abstract.FileDescriptor
651
self.producer = producer
652
self.streamingProducer = streaming
654
producer.resumeProducing()
656
def unregisterProducer(self):
659
def stopConsuming(self):
    """
    Stop consuming: unregister the producer, then lose the connection.
    """
    # Order matters: the producer is detached before the connection is
    # dropped.
    self.unregisterProducer()
    self.loseConnection()
663
def writeSequence(self, iovec):
    """
    Write a sequence of strings as one chunk.

    The items of C{iovec} are concatenated and passed to C{self.write} in
    a single call.

    @param iovec: an iterable of strings.
    """
    emit = self.write
    emit("".join(iovec))
666
def loseConnection(self):
670
except (IOError, OSError):
671
self.handleException()
674
# XXX: According to ITransport, this should return an IAddress!
675
return 'file', 'file'
678
# XXX: According to ITransport, this should return an IAddress!
681
def handleException(self):
684
def resumeProducing(self):
685
# Never sends data anyways
688
def pauseProducing(self):
689
# Never sends data anyways
692
def stopProducing(self):
    """
    Stop producing by delegating to C{loseConnection}.
    """
    close = self.loseConnection
    close()
696
__all__ = ["Factory", "ClientFactory", "ReconnectingClientFactory", "connectionDone",
697
"Protocol", "ProcessProtocol", "FileWrapper", "ServerFactory",
698
"AbstractDatagramProtocol", "DatagramProtocol", "ConnectedDatagramProtocol",