~ubuntu-branches/ubuntu/saucy/nova/saucy-proposed

« back to all changes in this revision

Viewing changes to nova/rpc/impl_carrot.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-05-24 13:12:53 UTC
  • mfrom: (1.1.55)
  • Revision ID: package-import@ubuntu.com-20120524131253-ommql08fg1en06ut
Tags: 2012.2~f1-0ubuntu1
* New upstream release.
* Prepare for quantal:
  - Dropped debian/patches/upstream/0006-Use-project_id-in-ec2.cloud._format_image.patch
  - Dropped debian/patches/upstream/0005-Populate-image-properties-with-project_id-again.patch
  - Dropped debian/patches/upstream/0004-Fixed-bug-962840-added-a-test-case.patch
  - Dropped debian/patches/upstream/0003-Allow-unprivileged-RADOS-users-to-access-rbd-volumes.patch
  - Dropped debian/patches/upstream/0002-Stop-libvirt-test-from-deleting-instances-dir.patch
  - Dropped debian/patches/upstream/0001-fix-bug-where-nova-ignores-glance-host-in-imageref.patch 
  - Dropped debian/patches/0001-fix-useexisting-deprecation-warnings.patch
* debian/control: Add python-keystone as a dependency. (LP: #907197)
* debian/patches/kombu_tests_timeout.patch: Refreshed.
* debian/nova.conf, debian/nova-common.postinst: Convert to new ini
  file configuration
* debian/patches/nova-manage_flagfile_location.patch: Refreshed

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2010 United States Government as represented by the
4
 
# Administrator of the National Aeronautics and Space Administration.
5
 
# All Rights Reserved.
6
 
#
7
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
8
 
#    not use this file except in compliance with the License. You may obtain
9
 
#    a copy of the License at
10
 
#
11
 
#         http://www.apache.org/licenses/LICENSE-2.0
12
 
#
13
 
#    Unless required by applicable law or agreed to in writing, software
14
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
 
#    License for the specific language governing permissions and limitations
17
 
#    under the License.
18
 
 
19
 
"""AMQP-based RPC.
20
 
 
21
 
Queues have consumers and publishers.
22
 
 
23
 
No fan-out support yet.
24
 
 
25
 
"""
26
 
 
27
 
import inspect
28
 
import json
29
 
import sys
30
 
import time
31
 
import traceback
32
 
import uuid
33
 
 
34
 
from carrot import connection as carrot_connection
35
 
from carrot import messaging
36
 
import eventlet
37
 
from eventlet import greenpool
38
 
from eventlet import pools
39
 
from eventlet import queue
40
 
import greenlet
41
 
 
42
 
from nova import context
43
 
from nova import exception
44
 
from nova import flags
45
 
from nova import local
46
 
from nova import log as logging
47
 
from nova.rpc import common as rpc_common
48
 
from nova.testing import fake
49
 
from nova import utils
50
 
 
51
 
FLAGS = flags.FLAGS
52
 
LOG = logging.getLogger(__name__)
53
 
 
54
 
 
55
 
@utils.deprecated('Use of carrot will be removed in a future release. '
56
 
        'Use kombu, instead.')
57
 
class Connection(carrot_connection.BrokerConnection, rpc_common.Connection):
58
 
    """Connection instance object."""
59
 
 
60
 
    def __init__(self, *args, **kwargs):
61
 
        super(Connection, self).__init__(*args, **kwargs)
62
 
        self._rpc_consumers = []
63
 
        self._rpc_consumer_thread = None
64
 
 
65
 
    @classmethod
66
 
    def instance(cls, new=True):
67
 
        """Returns the instance."""
68
 
        if new or not hasattr(cls, '_instance'):
69
 
            params = dict(hostname=FLAGS.rabbit_host,
70
 
                          port=FLAGS.rabbit_port,
71
 
                          ssl=FLAGS.rabbit_use_ssl,
72
 
                          userid=FLAGS.rabbit_userid,
73
 
                          password=FLAGS.rabbit_password,
74
 
                          virtual_host=FLAGS.rabbit_virtual_host)
75
 
 
76
 
            if FLAGS.fake_rabbit:
77
 
                params['backend_cls'] = fake.rabbit.Backend
78
 
 
79
 
            # NOTE(vish): magic is fun!
80
 
            # pylint: disable=W0142
81
 
            if new:
82
 
                return cls(**params)
83
 
            else:
84
 
                cls._instance = cls(**params)
85
 
        return cls._instance
86
 
 
87
 
    @classmethod
88
 
    def recreate(cls):
89
 
        """Recreates the connection instance.
90
 
 
91
 
        This is necessary to recover from some network errors/disconnects.
92
 
 
93
 
        """
94
 
        try:
95
 
            del cls._instance
96
 
        except AttributeError, e:
97
 
            # The _instance stuff is for testing purposes. Usually we don't use
98
 
            # it. So don't freak out if it doesn't exist.
99
 
            pass
100
 
        return cls.instance()
101
 
 
102
 
    def close(self):
103
 
        self.cancel_consumer_thread()
104
 
        for consumer in self._rpc_consumers:
105
 
            try:
106
 
                consumer.close()
107
 
            except Exception:
108
 
                # ignore all errors
109
 
                pass
110
 
        self._rpc_consumers = []
111
 
        carrot_connection.BrokerConnection.close(self)
112
 
 
113
 
    def consume_in_thread(self):
114
 
        """Consumer from all queues/consumers in a greenthread"""
115
 
 
116
 
        consumer_set = ConsumerSet(connection=self,
117
 
                consumer_list=self._rpc_consumers)
118
 
 
119
 
        def _consumer_thread():
120
 
            try:
121
 
                consumer_set.wait()
122
 
            except greenlet.GreenletExit:
123
 
                return
124
 
        if self._rpc_consumer_thread is None:
125
 
            self._rpc_consumer_thread = eventlet.spawn(_consumer_thread)
126
 
        return self._rpc_consumer_thread
127
 
 
128
 
    def cancel_consumer_thread(self):
129
 
        """Cancel a consumer thread"""
130
 
        if self._rpc_consumer_thread is not None:
131
 
            self._rpc_consumer_thread.kill()
132
 
            try:
133
 
                self._rpc_consumer_thread.wait()
134
 
            except greenlet.GreenletExit:
135
 
                pass
136
 
            self._rpc_consumer_thread = None
137
 
 
138
 
    def create_consumer(self, topic, proxy, fanout=False):
139
 
        """Create a consumer that calls methods in the proxy"""
140
 
        if fanout:
141
 
            consumer = FanoutAdapterConsumer(
142
 
                    connection=self,
143
 
                    topic=topic,
144
 
                    proxy=proxy)
145
 
        else:
146
 
            consumer = TopicAdapterConsumer(
147
 
                    connection=self,
148
 
                    topic=topic,
149
 
                    proxy=proxy)
150
 
        self._rpc_consumers.append(consumer)
151
 
 
152
 
 
153
 
class Pool(pools.Pool):
154
 
    """Class that implements a Pool of Connections."""
155
 
 
156
 
    # TODO(comstud): Timeout connections not used in a while
157
 
    def create(self):
158
 
        LOG.debug('Pool creating new connection')
159
 
        return Connection.instance(new=True)
160
 
 
161
 
# Create a ConnectionPool to use for RPC calls.  We'll order the
162
 
# pool as a stack (LIFO), so that we can potentially loop through and
163
 
# timeout old unused connections at some point
164
 
ConnectionPool = Pool(
165
 
        max_size=FLAGS.rpc_conn_pool_size,
166
 
        order_as_stack=True)
167
 
 
168
 
 
169
 
class Consumer(messaging.Consumer):
170
 
    """Consumer base class.
171
 
 
172
 
    Contains methods for connecting the fetch method to async loops.
173
 
 
174
 
    """
175
 
 
176
 
    def __init__(self, *args, **kwargs):
177
 
        max_retries = FLAGS.rabbit_max_retries
178
 
        sleep_time = FLAGS.rabbit_retry_interval
179
 
        tries = 0
180
 
        while True:
181
 
            tries += 1
182
 
            if tries > 1:
183
 
                time.sleep(sleep_time)
184
 
                # backoff for next retry attempt.. if there is one
185
 
                sleep_time += FLAGS.rabbit_retry_backoff
186
 
                if sleep_time > 30:
187
 
                    sleep_time = 30
188
 
            try:
189
 
                super(Consumer, self).__init__(*args, **kwargs)
190
 
                self.failed_connection = False
191
 
                break
192
 
            except Exception as e:  # Catching all because carrot sucks
193
 
                self.failed_connection = True
194
 
                if max_retries > 0 and tries == max_retries:
195
 
                    break
196
 
                fl_host = FLAGS.rabbit_host
197
 
                fl_port = FLAGS.rabbit_port
198
 
                fl_intv = sleep_time
199
 
                LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
200
 
                            ' unreachable: %(e)s. Trying again in %(fl_intv)d'
201
 
                            ' seconds.') % locals())
