~ubuntu-branches/ubuntu/trusty/heat/trusty-security

« back to all changes in this revision

Viewing changes to heat/openstack/common/rpc/matchmaker.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Yolanda Robla, Chuck Short
  • Date: 2013-07-22 16:22:29 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20130722162229-zzvfu40id94ii0hc
Tags: 2013.2~b2-0ubuntu1
[ Yolanda Robla ]
* debian/tests: added autopkg tests

[ Chuck Short ]
* New upstream release
* debian/control:
  - Add python-pbr to build-depends.
  - Add python-d2to to build-depends.
  - Dropped python-argparse.
  - Add python-six to build-depends.
  - Dropped python-sendfile.
  - Dropped python-nose.
  - Added testrepository.
  - Added python-testtools.
* debian/rules: Run testrepository instead of nosetets.
* debian/patches/removes-lxml-version-limitation-from-pip-requires.patch: Dropped
  no longer needed.
* debian/patches/fix-package-version-detection-when-building-doc.patch: Dropped
  no longer needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
"""
20
20
 
21
21
import contextlib
22
 
import itertools
23
 
import json
24
22
 
 
23
import eventlet
25
24
from oslo.config import cfg
26
25
 
27
26
from heat.openstack.common.gettextutils import _
29
28
 
30
29
 
31
30
matchmaker_opts = [
32
 
    # Matchmaker ring file
33
 
    cfg.StrOpt('matchmaker_ringfile',
34
 
               default='/etc/nova/matchmaker_ring.json',
35
 
               help='Matchmaker ring file (JSON)'),
 
31
    cfg.IntOpt('matchmaker_heartbeat_freq',
 
32
               default=300,
 
33
               help='Heartbeat frequency'),
 
34
    cfg.IntOpt('matchmaker_heartbeat_ttl',
 
35
               default=600,
 
36
               help='Heartbeat time-to-live.'),
36
37
]
37
38
 
38
39
CONF = cfg.CONF
47
48
 
48
49
 
49
50
class Exchange(object):
50
 
    """
51
 
    Implements lookups.
 
51
    """Implements lookups.
 
52
 
52
53
    Subclass this to support hashtables, dns, etc.
53
54
    """
54
55
    def __init__(self):
59
60
 
60
61
 
61
62
class Binding(object):
62
 
    """
63
 
    A binding on which to perform a lookup.
64
 
    """
 
63
    """A binding on which to perform a lookup."""
65
64
    def __init__(self):
66
65
        pass
67
66
 
70
69
 
71
70
 
72
71
class MatchMakerBase(object):
73
 
    """Match Maker Base Class."""
 
72
    """Match Maker Base Class.
74
73
 
 
74
    Build off HeartbeatMatchMakerBase if building a heartbeat-capable
 
75
    MatchMaker.
 
76
    """
75
77
    def __init__(self):
76
78
        # Array of tuples. Index [2] toggles negation, [3] is last-if-true
77
79
        self.bindings = []
78
80
 
 
81
        self.no_heartbeat_msg = _('Matchmaker does not implement '
 
82
                                  'registration or heartbeat.')
 
83
 
 
84
    def register(self, key, host):
 
85
        """Register a host on a backend.
 
86
 
 
87
        Heartbeats, if applicable, may keepalive registration.
 
88
        """
 
89
        pass
 
90
 
 
91
    def ack_alive(self, key, host):
 
92
        """Acknowledge that a key.host is alive.
 
93
 
 
94
        Used internally for updating heartbeats, but may also be used
 
95
        publically to acknowledge a system is alive (i.e. rpc message
 
96
        successfully sent to host)
 
97
        """
 
98
        pass
 
99
 
 
100
    def is_alive(self, topic, host):
 
101
        """Checks if a host is alive."""
 
102
        pass
 
103
 
 
104
    def expire(self, topic, host):
 
105
        """Explicitly expire a host's registration."""
 
106
        pass
 
107
 
 
108
    def send_heartbeats(self):
 
109
        """Send all heartbeats.
 
110
 
 
111
        Use start_heartbeat to spawn a heartbeat greenthread,
 
112
        which loops this method.
 
113
        """
 
114
        pass
 
115
 
 
116
    def unregister(self, key, host):
 
117
        """Unregister a topic."""
 
118
        pass
 
119
 
 
120
    def start_heartbeat(self):
 
121
        """Spawn heartbeat greenthread."""
 
122
        pass
 
123
 
 
124
    def stop_heartbeat(self):
 
125
        """Destroys the heartbeat greenthread."""
 
126
        pass
 
127
 
79
128
    def add_binding(self, binding, rule, last=True):
80
129
        self.bindings.append((binding, rule, False, last))
81
130
 
99
148
        return workers
100
149
 
101
150
 
 
151
class HeartbeatMatchMakerBase(MatchMakerBase):
 
