# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
17
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
21
from oslo.config import cfg
23
from openstack_dashboard.openstack.common import importutils
24
from openstack_dashboard.openstack.common import log as logging
25
from openstack_dashboard.openstack.common.rpc import matchmaker as mm_common
27
redis = importutils.try_import('redis')
30
# Options describing how to reach the Redis server. They are read back in
# this module as CONF.matchmaker_redis.host / .port / .password.
# NOTE(review): the opening lines of the 'host' and 'port' entries (option
# type and default) and the default of 'password' are absent from the
# visible source. The types and defaults below are reconstructed -- 6379 is
# the stock Redis port -- and must be confirmed against the upstream module.
matchmaker_redis_opts = [
    cfg.StrOpt('host',
               default='127.0.0.1',
               help='Host to locate redis'),
    cfg.IntOpt('port',
               default=6379,
               help='Use this port to connect to redis host.'),
    cfg.StrOpt('password',
               default=None,
               help='Password for Redis server. (optional)'),
]
43
# NOTE(review): no binding for CONF is visible in this module; cfg.CONF is
# oslo.config's global configuration object. Re-binding it here is harmless
# if an earlier assignment already exists.
CONF = cfg.CONF

# Register the Redis options under their own "matchmaker_redis" group so
# they are addressed as CONF.matchmaker_redis.<option>.
opt_group = cfg.OptGroup(name='matchmaker_redis',
                         title='Options for Redis-based MatchMaker')
CONF.register_group(opt_group)
CONF.register_opts(matchmaker_redis_opts, opt_group)

LOG = logging.getLogger(__name__)
50
class RedisExchange(mm_common.Exchange):
    """
    Common base for the Redis-backed exchanges.

    Holds the owning matchmaker and that matchmaker's Redis client so the
    concrete exchanges can query set membership and host liveness.
    """

    def __init__(self, matchmaker):
        # Both attributes are assigned before the base initializer runs,
        # in the same order as always: matchmaker first, then its client.
        self.matchmaker, self.redis = matchmaker, matchmaker.redis
        super(RedisExchange, self).__init__()
57
class RedisTopicExchange(RedisExchange):
    """
    Exchange where all topic keys are split, sending to second half.
    i.e. "compute.host" sends a message to "compute" running on "host"
    """

    # NOTE(review): the method header is absent from the visible source.
    # `run(self, topic)` is assumed to be the entry point the Exchange base
    # class expects -- confirm against mm_common.Exchange.
    def run(self, topic):
        """Pick one live member of the ``topic`` set at random.

        Returns a one-element list ``[(member_name, host)]`` where ``host``
        is the part of ``member_name`` after the first '.', or an empty
        list when the set has no members left.
        """
        while True:
            member_name = self.redis.srandmember(topic)

            if not member_name:
                # If this happens, there are no members left to try.
                break

            # A dead member must not be returned; draw again. This relies
            # on is_alive() pruning the dead member from the set so the
            # loop eventually drains it.
            if not self.matchmaker.is_alive(topic, member_name):
                continue

            host = member_name.split('.', 1)[1]
            return [(member_name, host)]

        return []
79
class RedisFanoutExchange(RedisExchange):
    """
    Return a list of all hosts.
    """

    # NOTE(review): the method header is absent from the visible source.
    # `run(self, topic)` is assumed to be the entry point the Exchange base
    # class expects -- confirm against mm_common.Exchange.
    def run(self, topic):
        """Return ``(member, host)`` pairs for every live member.

        ``topic`` arrives with a prefix terminated by '~'; only the part
        after the first '~' names the Redis set. ``host`` is the part of
        each member name after its first '.'.
        """
        topic = topic.split('~', 1)[1]
        hosts = self.redis.smembers(topic)

        # Keep only members the matchmaker still considers alive.
        return [(member, member.split('.', 1)[1])
                for member in hosts
                if self.matchmaker.is_alive(topic, member)]
92
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
    """
    MatchMaker registering and looking-up hosts with a Redis server.
    """

    def __init__(self):
        """Connect to Redis and install the fanout/direct/topic bindings.

        Raises ImportError when the optional ``redis`` module could not be
        imported.
        """
        super(MatchMakerRedis, self).__init__()

        # NOTE(review): the guard line is absent from the visible source.
        # `redis` comes from importutils.try_import('redis'), which is
        # presumably falsy when the import fails -- confirm.
        if not redis:
            raise ImportError("Failed to import module redis.")

        self.redis = redis.StrictRedis(
            host=CONF.matchmaker_redis.host,
            port=CONF.matchmaker_redis.port,
            password=CONF.matchmaker_redis.password)

        self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
        self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
        self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))

    def ack_alive(self, key, host):
        """Refresh the heartbeat TTL of the "<key>.<host>" Redis key."""
        topic = "%s.%s" % (key, host)
        if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
            # If we could not update the expiration, the key
            # might have been pruned. Re-register so it exists again.
            self.register(self.topic_host[host], host)

    def is_alive(self, topic, host):
        """Return True if ``host`` still has a heartbeat key with a TTL.

        A host without one is pruned from the ``topic`` set and reported
        as dead (False).
        """
        ttl = self.redis.ttl(host)
        # -1 is the value the original check tested for. Redis 2.8+
        # reports -2 for a key that does not exist, and some client
        # flavours return None, so any missing or negative TTL is dead.
        if ttl is None or ttl < 0:
            self.expire(topic, host)
            return False
        return True

    def expire(self, topic, host):
        """Drop ``host`` from the ``topic`` set and delete its key."""
        with self.redis.pipeline() as pipe:
            # Delete the key as backend_unregister() does, so a key that
            # exists without an expiry is not left behind.
            pipe.delete(host)
            pipe.srem(topic, host)
            # Queued commands are only sent when execute() is called.
            pipe.execute()

    def backend_register(self, key, key_host):
        """Add ``key_host`` to the ``key`` set and create its own key."""
        with self.redis.pipeline() as pipe:
            pipe.sadd(key, key_host)

            # No value is needed, we just
            # care if it exists. Sets aren't viable
            # because only keys can expire.
            pipe.set(key_host, '')

            # Queued commands are only sent when execute() is called.
            pipe.execute()

    def backend_unregister(self, key, key_host):
        """Remove ``key_host`` from the ``key`` set and delete its key."""
        with self.redis.pipeline() as pipe:
            pipe.srem(key, key_host)
            pipe.delete(key_host)
            # Queued commands are only sent when execute() is called.
            pipe.execute()