202
 
        if self.failed_connection:
203
 
            LOG.error(_('Unable to connect to AMQP server '
204
 
                        'after %(tries)d tries. Shutting down.') % locals())
205
 
            sys.exit(1)
206
 
 
207
 
    def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
208
 
        """Wraps the parent fetch with some logic for failed connection."""
209
 
        # TODO(vish): the logic for failed connections and logging should be
210
 
        #             refactored into some sort of connection manager object
211
 
        try:
212
 
            if self.failed_connection:
213
 
                # NOTE(vish): connection is defined in the parent class, we can
214
 
                #             recreate it as long as we create the backend too
215
 
                # pylint: disable=W0201
216
 
                self.connection = Connection.recreate()
217
 
                self.backend = self.connection.create_backend()
218
 
                self.declare()
219
 
            return super(Consumer, self).fetch(no_ack,
220
 
                                               auto_ack,
221
 
                                               enable_callbacks)
222
 
            if self.failed_connection:
223
 
                LOG.error(_('Reconnected to queue'))
224
 
                self.failed_connection = False
225
 
        # NOTE(vish): This is catching all errors because we really don't
226
 
        #             want exceptions to be logged 10 times a second if some
227
 
        #             persistent failure occurs.
228
 
        except Exception, e:  # pylint: disable=W0703
