~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/words/protocols/jabber/xmlstream.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.words.test.test_jabberxmlstream -*-
 
2
#
 
3
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
4
# See LICENSE for details.
 
5
 
 
6
"""
 
7
XMPP XML Streams
 
8
 
 
9
Building blocks for setting up XML Streams, including helping classes for
 
10
doing authentication on either client or server side, and working with XML
 
11
Stanzas.
 
12
"""
 
13
 
 
14
from zope.interface import directlyProvides, implements
 
15
 
 
16
from twisted.internet import defer, protocol
 
17
from twisted.internet.error import ConnectionLost
 
18
from twisted.python import failure, log, randbytes
 
19
from twisted.python.hashlib import sha1
 
20
from twisted.words.protocols.jabber import error, ijabber, jid
 
21
from twisted.words.xish import domish, xmlstream
 
22
from twisted.words.xish.xmlstream import STREAM_CONNECTED_EVENT
 
23
from twisted.words.xish.xmlstream import STREAM_START_EVENT
 
24
from twisted.words.xish.xmlstream import STREAM_END_EVENT
 
25
from twisted.words.xish.xmlstream import STREAM_ERROR_EVENT
 
26
 
 
27
try:
 
28
    from twisted.internet import ssl
 
29
except ImportError:
 
30
    ssl = None
 
31
if ssl and not ssl.supported:
 
32
    ssl = None
 
33
 
 
34
STREAM_AUTHD_EVENT = intern("//event/stream/authd")
 
35
INIT_FAILED_EVENT = intern("//event/xmpp/initfailed")
 
36
 
 
37
NS_STREAMS = 'http://etherx.jabber.org/streams'
 
38
NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls'
 
39
 
 
40
Reset = object()
 
41
 
 
42
def hashPassword(sid, password):
 
43
    """
 
44
    Create a SHA1-digest string of a session identifier and password.
 
45
 
 
46
    @param sid: The stream session identifier.
 
47
    @type sid: C{unicode}.
 
48
    @param password: The password to be hashed.
 
49
    @type password: C{unicode}.
 
50
    """
 
51
    if not isinstance(sid, unicode):
 
52
        raise TypeError("The session identifier must be a unicode object")
 
53
    if not isinstance(password, unicode):
 
54
        raise TypeError("The password must be a unicode object")
 
55
    input = u"%s%s" % (sid, password)
 
56
    return sha1(input.encode('utf-8')).hexdigest()
 
57
 
 
58
 
 
59
 
 
60
class Authenticator:
 
61
    """
 
62
    Base class for business logic of initializing an XmlStream
 
63
 
 
64
    Subclass this object to enable an XmlStream to initialize and authenticate
 
65
    to different types of stream hosts (such as clients, components, etc.).
 
66
 
 
67
    Rules:
 
68
      1. The Authenticator MUST dispatch a L{STREAM_AUTHD_EVENT} when the
 
69
         stream has been completely initialized.
 
70
      2. The Authenticator SHOULD reset all state information when
 
71
         L{associateWithStream} is called.
 
72
      3. The Authenticator SHOULD override L{streamStarted}, and start
 
73
         initialization there.
 
74
 
 
75
    @type xmlstream: L{XmlStream}
 
76
    @ivar xmlstream: The XmlStream that needs authentication
 
77
 
 
78
    @note: the term authenticator is historical. Authenticators perform
 
79
           all steps required to prepare the stream for the exchange
 
80
           of XML stanzas.
 
81
    """
 
82
 
 
83
    def __init__(self):
 
84
        self.xmlstream = None
 
85
 
 
86
 
 
87
    def connectionMade(self):
 
88
        """
 
89
        Called by the XmlStream when the underlying socket connection is
 
90
        in place.
 
91
 
 
92
        This allows the Authenticator to send an initial root element, if it's
 
93
        connecting, or wait for an inbound root from the peer if it's accepting
 
94
        the connection.
 
95
 
 
96
        Subclasses can use self.xmlstream.send() to send any initial data to
 
97
        the peer.
 
98
        """
 
99
 
 
100
 
 
101
    def streamStarted(self, rootElement):
 
102
        """
 
103
        Called by the XmlStream when the stream has started.
 
104
 
 
105
        A stream is considered to have started when the start tag of the root
 
106
        element has been received.
 
107
 
 
108
        This examines L{rootElement} to see if there is a version attribute.
 
109
        If absent, C{0.0} is assumed per RFC 3920. Subsequently, the
 
110
        minimum of the version from the received stream header and the
 
111
        value stored in L{xmlstream} is taken and put back in {xmlstream}.
 
112
 
 
113
        Extensions of this method can extract more information from the
 
114
        stream header and perform checks on them, optionally sending
 
115
        stream errors and closing the stream.
 
116
        """
 
117
        if rootElement.hasAttribute("version"):
 
118
            version = rootElement["version"].split(".")
 
119
            try:
 
120
                version = (int(version[0]), int(version[1]))
 
121
            except (IndexError, ValueError):
 
122
                version = (0, 0)
 
123
        else:
 
124
            version = (0, 0)
 
125
 
 
126
        self.xmlstream.version = min(self.xmlstream.version, version)
 
127
 
 
128
 
 
129
    def associateWithStream(self, xmlstream):
 
130
        """
 
131
        Called by the XmlStreamFactory when a connection has been made
 
132
        to the requested peer, and an XmlStream object has been
 
133
        instantiated.
 
134
 
 
135
        The default implementation just saves a handle to the new
 
136
        XmlStream.
 
137
 
 
138
        @type xmlstream: L{XmlStream}
 
139
        @param xmlstream: The XmlStream that will be passing events to this
 
140
                          Authenticator.
 
141
 
 
142
        """
 
