~jk0/nova/xs-ipv6

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/words/protocols/jabber/component.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.words.test.test_jabbercomponent -*-
 
2
#
 
3
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
4
# See LICENSE for details.
 
5
 
 
6
"""
 
7
External server-side components.
 
8
 
 
9
Most Jabber server implementations allow for add-on components that act as a
 
10
seperate entity on the Jabber network, but use the server-to-server
 
11
functionality of a regular Jabber IM server. These so-called 'external
 
12
components' are connected to the Jabber server using the Jabber Component
 
13
Protocol as defined in U{JEP-0114<http://www.jabber.org/jeps/jep-0114.html>}.
 
14
 
 
15
This module allows for writing external server-side component by assigning one
 
16
or more services implementing L{ijabber.IService} up to L{ServiceManager}. The
 
17
ServiceManager connects to the Jabber server and is responsible for the
 
18
corresponding XML stream.
 
19
"""
 
20
 
 
21
from zope.interface import implements
 
22
 
 
23
from twisted.application import service
 
24
from twisted.internet import defer
 
25
from twisted.python import log
 
26
from twisted.words.xish import domish
 
27
from twisted.words.protocols.jabber import error, ijabber, jstrports, xmlstream
 
28
from twisted.words.protocols.jabber.jid import internJID as JID
 
29
 
 
30
NS_COMPONENT_ACCEPT = 'jabber:component:accept'
 
31
 
 
32
def componentFactory(componentid, password):
 
33
    """
 
34
    XML stream factory for external server-side components.
 
35
 
 
36
    @param componentid: JID of the component.
 
37
    @type componentid: L{unicode}
 
38
    @param password: password used to authenticate to the server.
 
39
    @type password: L{str}
 
40
    """
 
41
    a = ConnectComponentAuthenticator(componentid, password)
 
42
    return xmlstream.XmlStreamFactory(a)
 
43
 
 
44
class ComponentInitiatingInitializer(object):
 
45
    """
 
46
    External server-side component authentication initializer for the
 
47
    initiating entity.
 
48
 
 
49
    @ivar xmlstream: XML stream between server and component.
 
50
    @type xmlstream: L{xmlstream.XmlStream}
 
51
    """
 
52
 
 
53
    def __init__(self, xs):
 
54
        self.xmlstream = xs
 
55
        self._deferred = None
 
56
 
 
57
    def initialize(self):
 
58
        xs = self.xmlstream
 
59
        hs = domish.Element((self.xmlstream.namespace, "handshake"))
 
60
        hs.addContent(xmlstream.hashPassword(xs.sid,
 
61
                                             unicode(xs.authenticator.password)))
 
62
 
 
63
        # Setup observer to watch for handshake result
 
64
        xs.addOnetimeObserver("/handshake", self._cbHandshake)
 
65
        xs.send(hs)
 
66
        self._deferred = defer.Deferred()
 
67
        return self._deferred
 
68
 
 
69
    def _cbHandshake(self, _):
 
70
        # we have successfully shaken hands and can now consider this
 
71
        # entity to represent the component JID.
 
72
        self.xmlstream.thisEntity = self.xmlstream.otherEntity
 
73
        self._deferred.callback(None)
 
74
 
 
75
 
 
76
 
 
77
class ConnectComponentAuthenticator(xmlstream.ConnectAuthenticator):
 
78
    """
 
79
    Authenticator to permit an XmlStream to authenticate against a Jabber
 
80
    server as an external component (where the Authenticator is initiating the
 
81
    stream).
 
82
    """
 
83
    namespace = NS_COMPONENT_ACCEPT
 
84
 
 
85
    def __init__(self, componentjid, password):
 
86
        """
 
87
        @type componentjid: L{str}
 
88
        @param componentjid: Jabber ID that this component wishes to bind to.
 
89
 
 
90
        @type password: L{str}
 
91
        @param password: Password/secret this component uses to authenticate.
 
92
        """
 
93
        # Note that we are sending 'to' our desired component JID.
 
94
        xmlstream.ConnectAuthenticator.__init__(self, componentjid)
 
95
        self.password = password
 
96
 
 
97
    def associateWithStream(self, xs):
 
98
        xs.version = (0, 0)
 
99
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
 
100
 
 
101
        xs.initializers = [ComponentInitiatingInitializer(xs)]
 
102
 
 
103
 
 
104
 
 
105
class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
 
