~cbehrens/nova/lp844160-build-works-with-zones

« back to all changes in this revision

Viewing changes to nova/rpc.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
# Copyright [2010] [Anso Labs, LLC]
 
3
 
4
#    Licensed under the Apache License, Version 2.0 (the "License");
 
5
#    you may not use this file except in compliance with the License.
 
6
#    You may obtain a copy of the License at
 
7
 
8
#        http://www.apache.org/licenses/LICENSE-2.0
 
9
 
10
#    Unless required by applicable law or agreed to in writing, software
 
11
#    distributed under the License is distributed on an "AS IS" BASIS,
 
12
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
#    See the License for the specific language governing permissions and
 
14
#    limitations under the License.
 
15
 
 
16
"""
 
17
AMQP-based RPC. Queues have consumers and publishers.
 
18
No fan-out support yet.
 
19
"""
 
20
 
 
21
import logging
 
22
import sys
 
23
import uuid
 
24
 
 
25
from nova import vendor
 
26
import anyjson
 
27
from carrot import connection
 
28
from carrot import messaging
 
29
from twisted.internet import defer
 
30
from twisted.internet import reactor
 
31
from twisted.internet import task
 
32
 
 
33
from nova import fakerabbit
 
34
from nova import flags
 
35
 
 
36
 
 
37
FLAGS = flags.FLAGS
 
38
 
 
39
 
 
40
_log = logging.getLogger('amqplib')
 
41
_log.setLevel(logging.WARN)
 
42
 
 
43
 
 
44
class Connection(connection.BrokerConnection):
 
45
    @classmethod
 
46
    def instance(cls):
 
47
        if not hasattr(cls, '_instance'):
 
48
            params = dict(hostname=FLAGS.rabbit_host,
 
49
                          port=FLAGS.rabbit_port,
 
50
                          userid=FLAGS.rabbit_userid,
 
51
                          password=FLAGS.rabbit_password,
 
52
                          virtual_host=FLAGS.rabbit_virtual_host)
 
53
 
 
54
            if FLAGS.fake_rabbit:
 
55
                params['backend_cls'] = fakerabbit.Backend
 
56
 
 
57
            cls._instance = cls(**params)
 
58
        return cls._instance
 
59
 
 
60
 
 
61
class Consumer(messaging.Consumer):
 
62
    # TODO(termie): it would be nice to give these some way of automatically
 
63
    #               cleaning up after themselves
 
64
    def attach_to_tornado(self, io_inst=None):
 
65
        from tornado import ioloop
 
66
        if io_inst is None:
 
67
            io_inst = ioloop.IOLoop.instance()
 
68
 
 
69
        injected = ioloop.PeriodicCallback(
 
70
            lambda: self.fetch(enable_callbacks=True), 1, io_loop=io_inst)
 
71
        injected.start()
 
72
        return injected
 
73
 
 
74
    attachToTornado = attach_to_tornado
 
75
 
 
76
    def attach_to_twisted(self):
 
77
        loop = task.LoopingCall(self.fetch, enable_callbacks=True)
 
78
        loop.start(interval=0.001)
 
79
 
 
80
class Publisher(messaging.Publisher):
 
81
    pass
 
82
 
 
83
 
 
84
class TopicConsumer(Consumer):
 
85
    exchange_type = "topic" 
 
86
    def __init__(self, connection=None, topic="broadcast"):
 
87
        self.queue = topic
 
88
        self.routing_key = topic
 
89
        self.exchange = FLAGS.control_exchange
 
90
        super(TopicConsumer, self).__init__(connection=connection)
 
91
 
 
92
 
 
93
class AdapterConsumer(TopicConsumer):
 
94
    def __init__(self, connection=None, topic="broadcast", proxy=None):
 
95
        _log.debug('Initing the Adapter Consumer for %s' % (topic))
 
96
        self.proxy = proxy
 
97
        super(AdapterConsumer, self).__init__(connection=connection, topic=topic)
 
98
 
 
99
    def receive(self, message_data, message):
 
100
        _log.debug('received %s' % (message_data))
 
101
        msg_id = message_data.pop('_msg_id', None)
 
102
 
 
103
        method = message_data.get('method')
 
104
        args = message_data.get('args', {})
 
105
        if not method:
 
106
            return
 
107
 
 
108
        node_func = getattr(self.proxy, str(method))     
 
109
        node_args = dict((str(k), v) for k, v in args.iteritems())
 
110
        d = defer.maybeDeferred(node_func, **node_args)
 
111
        if msg_id:
 