143
        self.xmlstream = xmlstream
 
144
 
 
145
 
 
146
 
 
147
class ConnectAuthenticator(Authenticator):
 
148
    """
 
149
    Authenticator for initiating entities.
 
150
    """
 
151
 
 
152
    namespace = None
 
153
 
 
154
    def __init__(self, otherHost):
 
155
        self.otherHost = otherHost
 
156
 
 
157
 
 
158
    def connectionMade(self):
 
159
        self.xmlstream.namespace = self.namespace
 
160
        self.xmlstream.otherEntity = jid.internJID(self.otherHost)
 
161
        self.xmlstream.sendHeader()
 
162
 
 
163
 
 
164
    def initializeStream(self):
 
165
        """
 
166
        Perform stream initialization procedures.
 
167
 
 
168
        An L{XmlStream} holds a list of initializer objects in its
 
169
        C{initializers} attribute. This method calls these initializers in
 
170
        order and dispatches the C{STREAM_AUTHD_EVENT} event when the list has
 
171
        been successfully processed. Otherwise it dispatches the
 
172
        C{INIT_FAILED_EVENT} event with the failure.
 
173
 
 
174
        Initializers may return the special L{Reset} object to halt the
 
175
        initialization processing. It signals that the current initializer was
 
176
        successfully processed, but that the XML Stream has been reset. An
 
177
        example is the TLSInitiatingInitializer.
 
178
        """
 
179
 
 
180
        def remove_first(result):
 
181
            self.xmlstream.initializers.pop(0)
 
182
 
 
183
            return result
 
184
 
 
185
        def do_next(result):
 
186
            """
 
187
            Take the first initializer and process it.
 
188
 
 
189
            On success, the initializer is removed from the list and
 
190
            then next initializer will be tried.
 
191
            """
 
192
 
 
193
            if result is Reset:
 
194
                return None
 
195
 
 
196
            try:
 
197
                init = self.xmlstream.initializers[0]
 
198
            except IndexError:
 
199
                self.xmlstream.dispatch(self.xmlstream, STREAM_AUTHD_EVENT)
 
200
                return None
 
201
            else:
 
202
                d = defer.maybeDeferred(init.initialize)
 
203
                d.addCallback(remove_first)
 
204
                d.addCallback(do_next)
 
205
                return d
 
206
 
 
207
        d = defer.succeed(None)
 
208
        d.addCallback(do_next)
 
209
        d.addErrback(self.xmlstream.dispatch, INIT_FAILED_EVENT)
 
210
 
 
211
 
 
212
    def streamStarted(self, rootElement):
 
213
        """
 
214
        Called by the XmlStream when the stream has started.
 
215
 
 
216
        This extends L{Authenticator.streamStarted} to extract further stream
 
217
        headers from L{rootElement}, optionally wait for stream features being
 
218
        received and then call C{initializeStream}.
 
219
        """
 
220
 
 
221
        Authenticator.streamStarted(self, rootElement)
 
222
 
 
223
        self.xmlstream.sid = rootElement.getAttribute("id")
 
224
 
 
225
        if rootElement.hasAttribute("from"):
 
226
            self.xmlstream.otherEntity = jid.internJID(rootElement["from"])
 
227
 
 
228
        # Setup observer for stream features, if applicable
 
229
        if self.xmlstream.version >= (1, 0):
 
230
            def onFeatures(element):
 
231
                features = {}
 
232
                for feature in element.elements():
 
233
                    features[(feature.uri, feature.name)] = feature
 
234
 
 
235
                self.xmlstream.features = features
 
236
                self.initializeStream()
 
237
 
 
238
            self.xmlstream.addOnetimeObserver('/features[@xmlns="%s"]' %
 
239
                                                  NS_STREAMS,
 
240
                                              onFeatures)
 
241
        else:
 
242
            self.initializeStream()
 
243
 
 
244
 
 
245
 
 
246
class ListenAuthenticator(Authenticator):
 
247
    """
 
248
    Authenticator for receiving entities.
 
249
    """
 
250
 
 
251
    namespace = None
 
252
 
 
253
    def associateWithStream(self, xmlstream):
 
254
        """
 
255
        Called by the XmlStreamFactory when a connection has been made.
 
256
 
 
257
        Extend L{Authenticator.associateWithStream} to set the L{XmlStream}
 
258
        to be non-initiating.
 
259
        """
 
260
        Authenticator.associateWithStream(self, xmlstream)
 
261
        self.xmlstream.initiating = False
 
262
 
 
263
 
 
264
    def streamStarted(self, rootElement):
 
265
        """
 
266
        Called by the XmlStream when the stream has started.
 
267
 
 
268
        This extends L{Authenticator.streamStarted} to extract further
 
269
        information from the stream headers from L{rootElement}.
 
270
        """
 
271
        Authenticator.streamStarted(self, rootElement)
 
272
 
 
273
        self.xmlstream.namespace = rootElement.defaultUri
 
274
 
 
275
        if rootElement.hasAttribute("to"):
 
276
            self.xmlstream.thisEntity = jid.internJID(rootElement["to"])
 
277
 
 
278
        self.xmlstream.prefixes = {}
 
279
        for prefix, uri in rootElement.localPrefixes.iteritems():
 
280
            self.xmlstream.prefixes[uri] = prefix
 