152
    """Base for a heart-beat capable MatchMaker.
 
153
 
 
154
    Provides common methods for registering, unregistering, and maintaining
 
155
    heartbeats.
 
156
    """
 
157
    def __init__(self):
 
158
        self.hosts = set()
 
159
        self._heart = None
 
160
        self.host_topic = {}
 
161
 
 
162
        super(HeartbeatMatchMakerBase, self).__init__()
 
163
 
 
164
    def send_heartbeats(self):
 
165
        """Send all heartbeats.
 
166
 
 
167
        Use start_heartbeat to spawn a heartbeat greenthread,
 
168
        which loops this method.
 
169
        """
 
170
        for key, host in self.host_topic:
 
171
            self.ack_alive(key, host)
 
172
 
 
173
    def ack_alive(self, key, host):
 
174
        """Acknowledge that a host.topic is alive.
 
175
 
 
176
        Used internally for updating heartbeats, but may also be used
 
177
        publically to acknowledge a system is alive (i.e. rpc message
 
178
        successfully sent to host)
 
179
        """
 
180
        raise NotImplementedError("Must implement ack_alive")
 
181
 
 
182
    def backend_register(self, key, host):
 
183
        """Implements registration logic.
 
184
 
 
185
        Called by register(self,key,host)
 
186
        """
 
187
        raise NotImplementedError("Must implement backend_register")
 
188
 
 
189
    def backend_unregister(self, key, key_host):
 
190
        """Implements de-registration logic.
 
191
 
 
192
        Called by unregister(self,key,host)
 
193
        """
 
194
        raise NotImplementedError("Must implement backend_unregister")
 
195
 
 
196
    def register(self, key, host):
 
197
        """Register a host on a backend.
 
198
 
 
199
        Heartbeats, if applicable, may keepalive registration.
 
200
        """
 
201
        self.hosts.add(host)
 
202
        self.host_topic[(key, host)] = host
 
203
        key_host = '.'.join((key, host))
 
204
 
 
205
        self.backend_register(key, key_host)
 
206
 
 
207
        self.ack_alive(key, host)
 
208
 
 
209
    def unregister(self, key, host):
 
210
        """Unregister a topic."""
 
211
        if (key, host) in self.host_topic:
 
212
            del self.host_topic[(key, host)]
 
213
 
 
214
        self.hosts.discard(host)
 
215
        self.backend_unregister(key, '.'.join((key, host)))
 
216
 
 
217
        LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
 
218
                 {'key': key, 'host': host})
 
219
 
 
220
    def start_heartbeat(self):
 
221
        """Implementation of MatchMakerBase.start_heartbeat.
 
222
 
 
223
        Launches greenthread looping send_heartbeats(),
 
224
        yielding for CONF.matchmaker_heartbeat_freq seconds
 
225
        between iterations.
 
226
        """
 
227
        if not self.hosts:
 
228
            raise MatchMakerException(
 
229
                _("Register before starting heartbeat."))
 
230
 
 
231
        def do_heartbeat():
 
232
            while True:
 
233
                self.send_heartbeats()
 
234
                eventlet.sleep(CONF.matchmaker_heartbeat_freq)
 
235
 
 
236
        self._heart = eventlet.spawn(do_heartbeat)
 
237
 
 
238
    def stop_heartbeat(self):
 
239
        """Destroys the heartbeat greenthread."""
 
240
        if self._heart:
 
241
            self._heart.kill()
 
242
 
 
243
 
102
244
class DirectBinding(Binding):
103
 
    """
104
 
    Specifies a host in the key via a '.' character
 
245
    """Specifies a host in the key via a '.' character.
 
246
 
105
247
    Although dots are used in the key, the behavior here is
106
248
    that it maps directly to a host, thus direct.
107
249
    """
112
254
 
113
255
 
114
256
class TopicBinding(Binding):
115
 
    """
116
 
    Where a 'bare' key without dots.
 
257
    """Where a 'bare' key without dots.
 
258
 
117
259
    AMQP generally considers topic exchanges to be those *with* dots,
118
260
    but we deviate here in terminology as the behavior here matches
119
261
    that of a topic exchange (whereas where there are dots, behavior
139
281
        return [(key, None)]
140
282
 
141
283
 
142
 
class RingExchange(Exchange):
143
 
    """
144
 
    Match Maker where hosts are loaded from a static file containing
145
 
    a hashmap (JSON formatted).
146
 
 
147
 
    __init__ takes optional ring dictionary argument, otherwise
148
 
    loads the ringfile from CONF.mathcmaker_ringfile.