106
    """
 
107
    Authenticator for accepting components.
 
108
 
 
109
    @since: 8.2
 
110
    @ivar secret: The shared secret used to authorized incoming component
 
111
                  connections.
 
112
    @type secret: C{unicode}.
 
113
    """
 
114
 
 
115
    namespace = NS_COMPONENT_ACCEPT
 
116
 
 
117
    def __init__(self, secret):
 
118
        self.secret = secret
 
119
        xmlstream.ListenAuthenticator.__init__(self)
 
120
 
 
121
 
 
122
    def associateWithStream(self, xs):
 
123
        """
 
124
        Associate the authenticator with a stream.
 
125
 
 
126
        This sets the stream's version to 0.0, because the XEP-0114 component
 
127
        protocol was not designed for XMPP 1.0.
 
128
        """
 
129
        xs.version = (0, 0)
 
130
        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
 
131
 
 
132
 
 
133
    def streamStarted(self, rootElement):
 
134
        """
 
135
        Called by the stream when it has started.
 
136
 
 
137
        This examines the default namespace of the incoming stream and whether
 
138
        there is a requested hostname for the component. Then it generates a
 
139
        stream identifier, sends a response header and adds an observer for
 
140
        the first incoming element, triggering L{onElement}.
 
141
        """
 
142
 
 
143
        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
 
144
 
 
145
        if rootElement.defaultUri != self.namespace:
 
146
            exc = error.StreamError('invalid-namespace')
 
147
            self.xmlstream.sendStreamError(exc)
 
148
            return
 
149
 
 
150
        # self.xmlstream.thisEntity is set to the address the component
 
151
        # wants to assume.
 
152
        if not self.xmlstream.thisEntity:
 
153
            exc = error.StreamError('improper-addressing')
 
154
            self.xmlstream.sendStreamError(exc)
 
155
            return
 
156
 
 
157
        self.xmlstream.sendHeader()
 
158
        self.xmlstream.addOnetimeObserver('/*', self.onElement)
 
159
 
 
160
 
 
161
    def onElement(self, element):
 
162
        """
 
163
        Called on incoming XML Stanzas.
 
164
 
 
165
        The very first element received should be a request for handshake.
 
166
        Otherwise, the stream is dropped with a 'not-authorized' error. If a
 
167
        handshake request was received, the hash is extracted and passed to
 
168
        L{onHandshake}.
 
169
        """
 
170
        if (element.uri, element.name) == (self.namespace, 'handshake'):
 
171
            self.onHandshake(unicode(element))
 
172
        else:
 
173
            exc = error.StreamError('not-authorized')
 
174
            self.xmlstream.sendStreamError(exc)
 
175
 
 
176
 
 
177
    def onHandshake(self, handshake):
 
178
        """
 
179
        Called upon receiving the handshake request.
 
180
 
 
181
        This checks that the given hash in C{handshake} is equal to a
 
182
        calculated hash, responding with a handshake reply or a stream error.
 
183
        If the handshake was ok, the stream is authorized, and  XML Stanzas may
 
184
        be exchanged.
 
185
        """
 
186
        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid,
 
187
                                                unicode(self.secret))
 
188
        if handshake != calculatedHash:
 
189
            exc = error.StreamError('not-authorized', text='Invalid hash')
 
190
            self.xmlstream.sendStreamError(exc)
 
191
        else:
 
192
            self.xmlstream.send('<handshake/>')
 
193
            self.xmlstream.dispatch(self.xmlstream,
 
194
                                    xmlstream.STREAM_AUTHD_EVENT)
 
195
 
 
196
 
 
197
 
 
198
class Service(service.Service):
 
199
    """
 
200
    External server-side component service.
 
201
    """
 
202
 
 
203
    implements(ijabber.IService)
 
204
 
 
205
    def componentConnected(self, xs):
 
206
        pass
 
207
 
 
208
    def componentDisconnected(self):
 
209
        pass
 
210
 
 
211
    def transportConnected(self, xs):
 
212
        pass
 
213
 
 
214
    def send(self, obj):
 
215
        """
 
216
        Send data over service parent's XML stream.
 
217
 
 
218
        @note: L{ServiceManager} maintains a queue for data sent using this
 
219
        method when there is no current established XML stream. This data is
 
220
        then sent as soon as a new stream has been established and initialized.
 
221
        Subsequently, L{componentConnected} will be called again. If this
 
222
        queueing is not desired, use C{send} on the XmlStream object (passed to
 
223
        L{componentConnected}) directly.
 
224
 
 
225
        @param obj: data to be sent over the XML stream. This is usually an
 
226
        object providing L{domish.IElement}, or serialized XML. See
 
227
        L{xmlstream.XmlStream} for details.
 
228
        """
 