281
 
 
282
        self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')
 
283
 
 
284
 
 
285
 
 
286
class FeatureNotAdvertized(Exception):
 
287
    """
 
288
    Exception indicating a stream feature was not advertized, while required by
 
289
    the initiating entity.
 
290
    """
 
291
 
 
292
 
 
293
 
 
294
class BaseFeatureInitiatingInitializer(object):
 
295
    """
 
296
    Base class for initializers with a stream feature.
 
297
 
 
298
    This assumes the associated XmlStream represents the initiating entity
 
299
    of the connection.
 
300
 
 
301
    @cvar feature: tuple of (uri, name) of the stream feature root element.
 
302
    @type feature: tuple of (L{str}, L{str})
 
303
    @ivar required: whether the stream feature is required to be advertized
 
304
                    by the receiving entity.
 
305
    @type required: L{bool}
 
306
    """
 
307
 
 
308
    implements(ijabber.IInitiatingInitializer)
 
309
 
 
310
    feature = None
 
311
    required = False
 
312
 
 
313
    def __init__(self, xs):
 
314
        self.xmlstream = xs
 
315
 
 
316
 
 
317
    def initialize(self):
 
318
        """
 
319
        Initiate the initialization.
 
320
 
 
321
        Checks if the receiving entity advertizes the stream feature. If it
 
322
        does, the initialization is started. If it is not advertized, and the
 
323
        C{required} instance variable is L{True}, it raises
 
324
        L{FeatureNotAdvertized}. Otherwise, the initialization silently
 
325
        succeeds.
 
326
        """
 
327
 
 
328
        if self.feature in self.xmlstream.features:
 
329
            return self.start()
 
330
        elif self.required:
 
331
            raise FeatureNotAdvertized
 
332
        else:
 
333
            return None
 
334
 
 
335
 
 
336
    def start(self):
 
337
        """
 
338
        Start the actual initialization.
 
339
 
 
340
        May return a deferred for asynchronous initialization.
 
341
        """
 
342
 
 
343
 
 
344
 
 
345
class TLSError(Exception):
 
346
    """
 
347
    TLS base exception.
 
348
    """
 
349
 
 
350
 
 
351
 
 
352
class TLSFailed(TLSError):
 
353
    """
 
354
    Exception indicating failed TLS negotiation
 
355
    """
 
356
 
 
357
 
 
358
 
 
359
class TLSRequired(TLSError):
 
360
    """
 
361
    Exception indicating required TLS negotiation.
 
362
 
 
363
    This exception is raised when the receiving entity requires TLS
 
364
    negotiation and the initiating does not desire to negotiate TLS.
 
365
    """
 
366
 
 
367
 
 
368
 
 
369
class TLSNotSupported(TLSError):
 
370
    """
 
371
    Exception indicating missing TLS support.
 
372
 
 
373
    This exception is raised when the initiating entity wants and requires to
 
374
    negotiate TLS when the OpenSSL library is not available.
 
375
    """
 
376
 
 
377
 
 
378
 
 
379
class TLSInitiatingInitializer(BaseFeatureInitiatingInitializer):
 
380
    """
 
381
    TLS stream initializer for the initiating entity.
 
382
 
 
383
    It is strongly required to include this initializer in the list of
 
384
    initializers for an XMPP stream. By default it will try to negotiate TLS.
 
385
    An XMPP server may indicate that TLS is required. If TLS is not desired,
 
386
    set the C{wanted} attribute to False instead of removing it from the list
 
387
    of initializers, so a proper exception L{TLSRequired} can be raised.
 
388
 
 
389
    @cvar wanted: indicates if TLS negotiation is wanted.
 
390
    @type wanted: L{bool}
 
391
    """
 
392
 
 
393
    feature = (NS_XMPP_TLS, 'starttls')
 
394
    wanted = True
 
395
    _deferred = None
 
396
 
 
397
    def onProceed(self, obj):
 
398
        """
 
399
        Proceed with TLS negotiation and reset the XML stream.
 
400
        """
 
401
 
 
402
        self.xmlstream.removeObserver('/failure', self.onFailure)
 
403
        ctx = ssl.CertificateOptions()
 
404
        self.xmlstream.transport.startTLS(ctx)
 
405
        self.xmlstream.reset()
 
406
        self.xmlstream.sendHeader()
 
407
        self._deferred.callback(Reset)
 
408
 
 
409
 
 
410
    def onFailure(self, obj):
 
411
        self.xmlstream.removeObserver('/proceed', self.onProceed)
 
412
        self._deferred.errback(TLSFailed())
 
413
 
 
414
 
 
415
    def start(self):
 
416
        """
 
417
        Start TLS negotiation.
 
418
 
 
419
        This checks if the receiving entity requires TLS, the SSL library is
 
420
        available and uses the C{required} and C{wanted} instance variables to
 
421
        determine what to do in the various different cases.
 
422
 
 
423
        For example, if the SSL library is not available, and wanted and
 
424
        required by the user, it raises an exception. However if it is not
 
425
        required by both parties, initialization silently succeeds, moving
 
426
        on to the next step.
 
427
        """
 
428
        if self.wanted:
 
429
            if ssl is None:
 
430
                if self.required:
 
431
                    return defer.fail(TLSNotSupported())
 
432
                else:
 
433
                    return defer.succeed(None)
 
434
            else:
 
435
                pass
 
436
        elif self.xmlstream.features[self.feature].required:
 
437
            return defer.fail(TLSRequired())
 
438
        else:
 
