1
# -*- test-case-name: twisted.words.test.test_jabbercomponent -*-
3
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
4
# See LICENSE for details.
7
External server-side components.
9
Most Jabber server implementations allow for add-on components that act as a
10
seperate entity on the Jabber network, but use the server-to-server
11
functionality of a regular Jabber IM server. These so-called 'external
12
components' are connected to the Jabber server using the Jabber Component
13
Protocol as defined in U{JEP-0114<http://www.jabber.org/jeps/jep-0114.html>}.
15
This module allows for writing external server-side component by assigning one
16
or more services implementing L{ijabber.IService} up to L{ServiceManager}. The
17
ServiceManager connects to the Jabber server and is responsible for the
18
corresponding XML stream.
21
from zope.interface import implements
23
from twisted.application import service
24
from twisted.internet import defer
25
from twisted.python import log
26
from twisted.words.xish import domish
27
from twisted.words.protocols.jabber import error, ijabber, jstrports, xmlstream
28
from twisted.words.protocols.jabber.jid import internJID as JID
30
NS_COMPONENT_ACCEPT = 'jabber:component:accept'
32
def componentFactory(componentid, password):
34
XML stream factory for external server-side components.
36
@param componentid: JID of the component.
37
@type componentid: L{unicode}
38
@param password: password used to authenticate to the server.
39
@type password: L{str}
41
a = ConnectComponentAuthenticator(componentid, password)
42
return xmlstream.XmlStreamFactory(a)
44
class ComponentInitiatingInitializer(object):
46
External server-side component authentication initializer for the
49
@ivar xmlstream: XML stream between server and component.
50
@type xmlstream: L{xmlstream.XmlStream}
53
def __init__(self, xs):
59
hs = domish.Element((self.xmlstream.namespace, "handshake"))
60
hs.addContent(xmlstream.hashPassword(xs.sid,
61
unicode(xs.authenticator.password)))
63
# Setup observer to watch for handshake result
64
xs.addOnetimeObserver("/handshake", self._cbHandshake)
66
self._deferred = defer.Deferred()
69
def _cbHandshake(self, _):
70
# we have successfully shaken hands and can now consider this
71
# entity to represent the component JID.
72
self.xmlstream.thisEntity = self.xmlstream.otherEntity
73
self._deferred.callback(None)
77
class ConnectComponentAuthenticator(xmlstream.ConnectAuthenticator):
79
Authenticator to permit an XmlStream to authenticate against a Jabber
80
server as an external component (where the Authenticator is initiating the
83
namespace = NS_COMPONENT_ACCEPT
85
def __init__(self, componentjid, password):
87
@type componentjid: L{str}
88
@param componentjid: Jabber ID that this component wishes to bind to.
90
@type password: L{str}
91
@param password: Password/secret this component uses to authenticate.
93
# Note that we are sending 'to' our desired component JID.
94
xmlstream.ConnectAuthenticator.__init__(self, componentjid)
95
self.password = password
97
def associateWithStream(self, xs):
99
xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
101
xs.initializers = [ComponentInitiatingInitializer(xs)]
105
class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
107
Authenticator for accepting components.
110
@ivar secret: The shared secret used to authorized incoming component
112
@type secret: C{unicode}.
115
namespace = NS_COMPONENT_ACCEPT
117
def __init__(self, secret):
119
xmlstream.ListenAuthenticator.__init__(self)
122
def associateWithStream(self, xs):
124
Associate the authenticator with a stream.
126
This sets the stream's version to 0.0, because the XEP-0114 component
127
protocol was not designed for XMPP 1.0.
130
xmlstream.ListenAuthenticator.associateWithStream(self, xs)
133
def streamStarted(self, rootElement):
135
Called by the stream when it has started.
137
This examines the default namespace of the incoming stream and whether
138
there is a requested hostname for the component. Then it generates a
139
stream identifier, sends a response header and adds an observer for
140
the first incoming element, triggering L{onElement}.
143
xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
145
if rootElement.defaultUri != self.namespace:
146
exc = error.StreamError('invalid-namespace')
147
self.xmlstream.sendStreamError(exc)
150
# self.xmlstream.thisEntity is set to the address the component
152
if not self.xmlstream.thisEntity:
153
exc = error.StreamError('improper-addressing')
154
self.xmlstream.sendStreamError(exc)
157
self.xmlstream.sendHeader()
158
self.xmlstream.addOnetimeObserver('/*', self.onElement)
161
def onElement(self, element):
163
Called on incoming XML Stanzas.
165
The very first element received should be a request for handshake.
166
Otherwise, the stream is dropped with a 'not-authorized' error. If a
167
handshake request was received, the hash is extracted and passed to
170
if (element.uri, element.name) == (self.namespace, 'handshake'):
171
self.onHandshake(unicode(element))
173
exc = error.StreamError('not-authorized')
174
self.xmlstream.sendStreamError(exc)
177
def onHandshake(self, handshake):
179
Called upon receiving the handshake request.
181
This checks that the given hash in C{handshake} is equal to a
182
calculated hash, responding with a handshake reply or a stream error.
183
If the handshake was ok, the stream is authorized, and XML Stanzas may
186
calculatedHash = xmlstream.hashPassword(self.xmlstream.sid,
187
unicode(self.secret))
188
if handshake != calculatedHash:
189
exc = error.StreamError('not-authorized', text='Invalid hash')
190
self.xmlstream.sendStreamError(exc)
192
self.xmlstream.send('<handshake/>')
193
self.xmlstream.dispatch(self.xmlstream,
194
xmlstream.STREAM_AUTHD_EVENT)
198
class Service(service.Service):
200
External server-side component service.
203
implements(ijabber.IService)
205
def componentConnected(self, xs):
208
def componentDisconnected(self):
211
def transportConnected(self, xs):
216
Send data over service parent's XML stream.
218
@note: L{ServiceManager} maintains a queue for data sent using this
219
method when there is no current established XML stream. This data is
220
then sent as soon as a new stream has been established and initialized.
221
Subsequently, L{componentConnected} will be called again. If this
222
queueing is not desired, use C{send} on the XmlStream object (passed to
223
L{componentConnected}) directly.
225
@param obj: data to be sent over the XML stream. This is usually an
226
object providing L{domish.IElement}, or serialized XML. See
227
L{xmlstream.XmlStream} for details.
230
self.parent.send(obj)
232
class ServiceManager(service.MultiService):
234
Business logic representing a managed component connection to a Jabber
237
This service maintains a single connection to a Jabber router and provides
238
facilities for packet routing and transmission. Business logic modules are
239
services implementing L{ijabber.IService} (like subclasses of L{Service}), and
240
added as sub-service.
243
def __init__(self, jid, password):
244
service.MultiService.__init__(self)
248
self.xmlstream = None
250
# Internal buffer of packets
251
self._packetQueue = []
253
# Setup the xmlstream factory
254
self._xsFactory = componentFactory(self.jabberId, password)
256
# Register some lambda functions to keep the self.xmlstream var up to
258
self._xsFactory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
260
self._xsFactory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
261
self._xsFactory.addBootstrap(xmlstream.STREAM_END_EVENT,
264
# Map addBootstrap and removeBootstrap to the underlying factory -- is
265
# this right? I have no clue...but it'll work for now, until i can
266
# think about it more.
267
self.addBootstrap = self._xsFactory.addBootstrap
268
self.removeBootstrap = self._xsFactory.removeBootstrap
270
def getFactory(self):
271
return self._xsFactory
273
def _connected(self, xs):
276
if ijabber.IService.providedBy(c):
277
c.transportConnected(xs)
279
def _authd(self, xs):
280
# Flush all pending packets
281
for p in self._packetQueue:
282
self.xmlstream.send(p)
283
self._packetQueue = []
285
# Notify all child services which implement the IService interface
287
if ijabber.IService.providedBy(c):
288
c.componentConnected(xs)
290
def _disconnected(self, _):
291
self.xmlstream = None
293
# Notify all child services which implement
294
# the IService interface
296
if ijabber.IService.providedBy(c):
297
c.componentDisconnected()
301
Send data over the XML stream.
303
When there is no established XML stream, the data is queued and sent
304
out when a new XML stream has been established and initialized.
306
@param obj: data to be sent over the XML stream. This is usually an
307
object providing L{domish.IElement}, or serialized XML. See
308
L{xmlstream.XmlStream} for details.
311
if self.xmlstream != None:
312
self.xmlstream.send(obj)
314
self._packetQueue.append(obj)
316
def buildServiceManager(jid, password, strport):
318
Constructs a pre-built L{ServiceManager}, using the specified strport
322
svc = ServiceManager(jid, password)
323
client_svc = jstrports.client(strport, svc.getFactory())
324
client_svc.setServiceParent(svc)
329
class Router(object):
331
XMPP Server's Router.
333
A router connects the different components of the XMPP service and routes
334
messages between them based on the given routing table.
336
Connected components are trusted to have correct addressing in the
337
stanzas they offer for routing.
339
A route destination of C{None} adds a default route. Traffic for which no
340
specific route exists, will be routed to this default route.
343
@ivar routes: Routes based on the host part of JIDs. Maps host names to the
344
L{EventDispatcher<utility.EventDispatcher>}s that should
345
receive the traffic. A key of C{None} means the default
347
@type routes: C{dict}
354
def addRoute(self, destination, xs):
358
The passed XML Stream C{xs} will have an observer for all stanzas
359
added to route its outgoing traffic. In turn, traffic for
360
C{destination} will be passed to this stream.
362
@param destination: Destination of the route to be added as a host name
363
or C{None} for the default route.
364
@type destination: C{str} or C{NoneType}.
365
@param xs: XML Stream to register the route for.
366
@type xs: L{EventDispatcher<utility.EventDispatcher>}.
368
self.routes[destination] = xs
369
xs.addObserver('/*', self.route)
372
def removeRoute(self, destination, xs):
376
@param destination: Destination of the route that should be removed.
377
@type destination: C{str}.
378
@param xs: XML Stream to remove the route for.
379
@type xs: L{EventDispatcher<utility.EventDispatcher>}.
381
xs.removeObserver('/*', self.route)
382
if (xs == self.routes[destination]):
383
del self.routes[destination]
386
def route(self, stanza):
390
@param stanza: The stanza to be routed.
391
@type stanza: L{domish.Element}.
393
destination = JID(stanza['to'])
395
log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
397
if destination.host in self.routes:
398
self.routes[destination.host].send(stanza)
400
self.routes[None].send(stanza)
404
class XMPPComponentServerFactory(xmlstream.XmlStreamServerFactory):
406
XMPP Component Server factory.
408
This factory accepts XMPP external component connections and makes
409
the router service route traffic for a component's bound domain
417
def __init__(self, router, secret='secret'):
421
def authenticatorFactory():
422
return ListenComponentAuthenticator(self.secret)
424
xmlstream.XmlStreamServerFactory.__init__(self, authenticatorFactory)
425
self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
426
self.onConnectionMade)
427
self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
428
self.onAuthenticated)
433
def onConnectionMade(self, xs):
435
Called when a component connection was made.
437
This enables traffic debugging on incoming streams.
439
xs.serial = self.serial
443
log.msg("RECV (%d): %r" % (xs.serial, buf))
446
log.msg("SEND (%d): %r" % (xs.serial, buf))
449
xs.rawDataInFn = logDataIn
450
xs.rawDataOutFn = logDataOut
452
xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
455
def onAuthenticated(self, xs):
457
Called when a component has succesfully authenticated.
459
Add the component to the routing table and establish a handler
460
for a closed connection.
462
destination = xs.thisEntity.host
464
self.router.addRoute(destination, xs)
465
xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost, 0,
469
def onError(self, reason):
470
log.err(reason, "Stream Error")
473
def onConnectionLost(self, destination, xs, reason):
474
self.router.removeRoute(destination, xs)