112
            d.addCallback(lambda rval: msg_reply(msg_id, rval))
 
113
            d.addErrback(lambda e: msg_reply(msg_id, str(e)))
 
114
        message.ack()
 
115
        return
 
116
 
 
117
 
 
118
class TopicPublisher(Publisher):
 
119
    exchange_type = "topic" 
 
120
    def __init__(self, connection=None, topic="broadcast"):
 
121
        self.routing_key = topic
 
122
        self.exchange = FLAGS.control_exchange
 
123
        super(TopicPublisher, self).__init__(connection=connection)
 
124
        
 
125
 
 
126
class DirectConsumer(Consumer):
 
127
    exchange_type = "direct" 
 
128
    def __init__(self, connection=None, msg_id=None):
 
129
        self.queue = msg_id
 
130
        self.routing_key = msg_id
 
131
        self.exchange = msg_id
 
132
        self.auto_delete = True
 
133
        super(DirectConsumer, self).__init__(connection=connection)
 
134
 
 
135
 
 
136
class DirectPublisher(Publisher):
 
137
    exchange_type = "direct"
 
138
    def __init__(self, connection=None, msg_id=None):
 
139
        self.routing_key = msg_id
 
140
        self.exchange = msg_id
 
141
        self.auto_delete = True
 
142
        super(DirectPublisher, self).__init__(connection=connection)
 
143
 
 
144
 
 
145
def msg_reply(msg_id, reply):
 
146
    conn = Connection.instance()
 
147
    publisher = DirectPublisher(connection=conn, msg_id=msg_id)
 
148
    
 
149
    try:
 
150
        publisher.send({'result': reply})
 
151
    except TypeError:
 
152
        publisher.send(
 
153
                {'result': dict((k, repr(v)) 
 
154
                                for k, v in reply.__dict__.iteritems())
 
155
                 })
 
156
    publisher.close()
 
157
 
 
158
 
 
159
def call(topic, msg):
 
160
    _log.debug("Making asynchronous call...")
 
161
    msg_id = uuid.uuid4().hex
 
162
    msg.update({'_msg_id': msg_id})
 
163
    _log.debug("MSG_ID is %s" % (msg_id))
 
164
    
 
165
    conn = Connection.instance()
 
166
    d = defer.Deferred()
 
167
    consumer = DirectConsumer(connection=conn, msg_id=msg_id)
 
168
    consumer.register_callback(lambda data, message: d.callback(data))
 
169
    injected = consumer.attach_to_tornado()
 
170
 
 
171
    # clean up after the injected listened and return x
 
172
    d.addCallback(lambda x: injected.stop() and x or x)
 
173
 
 
174
    publisher = TopicPublisher(connection=conn, topic=topic)
 
175
    publisher.send(msg)
 
176
    publisher.close()
 
177
    return d
 
178
 
 
179
 
 
180
def cast(topic, msg):
 
181
    _log.debug("Making asynchronous cast...")
 
182
    conn = Connection.instance()
 
183
    publisher = TopicPublisher(connection=conn, topic=topic)
 
184
    publisher.send(msg)
 
185
    publisher.close()
 
186
 
 
187
 
 
188
def generic_response(message_data, message):
 
189
    _log.debug('response %s', message_data)
 
190
    message.ack()
 
191
    sys.exit(0)
 
192
 
 
193
 
 
194
def send_message(topic, message, wait=True):
 
195
    msg_id = uuid.uuid4().hex
 
196
    message.update({'_msg_id': msg_id})
 
197
    _log.debug('topic is %s', topic)
 
198
    _log.debug('message %s', message)
 
199
 
 
200
    if wait:
 
201
        consumer = messaging.Consumer(connection=rpc.Connection.instance(),
 
202
                                      queue=msg_id,
 
203
                                      exchange=msg_id,
 
204
                                      auto_delete=True,
 
205
                                      exchange_type="direct",
 
206
                                      routing_key=msg_id)
 
207
        consumer.register_callback(generic_response)
 
208
 
 
209
    publisher = messaging.Publisher(connection=rpc.Connection.instance(),
 
210
                                    exchange="nova",
 
211
                                    exchange_type="topic",
 
212
                                    routing_key=topic)
 
213
    publisher.send(message)
 
214
    publisher.close()
 
215
 
 
216
    if wait:
 
217
        consumer.wait()
 
218
    
 
219
 
 
220
# TODO: Replace with a docstring test    
 
221
if __name__ == "__main__":
 
222
    send_message(sys.argv[1], anyjson.deserialize(sys.argv[2]))