229
 
            if not self.failed_connection:
230
 
                LOG.exception(_('Failed to fetch message from queue: %s') % e)
231
 
                self.failed_connection = True
232
 
 
233
 
 
234
 
class AdapterConsumer(Consumer):
235
 
    """Calls methods on a proxy object based on method and args."""
236
 
 
237
 
    def __init__(self, connection=None, topic='broadcast', proxy=None):
238
 
        LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
239
 
        self.proxy = proxy
240
 
        self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
241
 
        super(AdapterConsumer, self).__init__(connection=connection,
242
 
                                              topic=topic)
243
 
        self.register_callback(self.process_data)
244
 
 
245
 
    def process_data(self, message_data, message):
246
 
        """Consumer callback to call a method on a proxy object.
247
 
 
248
 
        Parses the message for validity and fires off a thread to call the
249
 
        proxy object method.
250
 
 
251
 
        Message data should be a dictionary with two keys:
252
 
            method: string representing the method to call
253
 
            args: dictionary of arg: value
254
 
 
255
 
        Example: {'method': 'echo', 'args': {'value': 42}}
256
 
 
257
 
        """
258
 
        # It is important to clear the context here, because at this point
259
 
        # the previous context is stored in local.store.context
260
 
        if hasattr(local.store, 'context'):
261
 
            del local.store.context
262
 
        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
263
 
        # This will be popped off in _unpack_context
264
 
        msg_id = message_data.get('_msg_id', None)
265
 
        ctxt = _unpack_context(message_data)
266
 
 
267
 
        method = message_data.get('method')
268
 
        args = message_data.get('args', {})
269
 
        message.ack()
270
 
        if not method:
271
 
            # NOTE(vish): we may not want to ack here, but that means that bad
272
 
            #             messages stay in the queue indefinitely, so for now
273
 
            #             we just log the message and send an error string
274
 
            #             back to the caller
275
 
            LOG.warn(_('no method for message: %s') % message_data)