149
 
    """
150
 
    def __init__(self, ring=None):
151
 
        super(RingExchange, self).__init__()
152
 
 
153
 
        if ring:
154
 
            self.ring = ring
155
 
        else:
156
 
            fh = open(CONF.matchmaker_ringfile, 'r')
157
 
            self.ring = json.load(fh)
158
 
            fh.close()
159
 
 
160
 
        self.ring0 = {}
161
 
        for k in self.ring.keys():
162
 
            self.ring0[k] = itertools.cycle(self.ring[k])
163
 
 
164
 
    def _ring_has(self, key):
165
 
        if key in self.ring0:
166
 
            return True
167
 
        return False
168
 
 
169
 
 
170
 
class RoundRobinRingExchange(RingExchange):
171
 
    """A Topic Exchange based on a hashmap."""
172
 
    def __init__(self, ring=None):
173
 
        super(RoundRobinRingExchange, self).__init__(ring)
174
 
 
175
 
    def run(self, key):
176
 
        if not self._ring_has(key):
177
 
            LOG.warn(
178
 
                _("No key defining hosts for topic '%s', "
179
 
                  "see ringfile") % (key, )
180
 
            )
181
 
            return []
182
 
        host = next(self.ring0[key])
183
 
        return [(key + '.' + host, host)]
184
 
 
185
 
 
186
 
class FanoutRingExchange(RingExchange):
187
 
    """Fanout Exchange based on a hashmap."""
188
 
    def __init__(self, ring=None):
189
 
        super(FanoutRingExchange, self).__init__(ring)
190
 
 
191
 
    def run(self, key):
192
 
        # Assume starts with "fanout~", strip it for lookup.
193
 
        nkey = key.split('fanout~')[1:][0]
194
 
        if not self._ring_has(nkey):
195
 
            LOG.warn(
196
 
                _("No key defining hosts for topic '%s', "
197
 
                  "see ringfile") % (nkey, )
198
 
            )
199
 
            return []
200
 
        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
201
 
 
202
 
 
203
284
class LocalhostExchange(Exchange):
204
285
    """Exchange where all direct topics are local."""
205
 
    def __init__(self):
 
286
    def __init__(self, host='localhost'):
 
287
        self.host = host
206
288
        super(Exchange, self).__init__()
207
289
 
208
290
    def run(self, key):
209
 
        return [(key.split('.')[0] + '.localhost', 'localhost')]
 
291
        return [('.'.join((key.split('.')[0], self.host)), self.host)]
210
292
 
211
293
 
212
294
class DirectExchange(Exchange):
213
 
    """
214
 
    Exchange where all topic keys are split, sending to second half.
215
 
    i.e. "compute.host" sends a message to "compute" running on "host"
 
295
    """Exchange where all topic keys are split, sending to second half.
 
296
 
 
297
    i.e. "compute.host" sends a message to "compute.host" running on "host"
216
298
    """
217
299
    def __init__(self):
218
300
        super(Exchange, self).__init__()
219
301
 
220
302
    def run(self, key):
221
 
        b, e = key.split('.', 1)
222
 
        return [(b, e)]
223
 
 
224
 
 
225
 
class MatchMakerRing(MatchMakerBase):
226
 
    """
227
 
    Match Maker where hosts are loaded from a static hashmap.
228
 
    """
229
 
    def __init__(self, ring=None):
230
 
        super(MatchMakerRing, self).__init__()
231
 
        self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
232
 
        self.add_binding(DirectBinding(), DirectExchange())
233
 
        self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
 
303
        e = key.split('.', 1)[1]
 
304
        return [(key, e)]
234
305
 
235
306
 
236
307
class MatchMakerLocalhost(MatchMakerBase):
237
 
    """
238
 
    Match Maker where all bare topics resolve to localhost.
 
308
    """Match Maker where all bare topics resolve to localhost.
 
309
 
239
310
    Useful for testing.
240
311
    """
241
 
    def __init__(self):
 
312
    def __init__(self, host='localhost'):
242
313
        super(MatchMakerLocalhost, self).__init__()
243
 
        self.add_binding(FanoutBinding(), LocalhostExchange())
 
314
        self.add_binding(FanoutBinding(), LocalhostExchange(host))
244
315
        self.add_binding(DirectBinding(), DirectExchange())
245
 
        self.add_binding(TopicBinding(), LocalhostExchange())
 
316
        self.add_binding(TopicBinding(), LocalhostExchange(host))
246
317
 
247
318
 
248
319
class MatchMakerStub(MatchMakerBase):
249
 
    """
250
 
    Match Maker where topics are untouched.
 
320
    """Match Maker where topics are untouched.
 
321
 
251
322
    Useful for testing, or for AMQP/brokered queues.
252
323
    Will not work where knowledge of hosts is known (i.e. zeromq)
253
324
    """
254
325
    def __init__(self):
255
 
        super(MatchMakerLocalhost, self).__init__()
 
326
        super(MatchMakerStub, self).__init__()
256
327
 
257
328
        self.add_binding(FanoutBinding(), StubExchange())
258
329
        self.add_binding(DirectBinding(), StubExchange())