~txamqpteam/txamqp/sync-with-thrift

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from zope.interface import Interface, Attribute
from txamqp.protocol import AMQClient
from txamqp.contrib.thrift.transport import TwistedAMQPTransport
from txamqp.content import Content

from twisted.internet import defer
from twisted.python import log

from thrift.protocol import TBinaryProtocol
from thrift.transport import TTwisted, TTransport

class ThriftAMQClient(AMQClient):

    def __init__(self, *args, **kwargs):
        AMQClient.__init__(self, *args, **kwargs)

        if self.check_0_8():
            self.replyToField = "reply to"
        else:
            self.replyToField = "reply-to"

    @defer.inlineCallbacks
    def createThriftClient(self, responsesExchange, serviceExchange,
        routingKey, clientClass, responseQueue=None, iprot_factory=None,
        oprot_factory=None):

        channel = yield self.channel(1)

        if responseQueue is None:
            reply = yield channel.queue_declare(exclusive=True,
                auto_delete=True)

            responseQueue = reply.queue

            yield channel.queue_bind(queue=responseQueue,
                exchange=responsesExchange, routing_key=responseQueue)

        reply = yield channel.basic_consume(queue=responseQueue)

        log.msg("Consuming messages on queue: %s" % responseQueue)

        amqpTransport = TwistedAMQPTransport(channel, serviceExchange,
            routingKey, replyTo=responseQueue, replyToField=self.replyToField)

        if iprot_factory is None:
            iprot_factory = self.factory.iprot_factory

        if oprot_factory is None:
            oprot_factory = self.factory.oprot_factory

        thriftClient = clientClass(amqpTransport, oprot_factory)

        queue = yield self.queue(reply.consumer_tag)
        queue.get().addCallback(self.parseClientMessage, channel, queue,
            thriftClient, iprot_factory=iprot_factory)

        defer.returnValue(thriftClient)

    def parseClientMessage(self, msg, channel, queue, thriftClient,
        iprot_factory=None):
        deliveryTag = msg.delivery_tag
        tr = TTransport.TMemoryBuffer(msg.content.body)
        if iprot_factory is None:
            iprot = self.factory.iprot_factory.getProtocol(tr)
        else:
            iprot = iprot_factory.getProtocol(tr)
        (fname, mtype, rseqid) = iprot.readMessageBegin()

        method = getattr(thriftClient, 'recv_' + fname)
        method(iprot, mtype, rseqid)

        channel.basic_ack(deliveryTag, True)
        queue.get().addCallback(self.parseClientMessage, channel, queue,
            thriftClient, iprot_factory=iprot_factory)

    def parseServerMessage(self, msg, channel, exchange, queue, processor,
        iprot_factory=None, oprot_factory=None):
        deliveryTag = msg.delivery_tag
        try:
            replyTo = msg.content[self.replyToField]
        except KeyError:
            replyTo = None

        tmi = TTransport.TMemoryBuffer(msg.content.body)
        tr = TwistedAMQPTransport(channel, exchange, replyTo)

        if iprot_factory is None:
            iprot = self.factory.iprot_factory.getProtocol(tmi)
        else:
            iprot = iprot_factory.getProtocol(tmi)

        if oprot_factory is None:
            oprot = self.factory.oprot_factory.getProtocol(tr)
        else:
            oprot = oprot_factory.getProtocol(tr)

        d = processor.process(iprot, oprot)
        channel.basic_ack(deliveryTag, True)

        return queue.get().addCallback(self.parseServerMessage, channel,
            exchange, queue, processor, iprot_factory, oprot_factory)


class IThriftAMQClientFactory(Interface):

    iprot_factory = Attribute("Input protocol factory")
    oprot_factory = Attribute("Input protocol factory")
    processor = Attribute("Thrift processor")