439
            return defer.succeed(None)
 
440
 
 
441
        self._deferred = defer.Deferred()
 
442
        self.xmlstream.addOnetimeObserver("/proceed", self.onProceed)
 
443
        self.xmlstream.addOnetimeObserver("/failure", self.onFailure)
 
444
        self.xmlstream.send(domish.Element((NS_XMPP_TLS, "starttls")))
 
445
        return self._deferred
 
446
 
 
447
 
 
448
 
 
449
class XmlStream(xmlstream.XmlStream):
 
450
    """
 
451
    XMPP XML Stream protocol handler.
 
452
 
 
453
    @ivar version: XML stream version as a tuple (major, minor). Initially,
 
454
                   this is set to the minimally supported version. Upon
 
455
                   receiving the stream header of the peer, it is set to the
 
456
                   minimum of that value and the version on the received
 
457
                   header.
 
458
    @type version: (L{int}, L{int})
 
459
    @ivar namespace: default namespace URI for stream
 
460
    @type namespace: L{str}
 
461
    @ivar thisEntity: JID of this entity
 
462
    @type thisEntity: L{JID}
 
463
    @ivar otherEntity: JID of the peer entity
 
464
    @type otherEntity: L{JID}
 
465
    @ivar sid: session identifier
 
466
    @type sid: L{str}
 
467
    @ivar initiating: True if this is the initiating stream
 
468
    @type initiating: L{bool}
 
469
    @ivar features: map of (uri, name) to stream features element received from
 
470
                    the receiving entity.
 
471
    @type features: L{dict} of (L{str}, L{str}) to L{domish.Element}.
 
472
    @ivar prefixes: map of URI to prefixes that are to appear on stream
 
473
                    header.
 
474
    @type prefixes: L{dict} of L{str} to L{str}
 
475
    @ivar initializers: list of stream initializer objects
 
476
    @type initializers: L{list} of objects that provide L{IInitializer}
 
477
    @ivar authenticator: associated authenticator that uses C{initializers} to
 
478
                         initialize the XML stream.
 
479
    """
 
480
 
 
481
    version = (1, 0)
 
482
    namespace = 'invalid'
 
483
    thisEntity = None
 
484
    otherEntity = None
 
485
    sid = None
 
486
    initiating = True
 
487
 
 
488
    _headerSent = False     # True if the stream header has been sent
 
489
 
 
490
    def __init__(self, authenticator):
 
491
        xmlstream.XmlStream.__init__(self)
 
492
 
 
493
        self.prefixes = {NS_STREAMS: 'stream'}
 
494
        self.authenticator = authenticator
 
495
        self.initializers = []
 
496
        self.features = {}
 
497
 
 
498
        # Reset the authenticator
 
499
        authenticator.associateWithStream(self)
 
500
 
 
501
 
 
502
    def _callLater(self, *args, **kwargs):
 
503
        from twisted.internet import reactor
 
504
        return reactor.callLater(*args, **kwargs)
 
505
 
 
506
 
 
507
    def reset(self):
 
508
        """
 
509
        Reset XML Stream.
 
510
 
 
511
        Resets the XML Parser for incoming data. This is to be used after
 
512
        successfully negotiating a new layer, e.g. TLS and SASL. Note that
 
513
        registered event observers will continue to be in place.
 
514
        """
 
515
        self._headerSent = False
 
516
        self._initializeStream()
 
517
 
 
518
 
 
519
    def onStreamError(self, errelem):
 
520
        """
 
521
        Called when a stream:error element has been received.
 
522
 
 
523
        Dispatches a L{STREAM_ERROR_EVENT} event with the error element to
 
524
        allow for cleanup actions and drops the connection.
 
525
 
 
526
        @param errelem: The received error element.
 
527
        @type errelem: L{domish.Element}
 
528
        """
 
529
        self.dispatch(failure.Failure(error.exceptionFromStreamError(errelem)),
 
530
                      STREAM_ERROR_EVENT)
 
531
        self.transport.loseConnection()
 
532
 
 
533
 
 
534
    def sendHeader(self):
 
535
        """
 
536
        Send stream header.
 
537
        """
 
538
        # set up optional extra namespaces
 
539
        localPrefixes = {}
 
540
        for uri, prefix in self.prefixes.iteritems():
 
541
            if uri != NS_STREAMS:
 
542
                localPrefixes[prefix] = uri
 
543
 
 
544
        rootElement = domish.Element((NS_STREAMS, 'stream'), self.namespace,
 
545
                                     localPrefixes=localPrefixes)
 
546
 
 
547
        if self.otherEntity:
 
548
            rootElement['to'] = self.otherEntity.userhost()
 
549
 
 
550
        if self.thisEntity:
 
551
            rootElement['from'] = self.thisEntity.userhost()
 
552
 
 
553
        if not self.initiating and self.sid:
 
554
            rootElement['id'] = self.sid
 
555
 
 
556
        if self.version >= (1, 0):
 
557
            rootElement['version'] = "%d.%d" % self.version
 
558
 
 
559
        self.send(rootElement.toXml(prefixes=self.prefixes, closeElement=0))
 
560
        self._headerSent = True
 
561
 
 
562
 
 
563
    def sendFooter(self):
 
564
        """
 
565
        Send stream footer.
 
566
        """
 
567
        self.send('</stream:stream>')
 
568
 
 
569
 
 
570
    def sendStreamError(self, streamError):
 