276
 
            ctxt.reply(msg_id,
277
 
                    _('No method for message: %s') % message_data)
278
 
            return
279
 
        self.pool.spawn_n(self._process_data, ctxt, method, args)
280
 
 
281
 
    @exception.wrap_exception()
282
 
    def _process_data(self, ctxt, method, args):
283
 
        """Thread that magically looks for a method on the proxy
284
 
        object and calls it.
285
 
        """
286
 
        ctxt.update_store()
287
 
        node_func = getattr(self.proxy, str(method))
288
 
        node_args = dict((str(k), v) for k, v in args.iteritems())
289
 
        # NOTE(vish): magic is fun!
290
 
        try:
291
 
            rval = node_func(context=ctxt, **node_args)
292
 
            # Check if the result was a generator
293
 
            if inspect.isgenerator(rval):
294
 
                for x in rval:
295
 
                    ctxt.reply(x, None)
296
 
            else:
297
 
                ctxt.reply(rval, None)
298
 
 
299
 
            # This final None tells multicall that it is done.
300
 
            ctxt.reply(ending=True)
301
 
        except Exception as e:
302
 
            LOG.exception('Exception during message handling')
303
 
            ctxt.reply(None, sys.exc_info())
304
 
        return
305
 
 
306
 
 
307
 
class TopicAdapterConsumer(AdapterConsumer):
308
 
    """Consumes messages on a specific topic."""
309
 
 
310
 
    exchange_type = 'topic'
311
 
 
312
 
    def __init__(self, connection=None, topic='broadcast', proxy=None):
313
 
        self.queue = topic
314
 
        self.routing_key = topic
315
 
        self.exchange = FLAGS.control_exchange
316
 
        self.durable = FLAGS.rabbit_durable_queues
317
 
        super(TopicAdapterConsumer, self).__init__(connection=connection,
318
 
                                    topic=topic, proxy=proxy)
319
 
 
320
 
 
321
 
class FanoutAdapterConsumer(AdapterConsumer):
322
 
    """Consumes messages from a fanout exchange."""
323
 
 
324
 
    exchange_type = 'fanout'
325
 
 
326
 
    def __init__(self, connection=None, topic='broadcast', proxy=None):
327
 
        self.exchange = '%s_fanout' % topic
328
 
        self.routing_key = topic
329
 
        unique = uuid.uuid4().hex
330
 
        self.queue = '%s_fanout_%s' % (topic, unique)
331
 
        self.durable = False
332
 
        # Fanout creates unique queue names, so we should auto-remove
333
 
        # them when done, so they're not left around on restart.
334
 
        # Also, we're the only one that should be consuming.  exclusive
335
 
        # implies auto_delete, so we'll just set that..
336
 
        self.exclusive = True
337
 
        LOG.info(_('Created "%(exchange)s" fanout exchange '
338
 
                   'with "%(key)s" routing key'),
339
 
                 dict(exchange=self.exchange, key=self.routing_key))
340
 
        super(FanoutAdapterConsumer, self).__init__(connection=connection,
341
 
                                    topic=topic, proxy=proxy)
342
 
 
343
 
 
344
 
class ConsumerSet(object):
345
 
    """Groups consumers to listen on together on a single connection."""
346
 
 
347
 
    def __init__(self, connection, consumer_list):
348
 
        self.consumer_list = set(consumer_list)
349
 
        self.consumer_set = None
350
 
        self.enabled = True
351
 
        self.init(connection)
352
 
 
353
 
    def init(self, conn):
354
 
        if not conn:
355
 
            conn = Connection.instance(new=True)
356
 
        if self.consumer_set:
357
 
            self.consumer_set.close()
358
 
        self.consumer_set = messaging.ConsumerSet(conn)
359
 
        for consumer in self.consumer_list:
360
 
            consumer.connection = conn
361
 
            # consumer.backend is set for us
362
 
            self.consumer_set.add_consumer(consumer)
363
 
 
364
 
    def reconnect(self):
365
 
        self.init(None)
366
 
 
367
 
    def wait(self, limit=None):
368
 
        running = True
369
 
        while running:
370
 
            it = self.consumer_set.iterconsume(limit=limit)
371
 
            if not it:
372
 
                break
373
 
            while True:
374
 
                try:
375
 
                    it.next()
376
 
                except StopIteration:
377
 
                    return
378
 
                except greenlet.GreenletExit:
379
 
                    running = False
380
 
                    break
381
 
                except Exception as e:
382
 
                    LOG.exception(_("Exception while processing consumer"))
383
 
                    self.reconnect()
384
 
                    # Break to outer loop
385
 
                    break
386
 
 
387
 
    def close(self):
388
 
        self.consumer_set.close()
389
 
 
390
 
 
391
 
class Publisher(messaging.Publisher):
392
 
    """Publisher base class."""
393
 
    pass
394
 
 
395
 
 
396
 
class TopicPublisher(Publisher):
397
 
    """Publishes messages on a specific topic."""
398
 
 
399
 
    exchange_type = 'topic'
400
 
 
401
 
    def __init__(self, connection=None, topic='broadcast', durable=None):
402
 
        self.routing_key = topic
403
 
        self.exchange = FLAGS.control_exchange
404
 
        self.durable = (FLAGS.rabbit_durable_queues if durable is None
405
 
                                                    else durable)
406
 
        super(TopicPublisher, self).__init__(connection=connection)
407
 
 
408
 
 
409
 
class FanoutPublisher(Publisher):
410
 
    """Publishes messages to a fanout exchange."""
411
 
 
412
 
    exchange_type = 'fanout'
413
 
 
414
 
    def __init__(self, topic, connection=None):
415
 
        self.exchange = '%s_fanout' % topic
416
 
        self.queue = '%s_fanout' % topic
417
 
        self.durable = False
418
 
        self.auto_delete = True
419
 
        LOG.info(_('Creating "%(exchange)s" fanout exchange'),
420
 
                 dict(exchange=self.exchange))
421
 
        super(FanoutPublisher, self).__init__(connection=connection)
422
 
 
423
 
 
424
 
class DirectConsumer(Consumer):
425
 
    """Consumes messages directly on a channel specified by msg_id."""
426
 
 
427
 
    exchange_type = 'direct'
428
 
 
429
 
    def __init__(self, connection=None, msg_id=None):
430
 
        self.queue = msg_id
431
 
        self.routing_key = msg_id
432
 
        self.exchange = msg_id
433
 
        self.durable = False
434
 
        self.auto_delete = True
435
 
        self.exclusive = True
436
 
        super(DirectConsumer, self).__init__(connection=connection)
437
 
 
438
 
 
439
 
class DirectPublisher(Publisher):
440
 
    """Publishes messages directly on a channel specified by msg_id."""
441
 
 
442
 
    exchange_type = 'direct'
443
 
 
444
 
    def __init__(self, connection=None, msg_id=None):
445
 
        self.routing_key = msg_id
446
 
        self.exchange = msg_id
447
 
        self.durable = False
448
 
        self.auto_delete = True
449
 
        super(DirectPublisher, self).__init__(connection=connection)
450
 
 
451
 
 
452
 
def msg_reply(msg_id, reply=None, failure=None, ending=False):
453
 
    """Sends a reply or an error on the channel signified by msg_id.
454
 
 
455
 
    Failure should be a sys.exc_info() tuple.
456
 
 
457
 
    """
458
 
    if failure:
459
 
        message = str(failure[1])
460
 
        tb = traceback.format_exception(*failure)
461
 
        LOG.error(_("Returning exception %s to caller"), message)
462
 
        LOG.error(tb)
463
 
        failure = (failure[0].__name__, str(failure[1]), tb)
464
 
 
465
 
    with ConnectionPool.item() as conn:
466
 
        publisher = DirectPublisher(connection=conn, msg_id=msg_id)
467
 
        try:
468
 
            msg = {'result': reply, 'failure': failure}
469
 
            if ending:
470
 
                msg['ending'] = True
471
 
            publisher.send(msg)
472
 
        except TypeError:
473
 
            msg = {'result': dict((k, repr(v))
474
 
                            for k, v in reply.__dict__.iteritems()),
475
 
                    'failure': failure}
