1
# -*- test-case-name: twisted.words.test.test_jabberxmlstream -*-
3
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
4
# See LICENSE for details.
9
Building blocks for setting up XML Streams, including helping classes for
10
doing authentication on either client or server side, and working with XML
14
from zope.interface import directlyProvides, implements
16
from twisted.internet import defer, protocol
17
from twisted.internet.error import ConnectionLost
18
from twisted.python import failure, log, randbytes
19
from twisted.python.hashlib import sha1
20
from twisted.words.protocols.jabber import error, ijabber, jid
21
from twisted.words.xish import domish, xmlstream
22
from twisted.words.xish.xmlstream import STREAM_CONNECTED_EVENT
23
from twisted.words.xish.xmlstream import STREAM_START_EVENT
24
from twisted.words.xish.xmlstream import STREAM_END_EVENT
25
from twisted.words.xish.xmlstream import STREAM_ERROR_EVENT
28
from twisted.internet import ssl
31
if ssl and not ssl.supported:
34
STREAM_AUTHD_EVENT = intern("//event/stream/authd")
35
INIT_FAILED_EVENT = intern("//event/xmpp/initfailed")
37
NS_STREAMS = 'http://etherx.jabber.org/streams'
38
NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls'
42
def hashPassword(sid, password):
    """
    Create a SHA1-digest string of a session identifier and password.

    The two values are concatenated (session identifier first), encoded as
    UTF-8 and hashed.

    @param sid: The stream session identifier.
    @type sid: C{unicode}.
    @param password: The password to be hashed.
    @type password: C{unicode}.

    @return: The hexadecimal SHA1 digest of the concatenated values.

    @raise TypeError: If C{sid} or C{password} is not a C{unicode} object.
    """
    if not isinstance(sid, unicode):
        raise TypeError("The session identifier must be a unicode object")
    if not isinstance(password, unicode):
        raise TypeError("The password must be a unicode object")
    # Deliberately not called `input`, which would shadow the builtin.
    combined = u"%s%s" % (sid, password)
    return sha1(combined.encode('utf-8')).hexdigest()
62
Base class for business logic of initializing an XmlStream
64
Subclass this object to enable an XmlStream to initialize and authenticate
65
to different types of stream hosts (such as clients, components, etc.).
68
1. The Authenticator MUST dispatch a L{STREAM_AUTHD_EVENT} when the
69
stream has been completely initialized.
70
2. The Authenticator SHOULD reset all state information when
71
L{associateWithStream} is called.
72
3. The Authenticator SHOULD override L{streamStarted}, and start
75
@type xmlstream: L{XmlStream}
76
@ivar xmlstream: The XmlStream that needs authentication
78
@note: the term authenticator is historical. Authenticators perform
79
all steps required to prepare the stream for the exchange
87
def connectionMade(self):
89
Called by the XmlStream when the underlying socket connection is
92
This allows the Authenticator to send an initial root element, if it's
93
connecting, or wait for an inbound root from the peer if it's accepting
96
Subclasses can use self.xmlstream.send() to send any initial data to
101
def streamStarted(self, rootElement):
103
Called by the XmlStream when the stream has started.
105
A stream is considered to have started when the start tag of the root
106
element has been received.
108
This examines L{rootElement} to see if there is a version attribute.
109
If absent, C{0.0} is assumed per RFC 3920. Subsequently, the
110
minimum of the version from the received stream header and the
111
value stored in L{xmlstream} is taken and put back in {xmlstream}.
113
Extensions of this method can extract more information from the
114
stream header and perform checks on them, optionally sending
115
stream errors and closing the stream.
117
if rootElement.hasAttribute("version"):
118
version = rootElement["version"].split(".")
120
version = (int(version[0]), int(version[1]))
121
except (IndexError, ValueError):
126
self.xmlstream.version = min(self.xmlstream.version, version)
129
def associateWithStream(self, xmlstream):
    """
    Called by the XmlStreamFactory when a connection has been made
    to the requested peer, and an XmlStream object has been created.

    The default implementation just saves a handle to the new stream in
    C{self.xmlstream}.

    @type xmlstream: L{XmlStream}
    @param xmlstream: The XmlStream that will be passing events to this
        object.
    """
    self.xmlstream = xmlstream