571
        """
 
572
        Send stream level error.
 
573
 
 
574
        If we are the receiving entity, and haven't sent the header yet,
 
575
        we sent one first.
 
576
 
 
577
        After sending the stream error, the stream is closed and the transport
 
578
        connection dropped.
 
579
 
 
580
        @param streamError: stream error instance
 
581
        @type streamError: L{error.StreamError}
 
582
        """
 
583
        if not self._headerSent and not self.initiating:
 
584
            self.sendHeader()
 
585
 
 
586
        if self._headerSent:
 
587
            self.send(streamError.getElement())
 
588
            self.sendFooter()
 
589
 
 
590
        self.transport.loseConnection()
 
591
 
 
592
 
 
593
    def send(self, obj):
 
594
        """
 
595
        Send data over the stream.
 
596
 
 
597
        This overrides L{xmlstream.Xmlstream.send} to use the default namespace
 
598
        of the stream header when serializing L{domish.IElement}s. It is
 
599
        assumed that if you pass an object that provides L{domish.IElement},
 
600
        it represents a direct child of the stream's root element.
 
601
        """
 
602
        if domish.IElement.providedBy(obj):
 
603
            obj = obj.toXml(prefixes=self.prefixes,
 
604
                            defaultUri=self.namespace,
 
605
                            prefixesInScope=self.prefixes.values())
 
606
 
 
607
        xmlstream.XmlStream.send(self, obj)
 
608
 
 
609
 
 
610
    def connectionMade(self):
 
611
        """
 
612
        Called when a connection is made.
 
613
 
 
614
        Notifies the authenticator when a connection has been made.
 
615
        """
 
616
        xmlstream.XmlStream.connectionMade(self)
 
617
        self.authenticator.connectionMade()
 
618
 
 
619
 
 
620
    def onDocumentStart(self, rootElement):
 
621
        """
 
622
        Called when the stream header has been received.
 
623
 
 
624
        Extracts the header's C{id} and C{version} attributes from the root
 
625
        element. The C{id} attribute is stored in our C{sid} attribute and the
 
626
        C{version} attribute is parsed and the minimum of the version we sent
 
627
        and the parsed C{version} attribute is stored as a tuple (major, minor)
 
628
        in this class' C{version} attribute. If no C{version} attribute was
 
629
        present, we assume version 0.0.
 
630
 
 
631
        If appropriate (we are the initiating stream and the minimum of our and
 
632
        the other party's version is at least 1.0), a one-time observer is
 
633
        registered for getting the stream features. The registered function is
 
634
        C{onFeatures}.
 
635
 
 
636
        Ultimately, the authenticator's C{streamStarted} method will be called.
 
637
 
 
638
        @param rootElement: The root element.
 
639
        @type rootElement: L{domish.Element}
 
640
        """
 
641
        xmlstream.XmlStream.onDocumentStart(self, rootElement)
 
642
 
 
643
        # Setup observer for stream errors
 
644
        self.addOnetimeObserver("/error[@xmlns='%s']" % NS_STREAMS,
 
645
                                self.onStreamError)
 
646
 
 
647
        self.authenticator.streamStarted(rootElement)
 
648
 
 
649
 
 
650
 
 
651
class XmlStreamFactory(xmlstream.XmlStreamFactory):
 
652
    """
 
653
    Factory for Jabber XmlStream objects as a reconnecting client.
 
654
 
 
655
    Note that this differs from L{xmlstream.XmlStreamFactory} in that
 
656
    it generates Jabber specific L{XmlStream} instances that have
 
657
    authenticators.
 
658
    """
 
659
 
 
660
    protocol = XmlStream
 
661
 
 
662
    def __init__(self, authenticator):
 
663
        xmlstream.XmlStreamFactory.__init__(self, authenticator)
 
664
        self.authenticator = authenticator
 
665
 
 
666
 
 
667
 
 
668
class XmlStreamServerFactory(xmlstream.BootstrapMixin,
 
669
                             protocol.ServerFactory):
 
670
    """
 
671
    Factory for Jabber XmlStream objects as a server.
 
672
 
 
673
    @since: 8.2.
 
674
    @ivar authenticatorFactory: Factory callable that takes no arguments, to
 
675
                                create a fresh authenticator to be associated
 
676
                                with the XmlStream.
 
677
    """
 
678
 
 
679
    protocol = XmlStream
 
680
 
 
681
    def __init__(self, authenticatorFactory):
 
682
        xmlstream.BootstrapMixin.__init__(self)
 
683
        self.authenticatorFactory = authenticatorFactory
 
684
 
 
685
 
 
686
    def buildProtocol(self, addr):
 
687
        """
 
688
        Create an instance of XmlStream.
 
689
 
 
690
        A new authenticator instance will be created and passed to the new
 
691
        XmlStream. Registered bootstrap event observers are installed as well.
 
692
        """
 
693
        authenticator = self.authenticatorFactory()
 
694
        xs = self.protocol(authenticator)
 
695
        xs.factory = self
 
696
        self.installBootstraps(xs)
 
697
        return xs
 
698
 
 
699
 
 
700
 
 
701
class TimeoutError(Exception):
 
702
    """
 
703
    Exception raised when no IQ response has been received before the
 
704
    configured timeout.
 
705
    """
 
706
 
 
707
 
 
708
 
 
709
def upgradeWithIQResponseTracker(xs):
 
710
    """
 
711
    Enhances an XmlStream for iq response tracking.
 
712
 
 
713
    This makes an L{XmlStream} object provide L{IIQResponseTracker}. When a
 
714
    response is an error iq stanza, the deferred has its errback invoked with a
 
715
    failure that holds a L{StanzaException<error.StanzaException>} that is
 
716
    easier to examine.
 
717
    """
 
