1
from zope.interface import Interface, Attribute
2
from txamqp.protocol import AMQClient
3
from txamqp.contrib.thrift.transport import TwistedAMQPTransport
4
from txamqp.content import Content
6
from twisted.internet import defer
7
from twisted.python import log
9
from thrift.protocol import TBinaryProtocol
10
from thrift.transport import TTwisted, TTransport
12
class ThriftAMQClient(AMQClient):
    """AMQP client that moves Thrift calls and replies over AMQP queues.

    Outgoing data is written through ``TwistedAMQPTransport``; incoming
    messages are taken from consumer queues and handed to a Thrift client
    (``parseClientMessage``) or a Thrift processor (``parseServerMessage``).
    Protocol factories that are not passed explicitly are read from
    ``self.factory``.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the underlying client and choose the reply-to header.

        All arguments are forwarded unchanged to ``AMQClient.__init__``.
        """
        AMQClient.__init__(self, *args, **kwargs)

        # AMQP 0-8 spells the content property "reply to"; later revisions
        # of the specification spell it "reply-to".
        # NOTE(review): relies on AMQClient.check_0_8(); confirm it exists
        # in the txamqp version in use.
        if self.check_0_8():
            self.replyToField = "reply to"
        else:
            self.replyToField = "reply-to"

    @defer.inlineCallbacks
    def createThriftClient(self, responsesExchange, serviceExchange,
                           routingKey, clientClass, responseQueue=None,
                           iprot_factory=None, oprot_factory=None):
        """Open channel 1, start consuming replies and build a Thrift client.

        :param responsesExchange: exchange a newly declared reply queue is
            bound to.
        :param serviceExchange: exchange handed to the outgoing transport.
        :param routingKey: routing key handed to the outgoing transport.
        :param clientClass: callable invoked as
            ``clientClass(transport, oprot_factory)``.
        :param responseQueue: name of the queue to consume replies from.
            When ``None`` an exclusive queue is declared and bound to
            ``responsesExchange`` under its own name.
        :param iprot_factory: input protocol factory; defaults to
            ``self.factory.iprot_factory``.
        :param oprot_factory: output protocol factory; defaults to
            ``self.factory.oprot_factory``.
        :return: Deferred that fires with the object built by
            ``clientClass``.
        """
        channel = yield self.channel(1)

        if responseQueue is None:
            # NOTE(review): only exclusive=True is passed here; confirm
            # whether auto_delete=True is also wanted for this queue.
            declared = yield channel.queue_declare(exclusive=True)
            responseQueue = declared.queue

            # The queue name doubles as the routing key because it is also
            # what gets advertised as the reply-to value below.
            # NOTE(review): a caller-supplied responseQueue is not bound
            # here; presumably the caller has bound it already - confirm.
            yield channel.queue_bind(
                queue=responseQueue,
                exchange=responsesExchange,
                routing_key=responseQueue)

        consumer = yield channel.basic_consume(queue=responseQueue)
        log.msg("Consuming messages on queue: %s" % responseQueue)

        amqpTransport = TwistedAMQPTransport(
            channel, serviceExchange, routingKey,
            replyTo=responseQueue, replyToField=self.replyToField)

        if iprot_factory is None:
            iprot_factory = self.factory.iprot_factory
        if oprot_factory is None:
            oprot_factory = self.factory.oprot_factory

        thriftClient = clientClass(amqpTransport, oprot_factory)

        # Arm the first read; parseClientMessage re-arms itself after each
        # message it handles.
        queue = yield self.queue(consumer.consumer_tag)
        queue.get().addCallback(
            self.parseClientMessage, channel, queue, thriftClient,
            iprot_factory=iprot_factory)

        defer.returnValue(thriftClient)

    def parseClientMessage(self, msg, channel, queue, thriftClient,
                           iprot_factory=None):
        """Hand one reply message to the Thrift client and wait for the next.

        The message body is decoded with ``iprot_factory`` (or
        ``self.factory.iprot_factory`` when ``None``), the message header is
        read, and the matching ``recv_<name>`` method of ``thriftClient`` is
        called.  The delivery is then acknowledged and this method is
        registered again for the next message from ``queue``.
        """
        deliveryTag = msg.delivery_tag
        buf = TTransport.TMemoryBuffer(msg.content.body)

        # Resolve into a separate name so the re-arm below forwards the
        # caller's original value (possibly None) unchanged.
        if iprot_factory is None:
            factory = self.factory.iprot_factory
        else:
            factory = iprot_factory
        iprot = factory.getProtocol(buf)

        fname, mtype, rseqid = iprot.readMessageBegin()
        handler = getattr(thriftClient, 'recv_' + fname)
        handler(iprot, mtype, rseqid)

        channel.basic_ack(deliveryTag, True)
        queue.get().addCallback(
            self.parseClientMessage, channel, queue, thriftClient,
            iprot_factory=iprot_factory)

    def parseServerMessage(self, msg, channel, exchange, queue, processor,
                           iprot_factory=None, oprot_factory=None):
        """Run one request message through ``processor`` and wait for more.

        The request body is wrapped in an input protocol, and an output
        protocol is created on a ``TwistedAMQPTransport`` that uses the
        message's reply-to value as routing key on ``exchange``.  After
        ``processor.process`` has been called the delivery is acknowledged
        and this method is registered for the next message from ``queue``.

        :return: the Deferred of the next ``queue.get()`` with this method
            attached as callback.
        """
        deliveryTag = msg.delivery_tag

        # A request without a reply-to header must not abort the loop
        # before the ack and the re-arm below.
        # NOTE(review): assumes a missing property raises KeyError - confirm
        # against txamqp.content.Content.
        try:
            replyTo = msg.content[self.replyToField]
        except KeyError:
            replyTo = None

        requestBuffer = TTransport.TMemoryBuffer(msg.content.body)
        replyTransport = TwistedAMQPTransport(channel, exchange, replyTo)

        if iprot_factory is None:
            iprot = self.factory.iprot_factory.getProtocol(requestBuffer)
        else:
            iprot = iprot_factory.getProtocol(requestBuffer)

        if oprot_factory is None:
            oprot = self.factory.oprot_factory.getProtocol(replyTransport)
        else:
            oprot = oprot_factory.getProtocol(replyTransport)

        processor.process(iprot, oprot)
        channel.basic_ack(deliveryTag, True)

        return queue.get().addCallback(
            self.parseServerMessage, channel, exchange, queue, processor,
            iprot_factory, oprot_factory)
104
class IThriftAMQClientFactory(Interface):
    """Attributes expected on the factory of a Thrift AMQP client."""

    iprot_factory = Attribute("Input protocol factory")
    oprot_factory = Attribute("Output protocol factory")
    processor = Attribute("Thrift processor")