1
1
#!/usr/bin/env python
3
import optparse, sys, uuid
5
5
from twisted.internet import reactor
6
6
from twisted.internet.defer import inlineCallbacks, returnValue
169
169
msg = yield queue.get()
170
170
input_ = msg.content.body
171
properties = msg.content.properties
172
173
print ' [%04d] Received fib(%r) from channel #%d' % (
173
174
msgnum, input_, channel.id)
175
output = fib(int(input_))
175
176
response = Content(str(output))
176
response['correlation id'] = msg['correlation id']
177
channel.basic_publish(exchange='', routing_key=msg['reply to'], content=response)
177
response['correlation id'] = properties['correlation id']
178
channel.basic_publish(exchange='', routing_key=properties['reply to'], content=response)
178
179
returnValue(result)
245
246
# NOTE(review): this region looks like a side-by-side diff flattened to plain
# text: the bare integers appear to be old/new line numbers and all
# indentation has been lost. Two different bodies seem to be interleaved
# below (a topic-exchange publisher and a Fibonacci RPC client) -- TODO
# confirm against the real file before changing any code here.
#
# publish6 takes `result` (unpacked into connection and channel), a `message`
# and a `count_`. It uses `yield`, so it is a generator; presumably it is
# driven by Twisted's inlineCallbacks, but no decorator is visible here.
def publish6(result, message, count_):
246
247
connection, channel = result
247
# Declare an exchange of type 'topic' named by exchange_name, which is
# defined outside this view.
yield channel.exchange_declare(exchange=exchange_name, type='topic')
248
# Publish count_ messages. Message i has the body '<message> [iiii]' and the
# routing key '<facility>.<severity>', where both parts are picked by cycling
# through FACILITIES and SEVERITIES with i modulo their lengths.
for i in range(count_):
249
msg = Content('%s [%04d]' % (message, i))
250
severity = SEVERITIES[i % len(SEVERITIES)]
251
facility = FACILITIES[i % len(FACILITIES)]
252
routing_key = '%s.%s' % (facility, severity)
253
yield channel.basic_publish(
254
exchange=exchange_name, routing_key=routing_key, content=msg)
255
print ' [x] Sent "%s [%04d]" with facility and severity [%s]' % (
256
message, i, routing_key)
249
# RPC client. The statements below use `self` and `n`, so they presumably
# belong to methods (an initializer and the `call` method invoked further
# down) whose `def` lines are missing from this view -- TODO confirm.
class FibonacciClient(object):
251
self.connection = connection
252
self.channel = channel
257
# A fresh UUID string identifies this request; the reply's
# 'correlation id' property is compared against it below.
self.corr_id = str(uuid.uuid4())
258
# Declare an exclusive queue and keep reply.queue as the callback queue.
reply = yield self.channel.queue_declare(exclusive=True)
259
self.callback_queue = reply.queue
260
# The request body is str(n); the 'correlation id' and 'reply to'
# properties carry the request id and the callback queue.
msg = Content(str(n))
261
msg['correlation id'] = self.corr_id
262
msg['reply to'] = self.callback_queue
263
# Publish the request with routing key 'rpc_queue' on the exchange named ''.
yield self.channel.basic_publish(
264
exchange='', routing_key='rpc_queue', content=msg)
265
print ' [x] Sent "%s"' % n
266
# Start consuming the callback queue (no_ack=True) under consumer tag 'qtag'.
yield self.channel.basic_consume(
267
queue=self.callback_queue, no_ack=True, consumer_tag='qtag')
268
# NOTE(review): this reads the bare name `connection`, not self.connection;
# it only resolves if `connection` is visible from an enclosing scope --
# confirm that is intended.
queue = yield connection.queue('qtag')
270
response = yield queue.get()
271
# Only a reply whose 'correlation id' equals self.corr_id is returned.
# NOTE(review): a single queue.get() is visible; a non-matching reply would
# fall through without returning a body -- confirm whether a retry loop is
# intended.
if response.content.properties['correlation id'] == self.corr_id:
272
returnValue(response.content.body)
274
# NOTE(review): constructed with no arguments even though the class body
# assigns self.connection / self.channel; the initializer's signature is not
# visible here -- confirm.
fib_rpc = FibonacciClient()
275
print ' [x] Requesting fib(%s)' % message
276
response = yield fib_rpc.call(message)
277
print ' [.] Got %r' % response
257
278
# Hand the incoming `result` back unchanged; presumably so the next callback
# in the Deferred chain receives the same value -- TODO confirm.
returnValue(result)
259
280
PUBLISH_EXAMPLES = {
346
367
d.addCallback(ALL_EXAMPLES[example], message, count_)
348
369
d.addCallback(ALL_EXAMPLES[example], topics)
349
# if example not in PUBLISH_EXAMPLES:
350
# d.addCallback(setup_queue_cleanup)
351
# only executed in publishers
352
370
d.addCallback(publisher_cleanup)
353
371
d.addErrback(lambda f: sys.stderr.write(str(f)))