718
    def callback(iq):
 
719
        """
 
720
        Handle iq response by firing associated deferred.
 
721
        """
 
722
        if getattr(iq, 'handled', False):
 
723
            return
 
724
 
 
725
        try:
 
726
            d = xs.iqDeferreds[iq["id"]]
 
727
        except KeyError:
 
728
            pass
 
729
        else:
 
730
            del xs.iqDeferreds[iq["id"]]
 
731
            iq.handled = True
 
732
            if iq['type'] == 'error':
 
733
                d.errback(error.exceptionFromStanza(iq))
 
734
            else:
 
735
                d.callback(iq)
 
736
 
 
737
 
 
738
    def disconnected(_):
 
739
        """
 
740
        Make sure deferreds do not linger on after disconnect.
 
741
 
 
742
        This errbacks all deferreds of iq's for which no response has been
 
743
        received with a L{ConnectionLost} failure. Otherwise, the deferreds
 
744
        will never be fired.
 
745
        """
 
746
        iqDeferreds = xs.iqDeferreds
 
747
        xs.iqDeferreds = {}
 
748
        for d in iqDeferreds.itervalues():
 
749
            d.errback(ConnectionLost())
 
750
 
 
751
    xs.iqDeferreds = {}
 
752
    xs.iqDefaultTimeout = getattr(xs, 'iqDefaultTimeout', None)
 
753
    xs.addObserver(xmlstream.STREAM_END_EVENT, disconnected)
 
754
    xs.addObserver('/iq[@type="result"]', callback)
 
755
    xs.addObserver('/iq[@type="error"]', callback)
 
756
    directlyProvides(xs, ijabber.IIQResponseTracker)
 
757
 
 
758
 
 
759
 
 
760
class IQ(domish.Element):
 
761
    """
 
762
    Wrapper for an iq stanza.
 
763
 
 
764
    Iq stanzas are used for communications with a request-response behaviour.
 
765
    Each iq request is associated with an XML stream and has its own unique id
 
766
    to be able to track the response.
 
767
 
 
768
    @ivar timeout: if set, a timeout period after which the deferred returned
 
769
                   by C{send} will have its errback called with a
 
770
                   L{TimeoutError} failure.
 
771
    @type timeout: C{float}
 
772
    """
 
773
 
 
774
    timeout = None
 
775
 
 
776
    def __init__(self, xmlstream, stanzaType="set"):
 
777
        """
 
778
        @type xmlstream: L{xmlstream.XmlStream}
 
779
        @param xmlstream: XmlStream to use for transmission of this IQ
 
780
 
 
781
        @type stanzaType: L{str}
 
782
        @param stanzaType: IQ type identifier ('get' or 'set')
 
783
        """
 
784
        domish.Element.__init__(self, (None, "iq"))
 
785
        self.addUniqueId()
 
786
        self["type"] = stanzaType
 
787
        self._xmlstream = xmlstream
 
788
 
 
789
 
 
790
    def send(self, to=None):
 
791
        """
 
792
        Send out this iq.
 
793
 
 
794
        Returns a deferred that is fired when an iq response with the same id
 
795
        is received. Result responses will be passed to the deferred callback.
 
796
        Error responses will be transformed into a
 
797
        L{StanzaError<error.StanzaError>} and result in the errback of the
 
798
        deferred being invoked.
 
799
 
 
800
        @rtype: L{defer.Deferred}
 
801
        """
 
802
        if to is not None:
 
803
            self["to"] = to
 
804
 
 
805
        if not ijabber.IIQResponseTracker.providedBy(self._xmlstream):
 
806
            upgradeWithIQResponseTracker(self._xmlstream)
 
807
 
 
808
        d = defer.Deferred()
 
809
        self._xmlstream.iqDeferreds[self['id']] = d
 
810
 
 
811
        timeout = self.timeout or self._xmlstream.iqDefaultTimeout
 
812
        if timeout is not None:
 
813
            def onTimeout():
 
814
                del self._xmlstream.iqDeferreds[self['id']]
 
815
                d.errback(TimeoutError("IQ timed out"))
 
816
 
 
817
            call = self._xmlstream._callLater(timeout, onTimeout)
 
818
 
 
819
            def cancelTimeout(result):
 
820
                if call.active():
 
821
                    call.cancel()
 
822
 
 
823
                return result
 
824
 
 
825
            d.addBoth(cancelTimeout)
 
826
 
 
827
        self._xmlstream.send(self)
 
828
        return d
 
829
 
 
830
 
 
831
 
 
832
def toResponse(stanza, stanzaType=None):
 
833
    """
 
834
    Create a response stanza from another stanza.
 
835
 
 
836
    This takes the addressing and id attributes from a stanza to create a (new,
 
837
    empty) response stanza. The addressing attributes are swapped and the id
 
838
    copied. Optionally, the stanza type of the response can be specified.
 
839
 
 
840
    @param stanza: the original stanza
 
841
    @type stanza: L{domish.Element}
 
842
    @param stanzaType: optional response stanza type
 
843
    @type stanzaType: C{str}
 
844
    @return: the response stanza.
 
845
    @rtype: L{domish.Element}
 
846
    """
 
847
 
 
848
    toAddr = stanza.getAttribute('from')
 
849
    fromAddr = stanza.getAttribute('to')
 
850
    stanzaID = stanza.getAttribute('id')
 
