1
# -*- test-case-name: twisted.words.test.test_jabberxmlstream -*-
3
# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
4
# See LICENSE for details.
9
Building blocks for setting up XML Streams, including helping classes for
10
doing authentication on either client or server side, and working with XML
14
from zope.interface import directlyProvides, implements
16
from twisted.internet import defer
17
from twisted.internet.error import ConnectionLost
18
from twisted.python import failure
19
from twisted.words.protocols.jabber import error, ijabber
20
from twisted.words.xish import domish, xmlstream
21
from twisted.words.xish.xmlstream import STREAM_CONNECTED_EVENT
22
from twisted.words.xish.xmlstream import STREAM_START_EVENT
23
from twisted.words.xish.xmlstream import STREAM_END_EVENT
24
from twisted.words.xish.xmlstream import STREAM_ERROR_EVENT
27
from twisted.internet import ssl
30
if ssl and not ssl.supported:
33
STREAM_AUTHD_EVENT = intern("//event/stream/authd")
34
INIT_FAILED_EVENT = intern("//event/xmpp/initfailed")
36
NS_STREAMS = 'http://etherx.jabber.org/streams'
37
NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls'
41
def hashPassword(sid, password):
43
Create a SHA1-digest string of a session identifier and password.
46
return sha.new("%s%s" % (sid, password)).hexdigest()
50
Base class for business logic of initializing an XmlStream
52
Subclass this object to enable an XmlStream to initialize and authenticate
53
to different types of stream hosts (such as clients, components, etc.).
56
1. The Authenticator MUST dispatch a L{STREAM_AUTHD_EVENT} when the
57
stream has been completely initialized.
58
2. The Authenticator SHOULD reset all state information when
59
L{associateWithStream} is called.
60
3. The Authenticator SHOULD override L{streamStarted}, and start
63
@type xmlstream: L{XmlStream}
64
@ivar xmlstream: The XmlStream that needs authentication
66
@note: the term authenticator is historical. Authenticators perform
67
all steps required to prepare the stream for the exchange
74
def connectionMade(self):
76
Called by the XmlStream when the underlying socket connection is
79
This allows the Authenticator to send an initial root element, if it's
80
connecting, or wait for an inbound root from the peer if it's accepting
83
Subclasses can use self.xmlstream.send() to send any initial data to
87
def streamStarted(self):
89
Called by the XmlStream when the stream has started.
91
A stream is considered to have started when the root element has been
92
received and, if applicable, the feature set has been received.
95
def associateWithStream(self, xmlstream):
97
Called by the XmlStreamFactory when a connection has been made
98
to the requested peer, and an XmlStream object has been
101
The default implementation just saves a handle to the new
104
@type xmlstream: L{XmlStream}
105
@param xmlstream: The XmlStream that will be passing events to this
109
self.xmlstream = xmlstream
111
class ConnectAuthenticator(Authenticator):
113
Authenticator for initiating entities.
118
def __init__(self, otherHost):
119
self.otherHost = otherHost
121
def connectionMade(self):
122
self.xmlstream.namespace = self.namespace
123
self.xmlstream.otherHost = self.otherHost
124
self.xmlstream.sendHeader()
126
def initializeStream(self):
128
Perform stream initialization procedures.
130
An L{XmlStream} holds a list of initializer objects in its
131
C{initializers} attribute. This method calls these initializers in
132
order and dispatches the C{STREAM_AUTHD_EVENT} event when the list has
133
been successfully processed. Otherwise it dispatches the
134
C{INIT_FAILED_EVENT} event with the failure.
136
Initializers may return the special L{Reset} object to halt the
137
initialization processing. It signals that the current initializer was
138
successfully processed, but that the XML Stream has been reset. An
139
example is the TLSInitiatingInitializer.
142
def remove_first(result):
143
self.xmlstream.initializers.pop(0)
149
Take the first initializer and process it.
151
On success, the initializer is removed from the list and
152
then next initializer will be tried.
159
init = self.xmlstream.initializers[0]
161
self.xmlstream.dispatch(self.xmlstream, STREAM_AUTHD_EVENT)
164
d = defer.maybeDeferred(init.initialize)
165
d.addCallback(remove_first)
166
d.addCallback(do_next)
169
d = defer.succeed(None)
170
d.addCallback(do_next)
171
d.addErrback(self.xmlstream.dispatch, INIT_FAILED_EVENT)
173
def streamStarted(self):
174
self.initializeStream()
176
class FeatureNotAdvertized(Exception):
178
Exception indicating a stream feature was not advertized, while required by
179
the initiating entity.
182
class BaseFeatureInitiatingInitializer(object):
184
Base class for initializers with a stream feature.
186
This assumes the associated XmlStream represents the initiating entity
189
@cvar feature: tuple of (uri, name) of the stream feature root element.
190
@type feature: tuple of (L{str}, L{str})
191
@ivar required: whether the stream feature is required to be advertized
192
by the receiving entity.
193
@type required: L{bool}
196
implements(ijabber.IInitiatingInitializer)
201
def __init__(self, xs):
204
def initialize(self):
206
Initiate the initialization.
208
Checks if the receiving entity advertizes the stream feature. If it
209
does, the initialization is started. If it is not advertized, and the
210
C{required} instance variable is L{True}, it raises
211
L{FeatureNotAdvertized}. Otherwise, the initialization silently
215
if self.feature in self.xmlstream.features:
218
raise FeatureNotAdvertized
224
Start the actual initialization.
226
May return a deferred for asynchronous initialization.
229
class TLSError(Exception):
234
class TLSFailed(TLSError):
236
Exception indicating failed TLS negotiation
239
class TLSRequired(TLSError):
241
Exception indicating required TLS negotiation.
243
This exception is raised when the receiving entity requires TLS
244
negotiation and the initiating does not desire to negotiate TLS.
247
class TLSNotSupported(TLSError):
249
Exception indicating missing TLS support.
251
This exception is raised when the initiating entity wants and requires to
252
negotiate TLS when the OpenSSL library is not available.
255
class TLSInitiatingInitializer(BaseFeatureInitiatingInitializer):
257
TLS stream initializer for the initiating entity.
259
It is strongly required to include this initializer in the list of
260
initializers for an XMPP stream. By default it will try to negotiate TLS.
261
An XMPP server may indicate that TLS is required. If TLS is not desired,
262
set the C{wanted} attribute to False instead of removing it from the list
263
of initializers, so a proper exception L{TLSRequired} can be raised.
265
@cvar wanted: indicates if TLS negotiation is wanted.
266
@type wanted: L{bool}
269
feature = (NS_XMPP_TLS, 'starttls')
273
def onProceed(self, obj):
275
Proceed with TLS negotiation and reset the XML stream.
278
self.xmlstream.removeObserver('/failure', self.onFailure)
279
ctx = ssl.CertificateOptions()
280
self.xmlstream.transport.startTLS(ctx)
281
self.xmlstream.reset()
282
self.xmlstream.sendHeader()
283
self._deferred.callback(Reset)
285
def onFailure(self, obj):
286
self.xmlstream.removeObserver('/proceed', self.onProceed)
287
self._deferred.errback(TLSFailed())
291
Start TLS negotiation.
293
This checks if the receiving entity requires TLS, the SSL library is
294
available and uses the C{required} and C{wanted} instance variables to
295
determine what to do in the various different cases.
297
For example, if the SSL library is not available, and wanted and
298
required by the user, it raises an exception. However if it is not
299
required by both parties, initialization silently succeeds, moving
305
return defer.fail(TLSNotSupported())
307
return defer.succeed(None)
310
elif self.xmlstream.features[self.feature].required:
311
return defer.fail(TLSRequired())
313
return defer.succeed(None)
315
self._deferred = defer.Deferred()
316
self.xmlstream.addOnetimeObserver("/proceed", self.onProceed)
317
self.xmlstream.addOnetimeObserver("/failure", self.onFailure)
318
self.xmlstream.send(domish.Element((NS_XMPP_TLS, "starttls")))
319
return self._deferred
323
class XmlStream(xmlstream.XmlStream):
325
XMPP XML Stream protocol handler.
327
@ivar version: XML stream version as a tuple (major, minor). Initially,
328
this is set to the minimally supported version. Upon
329
receiving the stream header of the peer, it is set to the
330
minimum of that value and the version on the received
332
@type version: (L{int}, L{int})
333
@ivar namespace: default namespace URI for stream
334
@type namespace: L{str}
335
@ivar thisHost: hostname of this entity
336
@ivar otherHost: hostname of the peer entity
337
@ivar sid: session identifier
339
@ivar initiating: True if this is the initiating stream
340
@type initiating: L{bool}
341
@ivar features: map of (uri, name) to stream features element received from
342
the receiving entity.
343
@type features: L{dict} of (L{str}, L{str}) to L{domish.Element}.
344
@ivar prefixes: map of URI to prefixes that are to appear on stream
346
@type prefixes: L{dict} of L{str} to L{str}
347
@ivar initializers: list of stream initializer objects
348
@type initializers: L{list} of objects that provide L{IInitializer}
349
@ivar authenticator: associated authenticator that uses C{initializers} to
350
initialize the XML stream.
354
namespace = 'invalid'
359
prefixes = {NS_STREAMS: 'stream'}
361
_headerSent = False # True if the stream header has been sent
363
def __init__(self, authenticator):
364
xmlstream.XmlStream.__init__(self)
366
self.authenticator = authenticator
367
self.initializers = []
370
# Reset the authenticator
371
authenticator.associateWithStream(self)
373
def _callLater(self, *args, **kwargs):
374
from twisted.internet import reactor
375
return reactor.callLater(*args, **kwargs)
381
Resets the XML Parser for incoming data. This is to be used after
382
successfully negotiating a new layer, e.g. TLS and SASL. Note that
383
registered event observers will continue to be in place.
385
self._headerSent = False
386
self._initializeStream()
389
def onStreamError(self, errelem):
391
Called when a stream:error element has been received.
393
Dispatches a L{STREAM_ERROR_EVENT} event with the error element to
394
allow for cleanup actions and drops the connection.
396
@param errelem: The received error element.
397
@type errelem: L{domish.Element}
399
self.dispatch(failure.Failure(error.exceptionFromStreamError(errelem)),
401
self.transport.loseConnection()
404
def onFeatures(self, features):
406
Called when a stream:features element has been received.
408
Stores the received features in the C{features} attribute, checks the
409
need for initiating TLS and notifies the authenticator of the start of
412
@param features: The received features element.
413
@type features: L{domish.Element}
416
for feature in features.elements():
417
self.features[(feature.uri, feature.name)] = feature
418
self.authenticator.streamStarted()
421
def sendHeader(self):
425
rootElem = domish.Element((NS_STREAMS, 'stream'), self.namespace)
427
if self.initiating and self.otherHost:
428
rootElem['to'] = self.otherHost
429
elif not self.initiating:
431
rootElem['from'] = self.thisHost
433
rootElem['id'] = self.sid
435
if self.version >= (1, 0):
436
rootElem['version'] = "%d.%d" % (self.version[0], self.version[1])
438
self.rootElem = rootElem
440
self.send(rootElem.toXml(prefixes=self.prefixes, closeElement=0))
441
self._headerSent = True
444
def sendFooter(self):
448
self.send('</stream:stream>')
451
def sendStreamError(self, streamError):
453
Send stream level error.
455
If we are the receiving entity, and haven't sent the header yet,
458
If the given C{failure} is a L{error.StreamError}, it is rendered
459
to its XML representation, otherwise a generic C{internal-error}
460
stream error is generated.
462
After sending the stream error, the stream is closed and the transport
465
if not self._headerSent and not self.initiating:
469
self.send(streamError.getElement())
472
self.transport.loseConnection()
477
Send data over the stream.
479
This overrides L{xmlstream.Xmlstream.send} to use the default namespace
480
of the stream header when serializing L{domish.IElement}s. It is
481
assumed that if you pass an object that provides L{domish.IElement},
482
it represents a direct child of the stream's root element.
484
if domish.IElement.providedBy(obj):
485
obj = obj.toXml(prefixes=self.prefixes,
486
defaultUri=self.namespace,
487
prefixesInScope=self.prefixes.values())
489
xmlstream.XmlStream.send(self, obj)
492
def connectionMade(self):
494
Called when a connection is made.
496
Notifies the authenticator when a connection has been made.
498
xmlstream.XmlStream.connectionMade(self)
499
self.authenticator.connectionMade()
502
def onDocumentStart(self, rootelem):
504
Called when the stream header has been received.
506
Extracts the header's C{id} and C{version} attributes from the root
507
element. The C{id} attribute is stored in our C{sid} attribute and the
508
C{version} attribute is parsed and the minimum of the version we sent
509
and the parsed C{version} attribute is stored as a tuple (major, minor)
510
in this class' C{version} attribute. If no C{version} attribute was
511
present, we assume version 0.0.
513
If appropriate (we are the initiating stream and the minimum of our and
514
the other party's version is at least 1.0), a one-time observer is
515
registered for getting the stream features. The registered function is
518
Ultimately, the authenticator's C{streamStarted} method will be called.
520
@param rootelem: The root element.
521
@type rootelem: L{domish.Element}
523
xmlstream.XmlStream.onDocumentStart(self, rootelem)
525
# Extract stream identifier
526
if rootelem.hasAttribute("id"):
527
self.sid = rootelem["id"]
529
# Extract stream version and take minimum with the version sent
530
if rootelem.hasAttribute("version"):
531
version = rootelem["version"].split(".")
533
version = (int(version[0]), int(version[1]))
534
except IndexError, ValueError:
539
self.version = min(self.version, version)
541
# Setup observer for stream errors
542
self.addOnetimeObserver("/error[@xmlns='%s']" % NS_STREAMS,
545
# Setup observer for stream features, if applicable
546
if self.initiating and self.version >= (1, 0):
547
self.addOnetimeObserver('/features[@xmlns="%s"]' % NS_STREAMS,
550
self.authenticator.streamStarted()
554
class XmlStreamFactory(xmlstream.XmlStreamFactory):
555
def __init__(self, authenticator):
556
xmlstream.XmlStreamFactory.__init__(self)
557
self.authenticator = authenticator
560
def buildProtocol(self, _):
562
# Create the stream and register all the bootstrap observers
563
xs = XmlStream(self.authenticator)
565
for event, fn in self.bootstraps: xs.addObserver(event, fn)
570
class TimeoutError(Exception):
572
Exception raised when no IQ response has been received before the
578
def upgradeWithIQResponseTracker(xs):
580
Enhances an XmlStream for iq response tracking.
582
This makes an L{XmlStream} object provide L{IIQResponseTracker}. When a
583
response is an error iq stanza, the deferred has its errback invoked with a
584
failure that holds a L{StanzaException<error.StanzaException>} that is
589
Handle iq response by firing associated deferred.
591
if getattr(iq, 'handled', False):
595
d = xs.iqDeferreds[iq["id"]]
599
del xs.iqDeferreds[iq["id"]]
601
if iq['type'] == 'error':
602
d.errback(error.exceptionFromStanza(iq))
609
Make sure deferreds do not linger on after disconnect.
611
This errbacks all deferreds of iq's for which no response has been
612
received with a L{ConnectionLost} failure. Otherwise, the deferreds
615
iqDeferreds = xs.iqDeferreds
617
for d in iqDeferreds.itervalues():
618
d.errback(ConnectionLost())
621
xs.iqDefaultTimeout = getattr(xs, 'iqDefaultTimeout', None)
622
xs.addObserver(xmlstream.STREAM_END_EVENT, disconnected)
623
xs.addObserver('/iq[@type="result"]', callback)
624
xs.addObserver('/iq[@type="error"]', callback)
625
directlyProvides(xs, ijabber.IIQResponseTracker)
629
class IQ(domish.Element):
631
Wrapper for an iq stanza.
633
Iq stanzas are used for communications with a request-response behaviour.
634
Each iq request is associated with an XML stream and has its own unique id
635
to be able to track the response.
637
@ivar timeout: if set, a timeout period after which the deferred returned
638
by C{send} will have its errback called with a
639
L{TimeoutError} failure.
640
@type timeout: C{float}
645
def __init__(self, xmlstream, type = "set"):
647
@type xmlstream: L{xmlstream.XmlStream}
648
@param xmlstream: XmlStream to use for transmission of this IQ
651
@param type: IQ type identifier ('get' or 'set')
653
domish.Element.__init__(self, (None, "iq"))
656
self._xmlstream = xmlstream
658
def send(self, to=None):
662
Returns a deferred that is fired when an iq response with the same id
663
is received. Result responses will be passed to the deferred callback.
664
Error responses will be transformed into a
665
L{StanzaError<error.StanzaError>} and result in the errback of the
666
deferred being invoked.
668
@rtype: L{defer.Deferred}
673
if not ijabber.IIQResponseTracker.providedBy(self._xmlstream):
674
upgradeWithIQResponseTracker(self._xmlstream)
677
self._xmlstream.iqDeferreds[self['id']] = d
679
timeout = self.timeout or self._xmlstream.iqDefaultTimeout
680
if timeout is not None:
682
del self._xmlstream.iqDeferreds[self['id']]
683
d.errback(TimeoutError("IQ timed out"))
685
call = self._xmlstream._callLater(timeout, onTimeout)
687
def cancelTimeout(result):
693
d.addBoth(cancelTimeout)
695
self._xmlstream.send(self)