~jamestait/txamqp/txamqp-additional-examples

« back to all changes in this revision

Viewing changes to src/examples/rabbitmq-tutorial/txamqp-example.py

  • Committer: James Tait
  • Date: 2011-03-25 01:16:59 UTC
  • Revision ID: james.tait@canonical.com-20110325011659-6au3ia16kz5lx69z
Rewrote and enabled publish6. Fixed consume6 to convert the input to an int and to read the request properties for use on the response. A little more clean-up, and the spec is now referenced from within the txamqp codebase instead of relying on the one in python-txamqp being in the right place.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#!/usr/bin/env python
2
2
 
3
 
import optparse, sys
 
3
import optparse, sys, uuid
4
4
 
5
5
from twisted.internet import reactor
6
6
from twisted.internet.defer import inlineCallbacks, returnValue
168
168
    while True:
169
169
        msg = yield queue.get()
170
170
        input_ = msg.content.body
 
171
        properties = msg.content.properties
171
172
        msgnum += 1
172
173
        print ' [%04d] Received fib(%r) from channel #%d' % (
173
174
            msgnum, input_, channel.id)
174
 
        output = fib(input_)
 
175
        output = fib(int(input_))
175
176
        response = Content(str(output))
176
 
        response['correlation id'] = msg['correlation id']
177
 
        channel.basic_publish(exchange='', routing_key=msg['reply to'], content=response)
 
177
        response['correlation id'] = properties['correlation id']
 
178
        channel.basic_publish(exchange='', routing_key=properties['reply to'], content=response)
178
179
    returnValue(result)
179
180
 
180
181
@inlineCallbacks
244
245
@inlineCallbacks
245
246
def publish6(result, message, count_):
246
247
    connection, channel = result
247
 
    yield channel.exchange_declare(exchange=exchange_name, type='topic')
248
 
    for i in range(count_):
249
 
        msg = Content('%s [%04d]' % (message, i))
250
 
        severity = SEVERITIES[i % len(SEVERITIES)]
251
 
        facility = FACILITIES[i % len(FACILITIES)]
252
 
        routing_key = '%s.%s' % (facility, severity)
253
 
        yield channel.basic_publish(
254
 
            exchange=exchange_name, routing_key=routing_key, content=msg)
255
 
        print ' [x] Sent "%s [%04d]" with facility and severity [%s]' % (
256
 
            message, i, routing_key)
 
248
 
 
249
    class FibonacciClient(object):
 
250
        def __init__(self):
 
251
            self.connection = connection
 
252
            self.channel = channel
 
253
            self.corr_id = None
 
254
 
 
255
        @inlineCallbacks
 
256
        def call(self, n):
 
257
            self.corr_id = str(uuid.uuid4())
 
258
            reply = yield self.channel.queue_declare(exclusive=True)
 
259
            self.callback_queue = reply.queue
 
260
            msg = Content(str(n))
 
261
            msg['correlation id'] = self.corr_id
 
262
            msg['reply to'] = self.callback_queue
 
263
            yield self.channel.basic_publish(
 
264
                exchange='', routing_key='rpc_queue', content=msg)
 
265
            print ' [x] Sent "%s"' % n
 
266
            yield self.channel.basic_consume(
 
267
                queue=self.callback_queue, no_ack=True, consumer_tag='qtag')
 
268
            queue = yield connection.queue('qtag')
 
269
            while True:
 
270
                response = yield queue.get()
 
271
                if response.content.properties['correlation id'] == self.corr_id:
 
272
                    returnValue(response.content.body)
 
273
 
 
274
    fib_rpc = FibonacciClient()
 
275
    print ' [x] Requesting fib(%s)' % message
 
276
    response = yield fib_rpc.call(message)
 
277
    print ' [.] Got %r' % response
257
278
    returnValue(result)
258
279
 
259
280
PUBLISH_EXAMPLES = {
262
283
    'publish3': publish3,
263
284
    'publish4': publish4,
264
285
    'publish5': publish5,
265
 
#    'publish6': publish6,
 
286
    'publish6': publish6,
266
287
}
267
288
 
268
289
CONSUME_EXAMPLES = {
334
355
    vhost = '/'
335
356
    username = 'guest'
336
357
    password = 'guest'
337
 
    spec = txamqp.spec.load('../../src/python-txamqp-0.3/src/specs/standard/amqp0-8.xml')
 
358
    spec = txamqp.spec.load('src/specs/standard/amqp0-8.stripped.xml')
338
359
 
339
360
    delegate = TwistedDelegate()
340
361
 
346
367
        d.addCallback(ALL_EXAMPLES[example], message, count_)
347
368
    else:
348
369
        d.addCallback(ALL_EXAMPLES[example], topics)
349
 
#    if example not in PUBLISH_EXAMPLES:
350
 
#        d.addCallback(setup_queue_cleanup)
351
 
    # only executed in publishers
352
370
    d.addCallback(publisher_cleanup)
353
371
    d.addErrback(lambda f: sys.stderr.write(str(f)))
354
372
    reactor.run()