476
 
            if ending:
477
 
                msg['ending'] = True
478
 
            publisher.send(msg)
479
 
 
480
 
        publisher.close()
481
 
 
482
 
 
483
 
def _unpack_context(msg):
484
 
    """Unpack context from msg."""
485
 
    context_dict = {}
486
 
    for key in list(msg.keys()):
487
 
        # NOTE(vish): Some versions of python don't like unicode keys
488
 
        #             in kwargs.
489
 
        key = str(key)
490
 
        if key.startswith('_context_'):
491
 
            value = msg.pop(key)
492
 
            context_dict[key[9:]] = value
493
 
    context_dict['msg_id'] = msg.pop('_msg_id', None)
494
 
    ctx = RpcContext.from_dict(context_dict)
495
 
    LOG.debug(_('unpacked context: %s'), ctx.to_dict())
496
 
    return ctx
497
 
 
498
 
 
499
 
def _pack_context(msg, context):
500
 
    """Pack context into msg.
501
 
 
502
 
    Values for message keys need to be less than 255 chars, so we pull
503
 
    context out into a bunch of separate keys. If we want to support
504
 
    more arguments in rabbit messages, we may want to do the same
505
 
    for args at some point.
506
 
 
507
 
    """
508
 
    context_d = dict([('_context_%s' % key, value)
509
 
                      for (key, value) in context.to_dict().iteritems()])
510
 
    msg.update(context_d)
511
 
 
512
 
 
513
 
class RpcContext(context.RequestContext):
514
 
    def __init__(self, *args, **kwargs):
515
 
        msg_id = kwargs.pop('msg_id', None)
516
 
        self.msg_id = msg_id
517
 
        super(RpcContext, self).__init__(*args, **kwargs)
518
 
 
519
 
    def reply(self, reply=None, failure=None, ending=False):
520
 
        if self.msg_id:
521
 
            msg_reply(self.msg_id, reply, failure, ending)
522
 
            if ending:
523
 
                self.msg_id = None
524
 
 
525
 
 
526
 
def multicall(context, topic, msg, timeout=None):
527
 
    """Make a call that returns multiple times."""
528
 
    # NOTE(russellb): carrot doesn't support timeouts
529
 
    LOG.debug(_('Making asynchronous call on %s ...'), topic)
530
 
    msg_id = uuid.uuid4().hex
531
 
    msg.update({'_msg_id': msg_id})
532
 
    LOG.debug(_('MSG_ID is %s') % (msg_id))
533
 
    _pack_context(msg, context)
534
 
 
535
 
    con_conn = ConnectionPool.get()
536
 
    consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
537
 
    wait_msg = MulticallWaiter(consumer)
538
 
    consumer.register_callback(wait_msg)
539
 
 
540
 
    publisher = TopicPublisher(connection=con_conn, topic=topic)
541
 
    publisher.send(msg)
542
 
    publisher.close()
543
 
 
544
 
    return wait_msg
545
 
 
546
 
 
547
 
class MulticallWaiter(object):
548
 
    def __init__(self, consumer):
549
 
        self._consumer = consumer
550
 
        self._results = queue.Queue()
551
 
        self._closed = False
552
 
        self._got_ending = False
553
 
 
554
 
    def close(self):
555
 
        if self._closed:
556
 
            return
557
 
        self._closed = True
558
 
        self._consumer.close()
559
 
        ConnectionPool.put(self._consumer.connection)
560
 
 
561
 
    def __call__(self, data, message):
562
 
        """Acks message and sets result."""
563
 
        message.ack()
564
 
        if data['failure']:
565
 
            self._results.put(rpc_common.RemoteError(*data['failure']))
566
 
        elif data.get('ending', False):
567
 
            self._got_ending = True
568
 
        else:
569
 
            self._results.put(data['result'])
570
 
 
571
 
    def __iter__(self):
572
 
        return self.wait()
573
 
 
574
 
    def wait(self):
575
 
        while not self._closed:
576
 
            try:
577
 
                rv = self._consumer.fetch(enable_callbacks=True)
