72
71
class MatchMakerBase(object):
73
"""Match Maker Base Class."""
72
"""Match Maker Base Class.
74
Build off HeartbeatMatchMakerBase if building a heartbeat-capable
75
77
def __init__(self):
76
78
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
81
self.no_heartbeat_msg = _('Matchmaker does not implement '
82
'registration or heartbeat.')
84
def register(self, key, host):
85
"""Register a host on a backend.
87
Heartbeats, if applicable, may keepalive registration.
91
def ack_alive(self, key, host):
92
"""Acknowledge that a key.host is alive.
94
Used internally for updating heartbeats, but may also be used
95
publically to acknowledge a system is alive (i.e. rpc message
96
successfully sent to host)
100
def is_alive(self, topic, host):
101
"""Checks if a host is alive."""
104
def expire(self, topic, host):
105
"""Explicitly expire a host's registration."""
108
def send_heartbeats(self):
109
"""Send all heartbeats.
111
Use start_heartbeat to spawn a heartbeat greenthread,
112
which loops this method.
116
def unregister(self, key, host):
117
"""Unregister a topic."""
120
def start_heartbeat(self):
121
"""Spawn heartbeat greenthread."""
124
def stop_heartbeat(self):
125
"""Destroys the heartbeat greenthread."""
79
128
def add_binding(self, binding, rule, last=True):
80
129
self.bindings.append((binding, rule, False, last))
151
class HeartbeatMatchMakerBase(MatchMakerBase):
152
"""Base for a heart-beat capable MatchMaker.
154
Provides common methods for registering, unregistering, and maintaining
162
super(HeartbeatMatchMakerBase, self).__init__()
164
def send_heartbeats(self):
165
"""Send all heartbeats.
167
Use start_heartbeat to spawn a heartbeat greenthread,
168
which loops this method.
170
for key, host in self.host_topic:
171
self.ack_alive(key, host)
173
def ack_alive(self, key, host):
174
"""Acknowledge that a host.topic is alive.
176
Used internally for updating heartbeats, but may also be used
177
publically to acknowledge a system is alive (i.e. rpc message
178
successfully sent to host)
180
raise NotImplementedError("Must implement ack_alive")
182
def backend_register(self, key, host):
183
"""Implements registration logic.
185
Called by register(self,key,host)
187
raise NotImplementedError("Must implement backend_register")
189
def backend_unregister(self, key, key_host):
190
"""Implements de-registration logic.
192
Called by unregister(self,key,host)
194
raise NotImplementedError("Must implement backend_unregister")
196
def register(self, key, host):
197
"""Register a host on a backend.
199
Heartbeats, if applicable, may keepalive registration.
202
self.host_topic[(key, host)] = host
203
key_host = '.'.join((key, host))
205
self.backend_register(key, key_host)
207
self.ack_alive(key, host)
209
def unregister(self, key, host):
210
"""Unregister a topic."""
211
if (key, host) in self.host_topic:
212
del self.host_topic[(key, host)]
214
self.hosts.discard(host)
215
self.backend_unregister(key, '.'.join((key, host)))
217
LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
218
{'key': key, 'host': host})
220
def start_heartbeat(self):
221
"""Implementation of MatchMakerBase.start_heartbeat.
223
Launches greenthread looping send_heartbeats(),
224
yielding for CONF.matchmaker_heartbeat_freq seconds
228
raise MatchMakerException(
229
_("Register before starting heartbeat."))
233
self.send_heartbeats()
234
eventlet.sleep(CONF.matchmaker_heartbeat_freq)
236
self._heart = eventlet.spawn(do_heartbeat)
238
def stop_heartbeat(self):
239
"""Destroys the heartbeat greenthread."""
102
244
class DirectBinding(Binding):
104
Specifies a host in the key via a '.' character
245
"""Specifies a host in the key via a '.' character.
105
247
Although dots are used in the key, the behavior here is
106
248
that it maps directly to a host, thus direct.
139
281
return [(key, None)]
142
class RingExchange(Exchange):
144
Match Maker where hosts are loaded from a static file containing
145
a hashmap (JSON formatted).
147
__init__ takes optional ring dictionary argument, otherwise
148
loads the ringfile from CONF.mathcmaker_ringfile.
150
def __init__(self, ring=None):
151
super(RingExchange, self).__init__()
156
fh = open(CONF.matchmaker_ringfile, 'r')
157
self.ring = json.load(fh)
161
for k in self.ring.keys():
162
self.ring0[k] = itertools.cycle(self.ring[k])
164
def _ring_has(self, key):
165
if key in self.ring0:
170
class RoundRobinRingExchange(RingExchange):
171
"""A Topic Exchange based on a hashmap."""
172
def __init__(self, ring=None):
173
super(RoundRobinRingExchange, self).__init__(ring)
176
if not self._ring_has(key):
178
_("No key defining hosts for topic '%s', "
179
"see ringfile") % (key, )
182
host = next(self.ring0[key])
183
return [(key + '.' + host, host)]
186
class FanoutRingExchange(RingExchange):
187
"""Fanout Exchange based on a hashmap."""
188
def __init__(self, ring=None):
189
super(FanoutRingExchange, self).__init__(ring)
192
# Assume starts with "fanout~", strip it for lookup.
193
nkey = key.split('fanout~')[1:][0]
194
if not self._ring_has(nkey):
196
_("No key defining hosts for topic '%s', "
197
"see ringfile") % (nkey, )
200
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
203
284
class LocalhostExchange(Exchange):
204
285
"""Exchange where all direct topics are local."""
286
def __init__(self, host='localhost'):
206
288
super(Exchange, self).__init__()
208
290
def run(self, key):
209
return [(key.split('.')[0] + '.localhost', 'localhost')]
291
return [('.'.join((key.split('.')[0], self.host)), self.host)]
212
294
class DirectExchange(Exchange):
214
Exchange where all topic keys are split, sending to second half.
215
i.e. "compute.host" sends a message to "compute" running on "host"
295
"""Exchange where all topic keys are split, sending to second half.
297
i.e. "compute.host" sends a message to "compute.host" running on "host"
217
299
def __init__(self):
218
300
super(Exchange, self).__init__()
220
302
def run(self, key):
221
b, e = key.split('.', 1)
225
class MatchMakerRing(MatchMakerBase):
227
Match Maker where hosts are loaded from a static hashmap.
229
def __init__(self, ring=None):
230
super(MatchMakerRing, self).__init__()
231
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
232
self.add_binding(DirectBinding(), DirectExchange())
233
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
303
e = key.split('.', 1)[1]
236
307
class MatchMakerLocalhost(MatchMakerBase):
238
Match Maker where all bare topics resolve to localhost.
308
"""Match Maker where all bare topics resolve to localhost.
239
310
Useful for testing.
312
def __init__(self, host='localhost'):
242
313
super(MatchMakerLocalhost, self).__init__()
243
self.add_binding(FanoutBinding(), LocalhostExchange())
314
self.add_binding(FanoutBinding(), LocalhostExchange(host))
244
315
self.add_binding(DirectBinding(), DirectExchange())
245
self.add_binding(TopicBinding(), LocalhostExchange())
316
self.add_binding(TopicBinding(), LocalhostExchange(host))
248
319
class MatchMakerStub(MatchMakerBase):
250
Match Maker where topics are untouched.
320
"""Match Maker where topics are untouched.
251
322
Useful for testing, or for AMQP/brokered queues.
252
323
Will not work where knowledge of hosts is known (i.e. zeromq)
254
325
def __init__(self):
255
super(MatchMakerLocalhost, self).__init__()
326
super(MatchMakerStub, self).__init__()
257
328
self.add_binding(FanoutBinding(), StubExchange())
258
329
self.add_binding(DirectBinding(), StubExchange())