851
 
 
852
    response = domish.Element((None, stanza.name))
 
853
    if toAddr:
 
854
        response['to'] = toAddr
 
855
    if fromAddr:
 
856
        response['from'] = fromAddr
 
857
    if stanzaID:
 
858
        response['id'] = stanzaID
 
859
    if stanzaType:
 
860
        response['type'] = stanzaType
 
861
 
 
862
    return response
 
863
 
 
864
 
 
865
 
 
866
class XMPPHandler(object):
 
867
    """
 
868
    XMPP protocol handler.
 
869
 
 
870
    Classes derived from this class implement (part of) one or more XMPP
 
871
    extension protocols, and are referred to as a subprotocol implementation.
 
872
    """
 
873
 
 
874
    implements(ijabber.IXMPPHandler)
 
875
 
 
876
    def __init__(self):
 
877
        self.parent = None
 
878
        self.xmlstream = None
 
879
 
 
880
 
 
881
    def setHandlerParent(self, parent):
 
882
        self.parent = parent
 
883
        self.parent.addHandler(self)
 
884
 
 
885
 
 
886
    def disownHandlerParent(self, parent):
 
887
        self.parent.removeHandler(self)
 
888
        self.parent = None
 
889
 
 
890
 
 
891
    def makeConnection(self, xs):
 
892
        self.xmlstream = xs
 
893
        self.connectionMade()
 
894
 
 
895
 
 
896
    def connectionMade(self):
 
897
        """
 
898
        Called after a connection has been established.
 
899
 
 
900
        Can be overridden to perform work before stream initialization.
 
901
        """
 
902
 
 
903
 
 
904
    def connectionInitialized(self):
 
905
        """
 
906
        The XML stream has been initialized.
 
907
 
 
908
        Can be overridden to perform work after stream initialization, e.g. to
 
909
        set up observers and start exchanging XML stanzas.
 
910
        """
 
911
 
 
912
 
 
913
    def connectionLost(self, reason):
 
914
        """
 
915
        The XML stream has been closed.
 
916
 
 
917
        This method can be extended to inspect the C{reason} argument and
 
918
        act on it.
 
919
        """
 
920
        self.xmlstream = None
 
921
 
 
922
 
 
923
    def send(self, obj):
 
924
        """
 
925
        Send data over the managed XML stream.
 
926
 
 
927
        @note: The stream manager maintains a queue for data sent using this
 
928
               method when there is no current initialized XML stream. This
 
929
               data is then sent as soon as a new stream has been established
 
930
               and initialized. Subsequently, L{connectionInitialized} will be
 
931
               called again. If this queueing is not desired, use C{send} on
 
932
               C{self.xmlstream}.
 
933
 
 
934
        @param obj: data to be sent over the XML stream. This is usually an
 
935
                    object providing L{domish.IElement}, or serialized XML. See
 
936
                    L{xmlstream.XmlStream} for details.
 
937
        """
 
938
        self.parent.send(obj)
 
939
 
 
940
 
 
941
 
 
942
class XMPPHandlerCollection(object):
 
943
    """
 
944
    Collection of XMPP subprotocol handlers.
 
945
 
 
946
    This allows for grouping of subprotocol handlers, but is not an
 
947
    L{XMPPHandler} itself, so this is not recursive.
 
948
 
 
949
    @ivar handlers: List of protocol handlers.
 
950
    @type handlers: L{list} of objects providing
 
951
                      L{IXMPPHandler}
 
952
    """
 
953
 
 
954
    implements(ijabber.IXMPPHandlerCollection)
 
955
 
 
956
    def __init__(self):
 
957
        self.handlers = []
 
958
 
 
959
 
 
960
    def __iter__(self):
 
961
        """
 
962
        Act as a container for handlers.
 
963
        """
 
964
        return iter(self.handlers)
 
965
 
 
966
 
 
967
    def addHandler(self, handler):
 
968
        """
 
969
        Add protocol handler.
 
970
 
 
971
        Protocol handlers are expected to provide L{ijabber.IXMPPHandler}.
 
972
        """
 
973
        self.handlers.append(handler)
 
974
 
 
975
 
 
976
    def removeHandler(self, handler):
 
977
        """
 
978
        Remove protocol handler.
 
979
        """
 
980
        self.handlers.remove(handler)
 
981
 
 
982
 
 
983
 
 
984
class StreamManager(XMPPHandlerCollection):
 
985
    """
 
986
    Business logic representing a managed XMPP connection.
 
987
 
 
988
    This maintains a single XMPP connection and provides facilities for packet
 
989
    routing and transmission. Business logic modules are objects providing
 
990
    L{ijabber.IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
 
991
    using L{addHandler}.
 
992
 
 
993
    @ivar xmlstream: currently managed XML stream
 
994
    @type xmlstream: L{XmlStream}
 
995
    @ivar logTraffic: if true, log all traffic.
 
996
    @type logTraffic: L{bool}
 
997
    @ivar _initialized: Whether the stream represented by L{xmlstream} has
 
998
                        been initialized. This is used when caching outgoing
 
999
                        stanzas.
 
1000
    @type _initialized: C{bool}
 
1001
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
 
1002
    @type _packetQueue: L{list}
 
1003
    """
 
1004
 
 
1005
    logTraffic = False
 
1006
 
 
1007
    def __init__(self, factory):
 
1008
        XMPPHandlerCollection.__init__(self)
 
1009
        self.xmlstream = None
 
1010
        self._packetQueue = []
 
1011
        self._initialized = False
 