578
 
            except Exception:
579
 
                self.close()
580
 
                raise
581
 
            if rv is None:
582
 
                time.sleep(0.01)
583
 
                continue
584
 
            if self._got_ending:
585
 
                self.close()
586
 
                raise StopIteration
587
 
            result = self._results.get()
588
 
            if isinstance(result, Exception):
589
 
                self.close()
590
 
                raise result
591
 
            yield result
592
 
 
593
 
 
594
 
def create_connection(new=True):
595
 
    """Create a connection"""
596
 
    return Connection.instance(new=new)
597
 
 
598
 
 
599
 
def call(context, topic, msg, timeout=None):
600
 
    """Sends a message on a topic and wait for a response."""
601
 
    rv = multicall(context, topic, msg, timeout)
602
 
    # NOTE(vish): return the last result from the multicall
603
 
    rv = list(rv)
604
 
    if not rv:
605
 
        return
606
 
    return rv[-1]
607
 
 
608
 
 
609
 
def cast(context, topic, msg):
610
 
    """Sends a message on a topic without waiting for a response."""
611
 
    LOG.debug(_('Making asynchronous cast on %s...'), topic)
612
 
    _pack_context(msg, context)
613
 
    with ConnectionPool.item() as conn:
614
 
        publisher = TopicPublisher(connection=conn, topic=topic)
615
 
        publisher.send(msg)
616
 
        publisher.close()
617
 
 
618
 
 
619
 
def fanout_cast(context, topic, msg):
620
 
    """Sends a message on a fanout exchange without waiting for a response."""
621
 
    LOG.debug(_('Making asynchronous fanout cast...'))
622
 
    _pack_context(msg, context)
623
 
    with ConnectionPool.item() as conn:
624
 
        publisher = FanoutPublisher(topic, connection=conn)
625
 
        publisher.send(msg)
626
 
        publisher.close()
627
 
 
628
 
 
629
 
def notify(context, topic, msg):
630
 
    """Sends a notification event on a topic."""
631
 
    LOG.debug(_('Sending notification on %s...'), topic)
632
 
    _pack_context(msg, context)
633
 
    with ConnectionPool.item() as conn:
634
 
        publisher = TopicPublisher(connection=conn, topic=topic,
635
 
                                   durable=True)
636
 
        publisher.send(msg)
637
 
        publisher.close()
638
 
 
639
 
 
640
 
def cleanup():
641
 
    pass
642
 
 
643
 
 
644
 
def generic_response(message_data, message):
645
 
    """Logs a result and exits."""
646
 
    LOG.debug(_('response %s'), message_data)
647
 
    message.ack()
648
 
    sys.exit(0)
649
 
 
650
 
 
651
 
def send_message(topic, message, wait=True):
652
 
    """Sends a message for testing."""
653
 
    msg_id = uuid.uuid4().hex
654
 
    message.update({'_msg_id': msg_id})
655
 
    LOG.debug(_('topic is %s'), topic)
656
 
    LOG.debug(_('message %s'), message)
657
 
 
658
 
    if wait:
659
 
        consumer = messaging.Consumer(connection=Connection.instance(),
660
 
                                      queue=msg_id,
661
 
                                      exchange=msg_id,
662
 
                                      auto_delete=True,
663
 
                                      exchange_type='direct',
664
 
                                      routing_key=msg_id)
665
 
        consumer.register_callback(generic_response)
666
 
 
667
 
    publisher = messaging.Publisher(connection=Connection.instance(),
668
 
                                    exchange=FLAGS.control_exchange,
669
 
                                    durable=FLAGS.rabbit_durable_queues,
670
 
                                    exchange_type='topic',
671
 
                                    routing_key=topic)
672
 
    publisher.send(message)
673
 
    publisher.close()
674
 
 
675
 
    if wait:
676
 
        consumer.wait()
677
 
        consumer.close()
678
 
 
679
 
 
680
 
if __name__ == '__main__':
681
 
    # You can send messages from the command line using
682
 
    # topic and a json string representing a dictionary
683
 
    # for the method
684
 
    send_message(sys.argv[1], json.loads(sys.argv[2]))