1
# -*- test-case-name: epsilon.test.test_amprouter -*-
2
# Copyright (c) 2008 Divmod. See LICENSE for details.
5
This module provides an implementation of I{Routes}, a system for multiplexing
6
multiple L{IBoxReceiver}/I{IBoxSender} pairs over a single L{AMP} connection.
9
from itertools import count
11
from zope.interface import implements
13
from twisted.protocols.amp import IBoxReceiver, IBoxSender
15
from epsilon.structlike import record
20
_unspecified = object()
23
class RouteNotConnected(Exception):
25
An attempt was made to send AMP boxes through a L{Route} which is not yet
26
connected to anything.
31
class Route(record('router receiver localRouteName remoteRouteName',
32
remoteRouteName=_unspecified)):
34
Wrap up a route name and a box sender to transparently add the route name
35
to boxes sent through this box sender.
37
@ivar router: The L{Router} which created this route. This will be used
38
for route tear down and for its L{IBoxSender}, to send boxes.
40
@ivar receiver: The receiver which will be started with this object as its
43
@type localRouteName: C{unicode}
44
@ivar localRouteName: The name of this route as known by the other side of
45
the AMP connection. AMP boxes with this route are expected to be
46
routed to this object.
48
@type remoteRouteName: C{unicode} or L{NoneType}
49
@ivar remoteRouteName: The name of the route which will be added to all
50
boxes sent to this sender. If C{None}, no route will be added.
52
implements(IBoxSender)
54
def connectTo(self, remoteRouteName):
56
Set the name of the route which will be added to outgoing boxes.
58
self.remoteRouteName = remoteRouteName
59
# This route must not be started before its router is started. If
60
# sender is None, then the router is not started. When the router is
61
# started, it will start this route.
62
if self.router._sender is not None:
68
Remove the association between this route and its router.
70
del self.router._routes[self.localRouteName]
75
Associate this object with a receiver as its L{IBoxSender}.
77
self.receiver.startReceivingBoxes(self)
80
def stop(self, reason):
82
Shut down the underlying receiver.
84
self.receiver.stopReceivingBoxes(reason)
87
def sendBox(self, box):
89
Add the route and send the box.
91
if self.remoteRouteName is _unspecified:
92
raise RouteNotConnected()
93
if self.remoteRouteName is not None:
94
box[_ROUTE] = self.remoteRouteName.encode('ascii')
95
self.router._sender.sendBox(box)
98
def unhandledError(self, failure):
100
Pass failures through to the wrapped L{IBoxSender} without
103
self.router._sender.unhandledError(failure)
109
An L{IBoxReceiver} implementation which demultiplexes boxes from an AMP
110
connection being used with zero, one, or more routes.
112
@ivar _sender: An L{IBoxSender} provider which is used to allow
113
L{IBoxReceiver}s added to this router to send boxes.
115
@ivar _unstarted: A C{dict} similar to C{_routes} set before
116
C{startReceivingBoxes} is called and containing all routes which have
117
been added but not yet started. These are started and moved to the
118
C{_routes} dict when the router is started.
120
@ivar _routes: A C{dict} mapping local route identifiers to
121
L{IBoxReceivers} associated with them. This is only initialized after
122
C{startReceivingBoxes} is called.
124
@ivar _routeCounter: A L{itertools.count} instance used to generate unique
125
identifiers for routes in this router.
127
implements(IBoxReceiver)
133
self._routeCounter = count()
137
def createRouteIdentifier(self):
139
Return a route identifier which is not yet associated with a route on
144
return unicode(self._routeCounter.next())
147
def bindRoute(self, receiver, routeName=_unspecified):
149
Create a new route to associate the given route name with the given
152
@type routeName: C{unicode} or L{NoneType}
153
@param routeName: The identifier for the newly created route. If
154
C{None}, boxes with no route in them will be delivered to this
159
if routeName is _unspecified:
160
routeName = self.createRouteIdentifier()
161
# self._sender may yet be None; if so, this route goes into _unstarted
162
# and will have its sender set correctly in startReceivingBoxes below.
163
route = Route(self, receiver, routeName)
164
mapping = self._routes
166
mapping = self._unstarted
167
mapping[routeName] = route
171
def startReceivingBoxes(self, sender):
173
Initialize route tracking objects.
175
self._sender = sender
176
for routeName, route in self._unstarted.iteritems():
177
# Any route which has been bound but which does not yet have a
178
# remote route name should not yet be started. These will be
179
# started in Route.connectTo.
180
if route.remoteRouteName is not _unspecified:
182
self._routes = self._unstarted
183
self._unstarted = None
186
def ampBoxReceived(self, box):
188
Dispatch the given box to the L{IBoxReceiver} associated with the route
189
indicated by the box, or handle it directly if there is no route.
191
route = box.pop(_ROUTE, None)
192
self._routes[route].receiver.ampBoxReceived(box)
195
def stopReceivingBoxes(self, reason):
197
Stop all the L{IBoxReceiver}s which have been added to this router.
199
for routeName, route in self._routes.iteritems():
205
__all__ = ['Router', 'Route']