~txamqpteam/txamqp/sync-with-thrift

« back to all changes in this revision

Viewing changes to src/txamqp/contrib/thrift/protocol.py

  • Committer: Esteve Fernandez
  • Date: 2009-02-11 22:28:45 UTC
  • Revision ID: esteve@fluidinfo.com-20090211222845-wp1zmr7vexvqvt2c
synchronize with Thrift

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from zope.interface import Interface, Attribute
 
2
from txamqp.protocol import AMQClient
 
3
from txamqp.contrib.thrift.transport import TwistedAMQPTransport
 
4
from txamqp.content import Content
 
5
 
 
6
from twisted.internet import defer
 
7
from twisted.python import log
 
8
 
 
9
from thrift.protocol import TBinaryProtocol
 
10
from thrift.transport import TTwisted, TTransport
 
11
 
 
12
class ThriftAMQClient(AMQClient):
 
13
 
 
14
    def __init__(self, *args, **kwargs):
 
15
        AMQClient.__init__(self, *args, **kwargs)
 
16
 
 
17
        if self.check_0_8():
 
18
            self.replyToField = "reply to"
 
19
        else:
 
20
            self.replyToField = "reply-to"
 
21
 
 
22
    @defer.inlineCallbacks
 
23
    def createThriftClient(self, responsesExchange, serviceExchange,
 
24
        routingKey, clientClass, responseQueue=None, iprot_factory=None,
 
25
        oprot_factory=None):
 
26
 
 
27
        channel = yield self.channel(1)
 
28
 
 
29
        if responseQueue is None:
 
30
            reply = yield channel.queue_declare(exclusive=True,
 
31
                auto_delete=True)
 
32
 
 
33
            responseQueue = reply.queue
 
34
 
 
35
            yield channel.queue_bind(queue=responseQueue,
 
36
                exchange=responsesExchange, routing_key=responseQueue)
 
37
 
 
38
        reply = yield channel.basic_consume(queue=responseQueue)
 
39
 
 
40
        log.msg("Consuming messages on queue: %s" % responseQueue)
 
41
 
 
42
        amqpTransport = TwistedAMQPTransport(channel, serviceExchange,
 
43
            routingKey, replyTo=responseQueue, replyToField=self.replyToField)
 
44
 
 
45
        if iprot_factory is None:
 
46
            iprot_factory = self.factory.iprot_factory
 
47
 
 
48
        if oprot_factory is None:
 
49
            oprot_factory = self.factory.oprot_factory
 
50
 
 
51
        thriftClient = clientClass(amqpTransport, oprot_factory)
 
52
 
 
53
        queue = yield self.queue(reply.consumer_tag)
 
54
        queue.get().addCallback(self.parseClientMessage, channel, queue,
 
55
            thriftClient, iprot_factory=iprot_factory)
 
56
 
 
57
        defer.returnValue(thriftClient)
 
58
 
 
59
    def parseClientMessage(self, msg, channel, queue, thriftClient,
 
60
        iprot_factory=None):
 
61
        deliveryTag = msg.delivery_tag
 
62
        tr = TTransport.TMemoryBuffer(msg.content.body)
 
63
        if iprot_factory is None:
 
64
            iprot = self.factory.iprot_factory.getProtocol(tr)
 
65
        else:
 
66
            iprot = iprot_factory.getProtocol(tr)
 
67
        (fname, mtype, rseqid) = iprot.readMessageBegin()
 
68
 
 
69
        method = getattr(thriftClient, 'recv_' + fname)
 
70
        method(iprot, mtype, rseqid)
 
71
 
 
72
        channel.basic_ack(deliveryTag, True)
 
73
        queue.get().addCallback(self.parseClientMessage, channel, queue,
 
74
            thriftClient, iprot_factory=iprot_factory)
 
75
 
 
76
    def parseServerMessage(self, msg, channel, exchange, queue, processor,
 
77
        iprot_factory=None, oprot_factory=None):
 
78
        deliveryTag = msg.delivery_tag
 
79
        try:
 
80
            replyTo = msg.content[self.replyToField]
 
81
        except KeyError:
 
82
            replyTo = None
 
83
 
 
84
        tmi = TTransport.TMemoryBuffer(msg.content.body)
 
85
        tr = TwistedAMQPTransport(channel, exchange, replyTo)
 
86
 
 
87
        if iprot_factory is None:
 
88
            iprot = self.factory.iprot_factory.getProtocol(tmi)
 
89
        else:
 
90
            iprot = iprot_factory.getProtocol(tmi)
 
91
 
 
92
        if oprot_factory is None:
 
93
            oprot = self.factory.oprot_factory.getProtocol(tr)
 
94
        else:
 
95
            oprot = oprot_factory.getProtocol(tr)
 
96
 
 
97
        d = processor.process(iprot, oprot)
 
98
        channel.basic_ack(deliveryTag, True)
 
99
 
 
100
        return queue.get().addCallback(self.parseServerMessage, channel,
 
101
            exchange, queue, processor, iprot_factory, oprot_factory)
 
102
 
 
103
 
 
104
class IThriftAMQClientFactory(Interface):
 
105
 
 
106
    iprot_factory = Attribute("Input protocol factory")
 
107
    oprot_factory = Attribute("Input protocol factory")
 
108
    processor = Attribute("Thrift processor")