1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
# Copyright [2010] [Anso Labs, LLC]
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
17
AMQP-based RPC. Queues have consumers and publishers.
18
No fan-out support yet.
25
from nova import vendor
27
from carrot import connection
28
from carrot import messaging
29
from twisted.internet import defer
30
from twisted.internet import reactor
31
from twisted.internet import task
33
from nova import fakerabbit
34
from nova import flags
40
_log = logging.getLogger('amqplib')
41
_log.setLevel(logging.WARN)
44
class Connection(connection.BrokerConnection):
47
if not hasattr(cls, '_instance'):
48
params = dict(hostname=FLAGS.rabbit_host,
49
port=FLAGS.rabbit_port,
50
userid=FLAGS.rabbit_userid,
51
password=FLAGS.rabbit_password,
52
virtual_host=FLAGS.rabbit_virtual_host)
55
params['backend_cls'] = fakerabbit.Backend
57
cls._instance = cls(**params)
61
class Consumer(messaging.Consumer):
62
# TODO(termie): it would be nice to give these some way of automatically
63
# cleaning up after themselves
64
def attach_to_tornado(self, io_inst=None):
65
from tornado import ioloop
67
io_inst = ioloop.IOLoop.instance()
69
injected = ioloop.PeriodicCallback(
70
lambda: self.fetch(enable_callbacks=True), 1, io_loop=io_inst)
74
attachToTornado = attach_to_tornado
76
def attach_to_twisted(self):
77
loop = task.LoopingCall(self.fetch, enable_callbacks=True)
78
loop.start(interval=0.001)
80
class Publisher(messaging.Publisher):
84
class TopicConsumer(Consumer):
85
exchange_type = "topic"
86
def __init__(self, connection=None, topic="broadcast"):
88
self.routing_key = topic
89
self.exchange = FLAGS.control_exchange
90
super(TopicConsumer, self).__init__(connection=connection)
93
class AdapterConsumer(TopicConsumer):
94
def __init__(self, connection=None, topic="broadcast", proxy=None):
95
_log.debug('Initing the Adapter Consumer for %s' % (topic))
97
super(AdapterConsumer, self).__init__(connection=connection, topic=topic)
99
def receive(self, message_data, message):
100
_log.debug('received %s' % (message_data))
101
msg_id = message_data.pop('_msg_id', None)
103
method = message_data.get('method')
104
args = message_data.get('args', {})
108
node_func = getattr(self.proxy, str(method))
109
node_args = dict((str(k), v) for k, v in args.iteritems())
110
d = defer.maybeDeferred(node_func, **node_args)
112
d.addCallback(lambda rval: msg_reply(msg_id, rval))
113
d.addErrback(lambda e: msg_reply(msg_id, str(e)))
118
class TopicPublisher(Publisher):
119
exchange_type = "topic"
120
def __init__(self, connection=None, topic="broadcast"):
121
self.routing_key = topic
122
self.exchange = FLAGS.control_exchange
123
super(TopicPublisher, self).__init__(connection=connection)
126
class DirectConsumer(Consumer):
127
exchange_type = "direct"
128
def __init__(self, connection=None, msg_id=None):
130
self.routing_key = msg_id
131
self.exchange = msg_id
132
self.auto_delete = True
133
super(DirectConsumer, self).__init__(connection=connection)
136
class DirectPublisher(Publisher):
137
exchange_type = "direct"
138
def __init__(self, connection=None, msg_id=None):
139
self.routing_key = msg_id
140
self.exchange = msg_id
141
self.auto_delete = True
142
super(DirectPublisher, self).__init__(connection=connection)
145
def msg_reply(msg_id, reply):
146
conn = Connection.instance()
147
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
150
publisher.send({'result': reply})
153
{'result': dict((k, repr(v))
154
for k, v in reply.__dict__.iteritems())
159
def call(topic, msg):
160
_log.debug("Making asynchronous call...")
161
msg_id = uuid.uuid4().hex
162
msg.update({'_msg_id': msg_id})
163
_log.debug("MSG_ID is %s" % (msg_id))
165
conn = Connection.instance()
167
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
168
consumer.register_callback(lambda data, message: d.callback(data))
169
injected = consumer.attach_to_tornado()
171
# clean up after the injected listened and return x
172
d.addCallback(lambda x: injected.stop() and x or x)
174
publisher = TopicPublisher(connection=conn, topic=topic)
180
def cast(topic, msg):
181
_log.debug("Making asynchronous cast...")
182
conn = Connection.instance()
183
publisher = TopicPublisher(connection=conn, topic=topic)
188
def generic_response(message_data, message):
189
_log.debug('response %s', message_data)
194
def send_message(topic, message, wait=True):
195
msg_id = uuid.uuid4().hex
196
message.update({'_msg_id': msg_id})
197
_log.debug('topic is %s', topic)
198
_log.debug('message %s', message)
201
consumer = messaging.Consumer(connection=rpc.Connection.instance(),
205
exchange_type="direct",
207
consumer.register_callback(generic_response)
209
publisher = messaging.Publisher(connection=rpc.Connection.instance(),
211
exchange_type="topic",
213
publisher.send(message)
220
# TODO: Replace with a docstring test
221
if __name__ == "__main__":
222
send_message(sys.argv[1], anyjson.deserialize(sys.argv[2]))