229
 
 
230
        self.parent.send(obj)
 
231
 
 
232
class ServiceManager(service.MultiService):
 
233
    """
 
234
    Business logic representing a managed component connection to a Jabber
 
235
    router.
 
236
 
 
237
    This service maintains a single connection to a Jabber router and provides
 
238
    facilities for packet routing and transmission. Business logic modules are
 
239
    services implementing L{ijabber.IService} (like subclasses of L{Service}), and
 
240
    added as sub-service.
 
241
    """
 
242
 
 
243
    def __init__(self, jid, password):
 
244
        service.MultiService.__init__(self)
 
245
 
 
246
        # Setup defaults
 
247
        self.jabberId = jid
 
248
        self.xmlstream = None
 
249
 
 
250
        # Internal buffer of packets
 
251
        self._packetQueue = []
 
252
 
 
253
        # Setup the xmlstream factory
 
254
        self._xsFactory = componentFactory(self.jabberId, password)
 
255
 
 
256
        # Register some lambda functions to keep the self.xmlstream var up to
 
257
        # date
 
258
        self._xsFactory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
 
259
                                     self._connected)
 
260
        self._xsFactory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
 
261
        self._xsFactory.addBootstrap(xmlstream.STREAM_END_EVENT,
 
262
                                     self._disconnected)
 
263
 
 
264
        # Map addBootstrap and removeBootstrap to the underlying factory -- is
 
265
        # this right? I have no clue...but it'll work for now, until i can
 
266
        # think about it more.
 
267
        self.addBootstrap = self._xsFactory.addBootstrap
 
268
        self.removeBootstrap = self._xsFactory.removeBootstrap
 
269
 
 
270
    def getFactory(self):
 
271
        return self._xsFactory
 
272
 
 
273
    def _connected(self, xs):
 
274
        self.xmlstream = xs
 
275
        for c in self:
 
276
            if ijabber.IService.providedBy(c):
 
277
                c.transportConnected(xs)
 
278
 
 
279
    def _authd(self, xs):
 
280
        # Flush all pending packets
 
281
        for p in self._packetQueue:
 
282
            self.xmlstream.send(p)
 
283
        self._packetQueue = []
 
284
 
 
285
        # Notify all child services which implement the IService interface
 
286
        for c in self:
 
287
            if ijabber.IService.providedBy(c):
 
288
                c.componentConnected(xs)
 
289
 
 
290
    def _disconnected(self, _):
 
291
        self.xmlstream = None
 
292
 
 
293
        # Notify all child services which implement
 
294
        # the IService interface
 
295
        for c in self:
 
296
            if ijabber.IService.providedBy(c):
 
297
                c.componentDisconnected()
 
298
 
 
299
    def send(self, obj):
 
300
        """
 
301
        Send data over the XML stream.
 
302
 
 
303
        When there is no established XML stream, the data is queued and sent
 
304
        out when a new XML stream has been established and initialized.
 
305
 
 
306
        @param obj: data to be sent over the XML stream. This is usually an
 
307
        object providing L{domish.IElement}, or serialized XML. See
 
308
        L{xmlstream.XmlStream} for details.
 
309
        """
 
310
 
 
311
        if self.xmlstream != None:
 
312
            self.xmlstream.send(obj)
 
313
        else:
 
314
            self._packetQueue.append(obj)
 
315
 
 
316
def buildServiceManager(jid, password, strport):
 
317
    """
 
318
    Constructs a pre-built L{ServiceManager}, using the specified strport
 
319
    string.
 
320
    """
 
321
 
 
322
    svc = ServiceManager(jid, password)
 
323
    client_svc = jstrports.client(strport, svc.getFactory())
 
324
    client_svc.setServiceParent(svc)
 
325
    return svc
 
326
 
 
327
 
 
328
 
 
329
class Router(object):
 
330
    """
 
331
    XMPP Server's Router.
 
332
 
 
333
    A router connects the different components of the XMPP service and routes
 
334
    messages between them based on the given routing table.
 
335
 
 
336
    Connected components are trusted to have correct addressing in the
 
337
    stanzas they offer for routing.
 
338
 
 
339
    A route destination of C{None} adds a default route. Traffic for which no
 
340
    specific route exists, will be routed to this default route.
 
341
 
 
342
    @since: 8.2
 
343
    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
 
344
                  L{EventDispatcher<utility.EventDispatcher>}s that should
 
345
                  receive the traffic. A key of C{None} means the default
 
346
                  route.
 
347
    @type routes: C{dict}
 
348
    """
 