1012
 
 
1013
        factory.addBootstrap(STREAM_CONNECTED_EVENT, self._connected)
 
1014
        factory.addBootstrap(STREAM_AUTHD_EVENT, self._authd)
 
1015
        factory.addBootstrap(INIT_FAILED_EVENT, self.initializationFailed)
 
1016
        factory.addBootstrap(STREAM_END_EVENT, self._disconnected)
 
1017
        self.factory = factory
 
1018
 
 
1019
 
 
1020
    def addHandler(self, handler):
 
1021
        """
 
1022
        Add protocol handler.
 
1023
 
 
1024
        When an XML stream has already been established, the handler's
 
1025
        C{connectionInitialized} will be called to get it up to speed.
 
1026
        """
 
1027
        XMPPHandlerCollection.addHandler(self, handler)
 
1028
 
 
1029
        # get protocol handler up to speed when a connection has already
 
1030
        # been established
 
1031
        if self.xmlstream and self._initialized:
 
1032
            handler.makeConnection(self.xmlstream)
 
1033
            handler.connectionInitialized()
 
1034
 
 
1035
 
 
1036
    def _connected(self, xs):
 
1037
        """
 
1038
        Called when the transport connection has been established.
 
1039
 
 
1040
        Here we optionally set up traffic logging (depending on L{logTraffic})
 
1041
        and call each handler's C{makeConnection} method with the L{XmlStream}
 
1042
        instance.
 
1043
        """
 
1044
        def logDataIn(buf):
 
1045
            log.msg("RECV: %r" % buf)
 
1046
 
 
1047
        def logDataOut(buf):
 
1048
            log.msg("SEND: %r" % buf)
 
1049
 
 
1050
        if self.logTraffic:
 
1051
            xs.rawDataInFn = logDataIn
 
1052
            xs.rawDataOutFn = logDataOut
 
1053
 
 
1054
        self.xmlstream = xs
 
1055
 
 
1056
        for e in self:
 
1057
            e.makeConnection(xs)
 
1058
 
 
1059
 
 
1060
    def _authd(self, xs):
 
1061
        """
 
1062
        Called when the stream has been initialized.
 
1063
 
 
1064
        Send out cached stanzas and call each handler's
 
1065
        C{connectionInitialized} method.
 
1066
        """
 
1067
        # Flush all pending packets
 
1068
        for p in self._packetQueue:
 
1069
            xs.send(p)
 
1070
        self._packetQueue = []
 
1071
        self._initialized = True
 
1072
 
 
1073
        # Notify all child services which implement
 
1074
        # the IService interface
 
1075
        for e in self:
 
1076
            e.connectionInitialized()
 
1077
 
 
1078
 
 
1079
    def initializationFailed(self, reason):
 
1080
        """
 
1081
        Called when stream initialization has failed.
 
1082
 
 
1083
        Stream initialization has halted, with the reason indicated by
 
1084
        C{reason}. It may be retried by calling the authenticator's
 
1085
        C{initializeStream}. See the respective authenticators for details.
 
1086
 
 
1087
        @param reason: A failure instance indicating why stream initialization
 
1088
                       failed.
 
1089
        @type reason: L{failure.Failure}
 
1090
        """
 
1091
 
 
1092
 
 
1093
    def _disconnected(self, _):
 
1094
        """
 
1095
        Called when the stream has been closed.
 
1096
 
 
1097
        From this point on, the manager doesn't interact with the
 
1098
        L{XmlStream} anymore and notifies each handler that the connection
 
1099
        was lost by calling its C{connectionLost} method.
 
1100
        """
 
1101
        self.xmlstream = None
 
1102
        self._initialized = False
 
1103
 
 
1104
        # Notify all child services which implement
 
1105
        # the IService interface
 
1106
        for e in self:
 
1107
            e.connectionLost(None)
 
1108
 
 
1109
 
 
1110
    def send(self, obj):
 
1111
        """
 
1112
        Send data over the XML stream.
 
1113
 
 
1114
        When there is no established XML stream, the data is queued and sent
 
1115
        out when a new XML stream has been established and initialized.
 
1116
 
 
1117
        @param obj: data to be sent over the XML stream. See
 
1118
                    L{xmlstream.XmlStream.send} for details.
 
1119
        """
 
1120
        if self._initialized:
 
1121
            self.xmlstream.send(obj)
 
1122
        else:
 
1123
            self._packetQueue.append(obj)
 
1124
 
 
1125
 
 
1126
 
 
1127
__all__ = ['Authenticator', 'BaseFeatureInitiatingInitializer',
 
1128
           'ConnectAuthenticator', 'ConnectionLost', 'FeatureNotAdvertized',
 
1129
           'INIT_FAILED_EVENT', 'IQ', 'ListenAuthenticator', 'NS_STREAMS',
 
1130
           'NS_XMPP_TLS', 'Reset', 'STREAM_AUTHD_EVENT',
 
1131
           'STREAM_CONNECTED_EVENT', 'STREAM_END_EVENT', 'STREAM_ERROR_EVENT',
 
1132
           'STREAM_START_EVENT', 'StreamManager', 'TLSError', 'TLSFailed',
 
1133
           'TLSInitiatingInitializer', 'TLSNotSupported', 'TLSRequired',
 
1134
           'TimeoutError', 'XMPPHandler', 'XMPPHandlerCollection', 'XmlStream',
 
1135
           'XmlStreamFactory', 'XmlStreamServerFactory', 'hashPassword',
 
1136
           'toResponse', 'upgradeWithIQResponseTracker']