147
class ConnectAuthenticator(Authenticator):
149
Authenticator for initiating entities.
154
def __init__(self, otherHost):
155
self.otherHost = otherHost
158
def connectionMade(self):
    """
    Prepare the stream for an outgoing connection and send the header.

    The stream's namespace is taken from this authenticator's C{namespace}
    attribute and its C{otherEntity} from C{self.otherHost} (interned as a
    JID) before the stream header is sent.
    """
    self.xmlstream.namespace = self.namespace
    self.xmlstream.otherEntity = jid.internJID(self.otherHost)
    self.xmlstream.sendHeader()
164
def initializeStream(self):
166
Perform stream initialization procedures.
168
An L{XmlStream} holds a list of initializer objects in its
169
C{initializers} attribute. This method calls these initializers in
170
order and dispatches the C{STREAM_AUTHD_EVENT} event when the list has
171
been successfully processed. Otherwise it dispatches the
172
C{INIT_FAILED_EVENT} event with the failure.
174
Initializers may return the special L{Reset} object to halt the
175
initialization processing. It signals that the current initializer was
176
successfully processed, but that the XML Stream has been reset. An
177
example is the TLSInitiatingInitializer.
180
def remove_first(result):
181
self.xmlstream.initializers.pop(0)
187
Take the first initializer and process it.
189
On success, the initializer is removed from the list and
190
then next initializer will be tried.
197
init = self.xmlstream.initializers[0]
199
self.xmlstream.dispatch(self.xmlstream, STREAM_AUTHD_EVENT)
202
d = defer.maybeDeferred(init.initialize)
203
d.addCallback(remove_first)
204
d.addCallback(do_next)
207
d = defer.succeed(None)
208
d.addCallback(do_next)
209
d.addErrback(self.xmlstream.dispatch, INIT_FAILED_EVENT)
212
def streamStarted(self, rootElement):
214
Called by the XmlStream when the stream has started.
216
This extends L{Authenticator.streamStarted} to extract further stream
217
headers from L{rootElement}, optionally wait for stream features being
218
received and then call C{initializeStream}.
221
Authenticator.streamStarted(self, rootElement)
223
self.xmlstream.sid = rootElement.getAttribute("id")
225
if rootElement.hasAttribute("from"):
226
self.xmlstream.otherEntity = jid.internJID(rootElement["from"])
228
# Setup observer for stream features, if applicable
229
if self.xmlstream.version >= (1, 0):
230
def onFeatures(element):
232
for feature in element.elements():
233
features[(feature.uri, feature.name)] = feature
235
self.xmlstream.features = features
236
self.initializeStream()
238
self.xmlstream.addOnetimeObserver('/features[@xmlns="%s"]' %
242
self.initializeStream()
246
class ListenAuthenticator(Authenticator):
248
Authenticator for receiving entities.
253
def associateWithStream(self, xmlstream):
    """
    Called by the XmlStreamFactory when a connection has been made.

    Extend L{Authenticator.associateWithStream} to set the L{XmlStream}
    to be non-initiating.

    @param xmlstream: The stream being associated with this authenticator.
    @type xmlstream: L{XmlStream}
    """
    Authenticator.associateWithStream(self, xmlstream)
    self.xmlstream.initiating = False
264
def streamStarted(self, rootElement):
    """
    Called by the XmlStream when the stream has started.

    This extends L{Authenticator.streamStarted} to extract further
    information from the stream headers from L{rootElement}: the default
    namespace, the addressed entity (when a C{to} attribute is present) and
    the locally declared namespace prefixes. A new session identifier is
    generated and stored on the stream as well.

    @param rootElement: The root element holding the received stream header.
    """
    Authenticator.streamStarted(self, rootElement)

    self.xmlstream.namespace = rootElement.defaultUri

    if rootElement.hasAttribute("to"):
        self.xmlstream.thisEntity = jid.internJID(rootElement["to"])

    # The header maps prefix -> URI; the stream stores the inverse mapping.
    self.xmlstream.prefixes = {}
    for prefix, uri in rootElement.localPrefixes.iteritems():
        self.xmlstream.prefixes[uri] = prefix

    # Eight random bytes, hex encoded.
    self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')
286
class FeatureNotAdvertized(Exception):
    """
    Exception indicating a stream feature was not advertized, while required by
    the initiating entity.
    """
294
class BaseFeatureInitiatingInitializer(object):
296
Base class for initializers with a stream feature.
298
This assumes the associated XmlStream represents the initiating entity
301
@cvar feature: tuple of (uri, name) of the stream feature root element.
302
@type feature: tuple of (L{str}, L{str})
303
@ivar required: whether the stream feature is required to be advertized
304
by the receiving entity.
305
@type required: L{bool}
308
implements(ijabber.IInitiatingInitializer)
313
def __init__(self, xs):
317
def initialize(self):
319
Initiate the initialization.
321
Checks if the receiving entity advertizes the stream feature. If it
322
does, the initialization is started. If it is not advertized, and the
323
C{required} instance variable is L{True}, it raises
324
L{FeatureNotAdvertized}. Otherwise, the initialization silently
328
if self.feature in self.xmlstream.features:
331
raise FeatureNotAdvertized
338
Start the actual initialization.
340
May return a deferred for asynchronous initialization.
345
class TLSError(Exception):
352
class TLSFailed(TLSError):
    """
    Exception indicating failed TLS negotiation
    """
359
class TLSRequired(TLSError):
    """
    Exception indicating required TLS negotiation.

    This exception is raised when the receiving entity requires TLS
    negotiation and the initiating does not desire to negotiate TLS.
    """
369
class TLSNotSupported(TLSError):
    """
    Exception indicating missing TLS support.

    This exception is raised when the initiating entity wants and requires to
    negotiate TLS when the OpenSSL library is not available.
    """
379
class TLSInitiatingInitializer(BaseFeatureInitiatingInitializer):
381
TLS stream initializer for the initiating entity.
383
It is strongly required to include this initializer in the list of
384
initializers for an XMPP stream. By default it will try to negotiate TLS.
385
An XMPP server may indicate that TLS is required. If TLS is not desired,
386
set the C{wanted} attribute to False instead of removing it from the list
387
of initializers, so a proper exception L{TLSRequired} can be raised.
389
@cvar wanted: indicates if TLS negotiation is wanted.
390
@type wanted: L{bool}
393
feature = (NS_XMPP_TLS, 'starttls')
397
def onProceed(self, obj):
    """
    Proceed with TLS negotiation and reset the XML stream.

    The transport is switched to TLS, the stream is reset, a new stream
    header is sent and the pending deferred is fired with L{Reset}.

    @param obj: The object passed by the C{/proceed} observer; not used.
    """
    # The peer answered with proceed, so the failure observer is obsolete.
    self.xmlstream.removeObserver('/failure', self.onFailure)
    ctx = ssl.CertificateOptions()
    self.xmlstream.transport.startTLS(ctx)
    self.xmlstream.reset()
    self.xmlstream.sendHeader()
    self._deferred.callback(Reset)
410
def onFailure(self, obj):
    """
    Handle a failed TLS negotiation.

    Removes the C{/proceed} observer and fires the errback of the pending
    deferred with L{TLSFailed}.

    @param obj: The object passed by the C{/failure} observer; not used.
    """
    self.xmlstream.removeObserver('/proceed', self.onProceed)
    self._deferred.errback(TLSFailed())
417
Start TLS negotiation.
419
This checks if the receiving entity requires TLS, the SSL library is
420
available and uses the C{required} and C{wanted} instance variables to
421
determine what to do in the various different cases.
423
For example, if the SSL library is not available, and wanted and
424
required by the user, it raises an exception. However if it is not
425
required by both parties, initialization silently succeeds, moving
431
return defer.fail(TLSNotSupported())
433
return defer.succeed(None)
436
elif self.xmlstream.features[self.feature].required:
437
return defer.fail(TLSRequired())
439
return defer.succeed(None)
441
self._deferred = defer.Deferred()
442
self.xmlstream.addOnetimeObserver("/proceed", self.onProceed)
443
self.xmlstream.addOnetimeObserver("/failure", self.onFailure)
444
self.xmlstream.send(domish.Element((NS_XMPP_TLS, "starttls")))
445
return self._deferred
449
class XmlStream(xmlstream.XmlStream):
451
XMPP XML Stream protocol handler.
453
@ivar version: XML stream version as a tuple (major, minor). Initially,
454
this is set to the minimally supported version. Upon
455
receiving the stream header of the peer, it is set to the
456
minimum of that value and the version on the received
458
@type version: (L{int}, L{int})
459
@ivar namespace: default namespace URI for stream
460
@type namespace: L{str}
461
@ivar thisEntity: JID of this entity
462
@type thisEntity: L{JID}
463
@ivar otherEntity: JID of the peer entity
464
@type otherEntity: L{JID}
465
@ivar sid: session identifier
467
@ivar initiating: True if this is the initiating stream
468
@type initiating: L{bool}
469
@ivar features: map of (uri, name) to stream features element received from
470
the receiving entity.
471
@type features: L{dict} of (L{str}, L{str}) to L{domish.Element}.
472
@ivar prefixes: map of URI to prefixes that are to appear on stream
474
@type prefixes: L{dict} of L{str} to L{str}
475
@ivar initializers: list of stream initializer objects
476
@type initializers: L{list} of objects that provide L{IInitializer}
477
@ivar authenticator: associated authenticator that uses C{initializers} to
478
initialize the XML stream.
482
namespace = 'invalid'
488
_headerSent = False # True if the stream header has been sent
490
def __init__(self, authenticator):
491
xmlstream.XmlStream.__init__(self)
493
self.prefixes = {NS_STREAMS: 'stream'}
494
self.authenticator = authenticator
495
self.initializers = []
498
# Reset the authenticator
499
authenticator.associateWithStream(self)
502
def _callLater(self, *args, **kwargs):
    """
    Schedule a delayed call on the global reactor.

    All arguments are passed through to C{reactor.callLater} and its result
    is returned.
    """
    # The reactor is imported at call time rather than at module import.
    from twisted.internet import reactor
    return reactor.callLater(*args, **kwargs)
511
Resets the XML Parser for incoming data. This is to be used after
512
successfully negotiating a new layer, e.g. TLS and SASL. Note that
513
registered event observers will continue to be in place.
515
self._headerSent = False
516
self._initializeStream()
519
def onStreamError(self, errelem):
521
Called when a stream:error element has been received.
523
Dispatches a L{STREAM_ERROR_EVENT} event with the error element to
524
allow for cleanup actions and drops the connection.
526
@param errelem: The received error element.
527
@type errelem: L{domish.Element}
529
self.dispatch(failure.Failure(error.exceptionFromStreamError(errelem)),
531
self.transport.loseConnection()
534
def sendHeader(self):
538
# set up optional extra namespaces
540
for uri, prefix in self.prefixes.iteritems():
541
if uri != NS_STREAMS:
542
localPrefixes[prefix] = uri
544
rootElement = domish.Element((NS_STREAMS, 'stream'), self.namespace,
545
localPrefixes=localPrefixes)
548
rootElement['to'] = self.otherEntity.userhost()
551
rootElement['from'] = self.thisEntity.userhost()
553
if not self.initiating and self.sid:
554
rootElement['id'] = self.sid
556
if self.version >= (1, 0):
557
rootElement['version'] = "%d.%d" % self.version
559
self.send(rootElement.toXml(prefixes=self.prefixes, closeElement=0))
560
self._headerSent = True
563
def sendFooter(self):
    """
    Send the closing tag of the stream's root element.
    """
    self.send('</stream:stream>')
570
def sendStreamError(self, streamError):
572
Send stream level error.
574
If we are the receiving entity, and haven't sent the header yet,
577
After sending the stream error, the stream is closed and the transport
580
@param streamError: stream error instance
581
@type streamError: L{error.StreamError}
583
if not self._headerSent and not self.initiating:
587
self.send(streamError.getElement())
590
self.transport.loseConnection()
595
Send data over the stream.
597
This overrides L{xmlstream.Xmlstream.send} to use the default namespace
598
of the stream header when serializing L{domish.IElement}s. It is
599
assumed that if you pass an object that provides L{domish.IElement},
600
it represents a direct child of the stream's root element.
602
if domish.IElement.providedBy(obj):
603
obj = obj.toXml(prefixes=self.prefixes,
604
defaultUri=self.namespace,
605
prefixesInScope=self.prefixes.values())
607
xmlstream.XmlStream.send(self, obj)
610
def connectionMade(self):
    """
    Called when a connection is made.

    Notifies the authenticator when a connection has been made, after the
    base class has done its own connection handling.
    """
    xmlstream.XmlStream.connectionMade(self)
    self.authenticator.connectionMade()
620
def onDocumentStart(self, rootElement):
622
Called when the stream header has been received.
624
Extracts the header's C{id} and C{version} attributes from the root
625
element. The C{id} attribute is stored in our C{sid} attribute and the
626
C{version} attribute is parsed and the minimum of the version we sent
627
and the parsed C{version} attribute is stored as a tuple (major, minor)
628
in this class' C{version} attribute. If no C{version} attribute was
629
present, we assume version 0.0.
631
If appropriate (we are the initiating stream and the minimum of our and
632
the other party's version is at least 1.0), a one-time observer is
633
registered for getting the stream features. The registered function is
636
Ultimately, the authenticator's C{streamStarted} method will be called.
638
@param rootElement: The root element.
639
@type rootElement: L{domish.Element}
641
xmlstream.XmlStream.onDocumentStart(self, rootElement)
643
# Setup observer for stream errors
644
self.addOnetimeObserver("/error[@xmlns='%s']" % NS_STREAMS,
647
self.authenticator.streamStarted(rootElement)
651
class XmlStreamFactory(xmlstream.XmlStreamFactory):
653
Factory for Jabber XmlStream objects as a reconnecting client.
655
Note that this differs from L{xmlstream.XmlStreamFactory} in that
656
it generates Jabber specific L{XmlStream} instances that have
662
def __init__(self, authenticator):
    """
    @param authenticator: The authenticator passed on to the base factory
        and also kept as C{self.authenticator}.
    """
    xmlstream.XmlStreamFactory.__init__(self, authenticator)
    self.authenticator = authenticator
668
class XmlStreamServerFactory(xmlstream.BootstrapMixin,
669
protocol.ServerFactory):
671
Factory for Jabber XmlStream objects as a server.
674
@ivar authenticatorFactory: Factory callable that takes no arguments, to
675
create a fresh authenticator to be associated
681
def __init__(self, authenticatorFactory):
    """
    @param authenticatorFactory: Factory callable that takes no arguments,
        to create a fresh authenticator for each new stream.
    """
    xmlstream.BootstrapMixin.__init__(self)
    self.authenticatorFactory = authenticatorFactory
686
def buildProtocol(self, addr):
688
Create an instance of XmlStream.
690
A new authenticator instance will be created and passed to the new
691
XmlStream. Registered bootstrap event observers are installed as well.
693
authenticator = self.authenticatorFactory()
694
xs = self.protocol(authenticator)
696
self.installBootstraps(xs)
701
class TimeoutError(Exception):
703
Exception raised when no IQ response has been received before the
709
def upgradeWithIQResponseTracker(xs):
711
Enhances an XmlStream for iq response tracking.
713
This makes an L{XmlStream} object provide L{IIQResponseTracker}. When a
714
response is an error iq stanza, the deferred has its errback invoked with a
715
failure that holds a L{StanzaException<error.StanzaException>} that is
720
Handle iq response by firing associated deferred.
722
if getattr(iq, 'handled', False):
726
d = xs.iqDeferreds[iq["id"]]
730
del xs.iqDeferreds[iq["id"]]
732
if iq['type'] == 'error':
733
d.errback(error.exceptionFromStanza(iq))
740
Make sure deferreds do not linger on after disconnect.
742
This errbacks all deferreds of iq's for which no response has been
743
received with a L{ConnectionLost} failure. Otherwise, the deferreds
746
iqDeferreds = xs.iqDeferreds
748
for d in iqDeferreds.itervalues():
749
d.errback(ConnectionLost())
752
xs.iqDefaultTimeout = getattr(xs, 'iqDefaultTimeout', None)
753
xs.addObserver(xmlstream.STREAM_END_EVENT, disconnected)
754
xs.addObserver('/iq[@type="result"]', callback)
755
xs.addObserver('/iq[@type="error"]', callback)
756
directlyProvides(xs, ijabber.IIQResponseTracker)
760
class IQ(domish.Element):
762
Wrapper for an iq stanza.
764
Iq stanzas are used for communications with a request-response behaviour.
765
Each iq request is associated with an XML stream and has its own unique id
766
to be able to track the response.
768
@ivar timeout: if set, a timeout period after which the deferred returned
769
by C{send} will have its errback called with a
770
L{TimeoutError} failure.
771
@type timeout: C{float}
776
def __init__(self, xmlstream, stanzaType="set"):
778
@type xmlstream: L{xmlstream.XmlStream}
779
@param xmlstream: XmlStream to use for transmission of this IQ
781
@type stanzaType: L{str}
782
@param stanzaType: IQ type identifier ('get' or 'set')
784
domish.Element.__init__(self, (None, "iq"))
786
self["type"] = stanzaType
787
self._xmlstream = xmlstream
790
def send(self, to=None):
794
Returns a deferred that is fired when an iq response with the same id
795
is received. Result responses will be passed to the deferred callback.
796
Error responses will be transformed into a
797
L{StanzaError<error.StanzaError>} and result in the errback of the
798
deferred being invoked.
800
@rtype: L{defer.Deferred}
805
if not ijabber.IIQResponseTracker.providedBy(self._xmlstream):
806
upgradeWithIQResponseTracker(self._xmlstream)
809
self._xmlstream.iqDeferreds[self['id']] = d
811
timeout = self.timeout or self._xmlstream.iqDefaultTimeout
812
if timeout is not None:
814
del self._xmlstream.iqDeferreds[self['id']]
815
d.errback(TimeoutError("IQ timed out"))
817
call = self._xmlstream._callLater(timeout, onTimeout)
819
def cancelTimeout(result):
825
d.addBoth(cancelTimeout)
827
self._xmlstream.send(self)
832
def toResponse(stanza, stanzaType=None):
834
Create a response stanza from another stanza.
836
This takes the addressing and id attributes from a stanza to create a (new,
837
empty) response stanza. The addressing attributes are swapped and the id
838
copied. Optionally, the stanza type of the response can be specified.
840
@param stanza: the original stanza
841
@type stanza: L{domish.Element}
842
@param stanzaType: optional response stanza type
843
@type stanzaType: C{str}
844
@return: the response stanza.
845
@rtype: L{domish.Element}
848
toAddr = stanza.getAttribute('from')
849
fromAddr = stanza.getAttribute('to')
850
stanzaID = stanza.getAttribute('id')
852
response = domish.Element((None, stanza.name))
854
response['to'] = toAddr
856
response['from'] = fromAddr
858
response['id'] = stanzaID
860
response['type'] = stanzaType
866
class XMPPHandler(object):
868
XMPP protocol handler.
870
Classes derived from this class implement (part of) one or more XMPP
871
extension protocols, and are referred to as a subprotocol implementation.
874
implements(ijabber.IXMPPHandler)
878
self.xmlstream = None
881
def setHandlerParent(self, parent):
883
self.parent.addHandler(self)
886
def disownHandlerParent(self, parent):
887
self.parent.removeHandler(self)
891
def makeConnection(self, xs):
893
self.connectionMade()
896
def connectionMade(self):
    """
    Called after a connection has been established.

    Can be overridden to perform work before stream initialization.
    """
904
def connectionInitialized(self):
    """
    The XML stream has been initialized.

    Can be overridden to perform work after stream initialization, e.g. to
    set up observers and start exchanging XML stanzas.
    """
913
def connectionLost(self, reason):
    """
    The XML stream has been closed.

    This method can be extended to inspect the C{reason} argument and
    act on it. The default implementation ignores C{reason} and only drops
    the reference to the stream.

    @param reason: Why the stream was closed; unused here.
    """
    self.xmlstream = None
925
Send data over the managed XML stream.
927
@note: The stream manager maintains a queue for data sent using this
928
method when there is no current initialized XML stream. This
929
data is then sent as soon as a new stream has been established
930
and initialized. Subsequently, L{connectionInitialized} will be
931
called again. If this queueing is not desired, use C{send} on
934
@param obj: data to be sent over the XML stream. This is usually an
935
object providing L{domish.IElement}, or serialized XML. See
936
L{xmlstream.XmlStream} for details.
938
self.parent.send(obj)
942
class XMPPHandlerCollection(object):
944
Collection of XMPP subprotocol handlers.
946
This allows for grouping of subprotocol handlers, but is not an
947
L{XMPPHandler} itself, so this is not recursive.
949
@ivar handlers: List of protocol handlers.
950
@type handlers: L{list} of objects providing
954
implements(ijabber.IXMPPHandlerCollection)
962
Act as a container for handlers.
964
return iter(self.handlers)
967
def addHandler(self, handler):
    """
    Add protocol handler.

    Protocol handlers are expected to provide L{ijabber.IXMPPHandler}.

    @param handler: The handler to append to C{self.handlers}.
    """
    self.handlers.append(handler)
976
def removeHandler(self, handler):
    """
    Remove protocol handler.

    @param handler: The handler to remove from C{self.handlers}.
    """
    self.handlers.remove(handler)
984
class StreamManager(XMPPHandlerCollection):
986
Business logic representing a managed XMPP connection.
988
This maintains a single XMPP connection and provides facilities for packet
989
routing and transmission. Business logic modules are objects providing
990
L{ijabber.IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
993
@ivar xmlstream: currently managed XML stream
994
@type xmlstream: L{XmlStream}
995
@ivar logTraffic: if true, log all traffic.
996
@type logTraffic: L{bool}
997
@ivar _initialized: Whether the stream represented by L{xmlstream} has
998
been initialized. This is used when caching outgoing
1000
@type _initialized: C{bool}
1001
@ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
1002
@type _packetQueue: L{list}
1007
def __init__(self, factory):
    """
    Set up an idle manager and hook it into the factory's stream events.

    @param factory: Object on which bootstrap observers are registered
        through C{addBootstrap}; it is also kept as C{self.factory}.
    """
    XMPPHandlerCollection.__init__(self)
    self.xmlstream = None
    self._packetQueue = []
    self._initialized = False

    # Follow the life cycle of every stream the factory creates.
    factory.addBootstrap(STREAM_CONNECTED_EVENT, self._connected)
    factory.addBootstrap(STREAM_AUTHD_EVENT, self._authd)
    factory.addBootstrap(INIT_FAILED_EVENT, self.initializationFailed)
    factory.addBootstrap(STREAM_END_EVENT, self._disconnected)
    self.factory = factory
1020
def addHandler(self, handler):
    """
    Add protocol handler.

    When an XML stream has already been established, the handler's
    C{connectionInitialized} will be called to get it up to speed.

    @param handler: The handler to add to this manager.
    """
    XMPPHandlerCollection.addHandler(self, handler)

    # get protocol handler up to speed when a connection has already
    # been established
    if self.xmlstream and self._initialized:
        handler.makeConnection(self.xmlstream)
        handler.connectionInitialized()
1036
def _connected(self, xs):
1038
Called when the transport connection has been established.
1040
Here we optionally set up traffic logging (depending on L{logTraffic})
1041
and call each handler's C{makeConnection} method with the L{XmlStream}
1045
log.msg("RECV: %r" % buf)
1047
def logDataOut(buf):
1048
log.msg("SEND: %r" % buf)
1051
xs.rawDataInFn = logDataIn
1052
xs.rawDataOutFn = logDataOut
1057
e.makeConnection(xs)
1060
def _authd(self, xs):
1062
Called when the stream has been initialized.
1064
Send out cached stanzas and call each handler's
1065
C{connectionInitialized} method.
1067
# Flush all pending packets
1068
for p in self._packetQueue:
1070
self._packetQueue = []
1071
self._initialized = True
1073
# Notify all child services which implement
1074
# the IService interface
1076
e.connectionInitialized()
1079
def initializationFailed(self, reason):
1081
Called when stream initialization has failed.
1083
Stream initialization has halted, with the reason indicated by
1084
C{reason}. It may be retried by calling the authenticator's
1085
C{initializeStream}. See the respective authenticators for details.
1087
@param reason: A failure instance indicating why stream initialization
1089
@type reason: L{failure.Failure}
1093
def _disconnected(self, _):
1095
Called when the stream has been closed.
1097
From this point on, the manager doesn't interact with the
1098
L{XmlStream} anymore and notifies each handler that the connection
1099
was lost by calling its C{connectionLost} method.
1101
self.xmlstream = None
1102
self._initialized = False
1104
# Notify all child services which implement
1105
# the IService interface
1107
e.connectionLost(None)
1110
def send(self, obj):
1112
Send data over the XML stream.
1114
When there is no established XML stream, the data is queued and sent
1115
out when a new XML stream has been established and initialized.
1117
@param obj: data to be sent over the XML stream. See
1118
L{xmlstream.XmlStream.send} for details.
1120
if self._initialized:
1121
self.xmlstream.send(obj)
1123
self._packetQueue.append(obj)
1127
# Names exported by this module.
__all__ = ['Authenticator', 'BaseFeatureInitiatingInitializer',
           'ConnectAuthenticator', 'ConnectionLost', 'FeatureNotAdvertized',
           'INIT_FAILED_EVENT', 'IQ', 'ListenAuthenticator', 'NS_STREAMS',
           'NS_XMPP_TLS', 'Reset', 'STREAM_AUTHD_EVENT',
           'STREAM_CONNECTED_EVENT', 'STREAM_END_EVENT', 'STREAM_ERROR_EVENT',
           'STREAM_START_EVENT', 'StreamManager', 'TLSError', 'TLSFailed',
           'TLSInitiatingInitializer', 'TLSNotSupported', 'TLSRequired',
           'TimeoutError', 'XMPPHandler', 'XMPPHandlerCollection', 'XmlStream',
           'XmlStreamFactory', 'XmlStreamServerFactory', 'hashPassword',
           'toResponse', 'upgradeWithIQResponseTracker']