349
 
 
350
    def __init__(self):
 
351
        self.routes = {}
 
352
 
 
353
 
 
354
    def addRoute(self, destination, xs):
 
355
        """
 
356
        Add a new route.
 
357
 
 
358
        The passed XML Stream C{xs} will have an observer for all stanzas
 
359
        added to route its outgoing traffic. In turn, traffic for
 
360
        C{destination} will be passed to this stream.
 
361
 
 
362
        @param destination: Destination of the route to be added as a host name
 
363
                            or C{None} for the default route.
 
364
        @type destination: C{str} or C{NoneType}.
 
365
        @param xs: XML Stream to register the route for.
 
366
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
 
367
        """
 
368
        self.routes[destination] = xs
 
369
        xs.addObserver('/*', self.route)
 
370
 
 
371
 
 
372
    def removeRoute(self, destination, xs):
 
373
        """
 
374
        Remove a route.
 
375
 
 
376
        @param destination: Destination of the route that should be removed.
 
377
        @type destination: C{str}.
 
378
        @param xs: XML Stream to remove the route for.
 
379
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
 
380
        """
 
381
        xs.removeObserver('/*', self.route)
 
382
        if (xs == self.routes[destination]):
 
383
            del self.routes[destination]
 
384
 
 
385
 
 
386
    def route(self, stanza):
 
387
        """
 
388
        Route a stanza.
 
389
 
 
390
        @param stanza: The stanza to be routed.
 
391
        @type stanza: L{domish.Element}.
 
392
        """
 
393
        destination = JID(stanza['to'])
 
394
 
 
395
        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
 
396
 
 
397
        if destination.host in self.routes:
 
398
            self.routes[destination.host].send(stanza)
 
399
        else:
 
400
            self.routes[None].send(stanza)
 
401
 
 
402
 
 
403
 
 
404
class XMPPComponentServerFactory(xmlstream.XmlStreamServerFactory):
 
405
    """
 
406
    XMPP Component Server factory.
 
407
 
 
408
    This factory accepts XMPP external component connections and makes
 
409
    the router service route traffic for a component's bound domain
 
410
    to that component.
 
411
 
 
412
    @since: 8.2
 
413
    """
 
414
 
 
415
    logTraffic = False
 
416
 
 
417
    def __init__(self, router, secret='secret'):
 
418
        self.router = router
 
419
        self.secret = secret
 
420
 
 
421
        def authenticatorFactory():
 
422
            return ListenComponentAuthenticator(self.secret)
 
423
 
 
424
        xmlstream.XmlStreamServerFactory.__init__(self, authenticatorFactory)
 
425
        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
 
426
                          self.onConnectionMade)
 
427
        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
 
428
                          self.onAuthenticated)
 
429
 
 
430
        self.serial = 0
 
431
 
 
432
 
 
433
    def onConnectionMade(self, xs):
 
434
        """
 
435
        Called when a component connection was made.
 
436
 
 
437
        This enables traffic debugging on incoming streams.
 
438
        """
 
439
        xs.serial = self.serial
 
440
        self.serial += 1
 
441
 
 
442
        def logDataIn(buf):
 
443
            log.msg("RECV (%d): %r" % (xs.serial, buf))
 
444
 
 
445
        def logDataOut(buf):
 
446
            log.msg("SEND (%d): %r" % (xs.serial, buf))
 
447
 
 
448
        if self.logTraffic:
 
449
            xs.rawDataInFn = logDataIn
 
450
            xs.rawDataOutFn = logDataOut
 
451
 
 
452
        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
 
453
 
 
454
 
 
455
    def onAuthenticated(self, xs):
 
456
        """
 
457
        Called when a component has succesfully authenticated.
 
458
 
 
459
        Add the component to the routing table and establish a handler
 
460
        for a closed connection.
 
461
        """
 
462
        destination = xs.thisEntity.host
 
463
 
 
464
        self.router.addRoute(destination, xs)
 
465
        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost, 0,
 
466
                                                   destination, xs)
 
467
 
 
468
 
 
469
    def onError(self, reason):
 
470
        log.err(reason, "Stream Error")
 
471
 
 
472
 
 
473
    def onConnectionLost(self, destination, xs, reason):
 
474
        self.router.removeRoute(destination, xs)