~ubuntu-branches/ubuntu/vivid/horizon/vivid-proposed

« back to all changes in this revision

Viewing changes to openstack_dashboard/openstack/common/rpc/matchmaker_redis.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2014-06-17 09:17:40 UTC
  • mfrom: (1.1.41)
  • Revision ID: package-import@ubuntu.com-20140617091740-g73ekvtkxum68uyv
Tags: 1:2014.2~b1-0ubuntu1
* New upstream release.
* debian/control: Open up juno release

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
#    Copyright 2013 Cloudscaling Group, Inc
4
 
#
5
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6
 
#    not use this file except in compliance with the License. You may obtain
7
 
#    a copy of the License at
8
 
#
9
 
#         http://www.apache.org/licenses/LICENSE-2.0
10
 
#
11
 
#    Unless required by applicable law or agreed to in writing, software
12
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
 
#    License for the specific language governing permissions and limitations
15
 
#    under the License.
16
 
"""
17
 
The MatchMaker classes should accept a Topic or Fanout exchange key and
18
 
return keys for direct exchanges, per (approximate) AMQP parlance.
19
 
"""
20
 
 
21
 
from oslo.config import cfg
22
 
 
23
 
from openstack_dashboard.openstack.common import importutils
24
 
from openstack_dashboard.openstack.common import log as logging
25
 
from openstack_dashboard.openstack.common.rpc import matchmaker as mm_common
26
 
 
27
 
redis = importutils.try_import('redis')
28
 
 
29
 
 
30
 
matchmaker_redis_opts = [
31
 
    cfg.StrOpt('host',
32
 
               default='127.0.0.1',
33
 
               help='Host to locate redis'),
34
 
    cfg.IntOpt('port',
35
 
               default=6379,
36
 
               help='Use this port to connect to redis host.'),
37
 
    cfg.StrOpt('password',
38
 
               default=None,
39
 
               help='Password for Redis server. (optional)'),
40
 
]
41
 
 
42
 
CONF = cfg.CONF
43
 
opt_group = cfg.OptGroup(name='matchmaker_redis',
44
 
                         title='Options for Redis-based MatchMaker')
45
 
CONF.register_group(opt_group)
46
 
CONF.register_opts(matchmaker_redis_opts, opt_group)
47
 
LOG = logging.getLogger(__name__)
48
 
 
49
 
 
50
 
class RedisExchange(mm_common.Exchange):
51
 
    def __init__(self, matchmaker):
52
 
        self.matchmaker = matchmaker
53
 
        self.redis = matchmaker.redis
54
 
        super(RedisExchange, self).__init__()
55
 
 
56
 
 
57
 
class RedisTopicExchange(RedisExchange):
58
 
    """
59
 
    Exchange where all topic keys are split, sending to second half.
60
 
    i.e. "compute.host" sends a message to "compute" running on "host"
61
 
    """
62
 
    def run(self, topic):
63
 
        while True:
64
 
            member_name = self.redis.srandmember(topic)
65
 
 
66
 
            if not member_name:
67
 
                # If this happens, there are no
68
 
                # longer any members.
69
 
                break
70
 
 
71
 
            if not self.matchmaker.is_alive(topic, member_name):
72
 
                continue
73
 
 
74
 
            host = member_name.split('.', 1)[1]
75
 
            return [(member_name, host)]
76
 
        return []
77
 
 
78
 
 
79
 
class RedisFanoutExchange(RedisExchange):
80
 
    """
81
 
    Return a list of all hosts.
82
 
    """
83
 
    def run(self, topic):
84
 
        topic = topic.split('~', 1)[1]
85
 
        hosts = self.redis.smembers(topic)
86
 
        good_hosts = filter(
87
 
            lambda host: self.matchmaker.is_alive(topic, host), hosts)
88
 
 
89
 
        return [(x, x.split('.', 1)[1]) for x in good_hosts]
90
 
 
91
 
 
92
 
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
93
 
    """
94
 
    MatchMaker registering and looking-up hosts with a Redis server.
95
 
    """
96
 
    def __init__(self):
97
 
        super(MatchMakerRedis, self).__init__()
98
 
 
99
 
        if not redis:
100
 
            raise ImportError("Failed to import module redis.")
101
 
 
102
 
        self.redis = redis.StrictRedis(
103
 
            host=CONF.matchmaker_redis.host,
104
 
            port=CONF.matchmaker_redis.port,
105
 
            password=CONF.matchmaker_redis.password)
106
 
 
107
 
        self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
108
 
        self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
109
 
        self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
110
 
 
111
 
    def ack_alive(self, key, host):
112
 
        topic = "%s.%s" % (key, host)
113
 
        if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
114
 
            # If we could not update the expiration, the key
115
 
            # might have been pruned. Re-register, creating a new
116
 
            # key in Redis.
117
 
            self.register(self.topic_host[host], host)
118
 
 
119
 
    def is_alive(self, topic, host):
120
 
        if self.redis.ttl(host) == -1:
121
 
            self.expire(topic, host)
122
 
            return False
123
 
        return True
124
 
 
125
 
    def expire(self, topic, host):
126
 
        with self.redis.pipeline() as pipe:
127
 
            pipe.multi()
128
 
            pipe.delete(host)
129
 
            pipe.srem(topic, host)
130
 
            pipe.execute()
131
 
 
132
 
    def backend_register(self, key, key_host):
133
 
        with self.redis.pipeline() as pipe:
134
 
            pipe.multi()
135
 
            pipe.sadd(key, key_host)
136
 
 
137
 
            # No value is needed, we just
138
 
            # care if it exists. Sets aren't viable
139
 
            # because only keys can expire.
140
 
            pipe.set(key_host, '')
141
 
 
142
 
            pipe.execute()
143
 
 
144
 
    def backend_unregister(self, key, key_host):
145
 
        with self.redis.pipeline() as pipe:
146
 
            pipe.multi()
147
 
            pipe.srem(key, key_host)
148
 
            pipe.delete(key_host)
149
 
            pipe.execute()