30
34
class TestReceiver(TestCase):
32
def test_receive_one(self):
33
# Receiving a message from AMQP should republish it unaltered.
# NOTE(review): this excerpt looks like a flattened diff view -- the bare
# integers are gutter line numbers, indentation is lost, and the lines that
# define `reports`, `capture` and `config` are not visible here.
36
# NOTE(review): presumably the body of a `capture(report)` helper whose
# `def` line is not visible in this excerpt -- confirm.
reports.append(report)
38
expected_report = {'id': 'foo', 'otherkey': 42}
39
# BSON-encode the report and wrap it in an AMQP message.
message = amqp.Message(bson.dumps(expected_report))
40
connection = self.connection_factory()
41
# Register close() as a test cleanup for the connection.
self.addCleanup(connection.close)
42
channel = connection.channel()
43
self.addCleanup(channel.close)
44
# The fixture supplies the exchange_name / queue_name used below.
queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
45
# Publish the report to the fixture's exchange with an empty routing key.
channel.basic_publish(message, queue.exchange_name, routing_key="")
47
# Register `capture` as a publisher on `config`.
config.publishers.append(capture)
48
# A second channel on the same connection is handed to the Receiver.
receiver_channel = connection.channel()
49
self.addCleanup(receiver_channel.close)
50
receiver = Receiver(config, receiver_channel, queue.queue_name)
36
def test_stop_on_sentinel(self):
37
# A sentinel can be used to stop the receiver (useful for testing).
# NOTE(review): the definitions of `sentinel`, `reports`, `capture` and
# `config` are not visible in this excerpt (flattened diff view; the bare
# integers are gutter line numbers).
40
# NOTE(review): presumably the body of a `capture(report)` helper whose
# `def` line is not visible here -- confirm.
reports.append(report)
42
expected_report = {'id': 'foo', 'otherkey': 42}
43
# BSON-encode the report and wrap it in an AMQP message.
message = amqp.Message(bson.dumps(expected_report))
44
# Channel obtained from a ChannelFixture built on the connection factory.
channel = self.useFixture(
45
ChannelFixture(self.connection_factory)).channel
46
queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
47
# First message: the real report.
channel.basic_publish(
48
message, queue.exchange_name, routing_key="")
50
# Second message: the raw sentinel value, published to the same exchange.
channel.basic_publish(
51
amqp.Message(sentinel), queue.exchange_name, routing_key="")
53
config.publishers.append(capture)
54
# The Receiver is given the connection factory itself, not a channel.
receiver = Receiver(config, self.connection_factory, queue.queue_name)
55
receiver.sentinel = sentinel
56
receiver.run_forever()
57
# Only the real report is expected in `reports`.
self.assertEqual([expected_report], reports)
59
def test_stop_via_stopping(self):
60
# Setting the stopping field should stop the run_forever loop.
# NOTE(review): flattened diff view -- the bare integers are gutter line
# numbers, and the definitions of `reports`, `capture` and `config` are not
# visible in this excerpt.
63
# NOTE(review): presumably the body of a `capture(report)` helper whose
# `def` line is not visible here -- confirm.
reports.append(report)
65
expected_report = {'id': 'foo', 'otherkey': 42}
66
# BSON-encode the report and wrap it in an AMQP message.
message = amqp.Message(bson.dumps(expected_report))
67
channel = self.useFixture(
68
ChannelFixture(self.connection_factory)).channel
69
queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
70
channel.basic_publish(
71
message, queue.exchange_name, routing_key="")
73
config.publishers.append(capture)
51
74
# We don't want to loop forever: patch the channel so that after one
52
75
# call to wait (which will get our injected message) the loop will shut
53
# down. This also checks we use the consume_tag correctly.
# NOTE(review): the `receiver_channel` lines below appear to be the
# pre-change side of the diff; `receiver_channel` is not defined in this
# method's visible lines -- confirm against the real file.
54
old_wait = receiver_channel.wait
55
# Replacement wait: flag the receiver as stopping, then delegate to the
# original wait.
def new_wait(allowed_methods=None):
56
receiver.stopping = True
57
return old_wait(allowed_methods=allowed_methods)
58
receiver_channel.wait = new_wait
77
# Connection factory whose channels carry the patched wait() described
# above.
# NOTE(review): the `def new_channel():` line and the `return` lines are
# not visible in this excerpt.
def patching_factory():
78
connection = self.connection_factory()
79
old_channel = connection.channel
81
result = old_channel()
82
old_wait = result.wait
83
def new_wait(allowed_methods=None):
84
receiver.stopping = True
85
return old_wait(allowed_methods=allowed_methods)
86
result.wait = new_wait
88
connection.channel = new_channel
90
# The Receiver is given the patching factory instead of a channel.
receiver = Receiver(config, patching_factory, queue.queue_name)
59
91
receiver.run_forever()
60
92
self.assertEqual([expected_report], reports)
62
94
def test_run_forever(self):
63
# run_forever subscribes and then calls wait.
95
# run_forever subscribes and then calls wait in a loop.
# NOTE(review): flattened diff view -- both the old and the new comment and
# body lines are present, the bare integers are gutter line numbers, and
# the `class FakeChannel` header, the `def wait` line and the assertion
# call are not visible in this excerpt.
99
# NOTE(review): presumably stores `calls` on the instance (later lines read
# `self.calls`); the body is not visible here.
def __init__(self, calls):
68
101
# Record the subscription request.
def basic_consume(self, queue_name, callback=None):
69
102
self.calls.append(('basic_consume', queue_name, callback))
71
105
# Record a wait() call.
self.calls.append(('wait',))
72
# NOTE(review): the next two statements appear to be the pre-change side of
# the diff (the Receiver is given a channel directly) -- confirm.
channel = FakeChannel()
73
receiver = Receiver(None, channel, 'foo')
106
# Once more than two calls have been recorded, ask the receiver to stop.
if len(self.calls) > 2:
107
receiver.stopping = True
108
# Record the cancellation together with the tag it was given.
def basic_cancel(self, tag):
109
self.calls.append(('basic_cancel', tag))
112
# Stand-in passed to Receiver in place of a connection factory; it hands
# out FakeChannel objects that share `calls`.
class FakeConnection:
114
return FakeChannel(calls)
117
receiver = Receiver(None, FakeConnection, 'foo')
74
118
receiver.run_forever()
76
# Expected call sequences (the first line below looks like the old side, the
# following lines like the new side; the new list is only partially visible).
[('basic_consume', 'foo', receiver.handle_report), ('wait',)],
120
[('basic_consume', 'foo', receiver.handle_report),
123
('basic_cancel', 'tag')],
126
def test_tolerates_amqp_trouble(self):
127
# If the AMQP server is unavailable for a short period, the receiver
128
# will automatically reconnect.
129
# Break a connection to raise socket.error (which we know from the
130
# publisher tests is what leaks through when rabbit is shut down).
131
# We raise it the first time on each amqp method call.
# NOTE(review): flattened diff view -- the bare integers are gutter line
# numbers, and the definitions of `reports`, `capture`, `config` and
# `state` plus several `def`/`return` lines are not visible here.
134
# NOTE(review): presumably the body of a `capture(report)` helper whose
# `def` line is not visible here -- confirm.
reports.append(report)
136
expected_report = {'id': 'foo', 'otherkey': 42}
137
# BSON-encode the report and wrap it in an AMQP message.
message = amqp.Message(bson.dumps(expected_report))
138
channel = self.useFixture(
139
ChannelFixture(self.connection_factory)).channel
140
queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
141
channel.basic_publish(message, queue.exchange_name, routing_key="")
143
config.publishers.append(capture)
145
# Wrap `func` so that its first call raises socket.error and later calls
# go through to the real function.
def error_once(func):
146
def wrapped(*args, **kwargs):
147
# Failures are tracked per code object.
# NOTE(review): `func_code` is the Python 2 spelling; Python 3 uses
# `__code__`.
func_ref = func.func_code
148
# Already failed once for this code object: call through.
if func_ref in state:
149
return func(*args, **kwargs)
151
state[func_ref] = True
152
# Use EPIPE because the close() code checks that (though
# NOTE(review): the rest of the comment above is not visible in this
# excerpt.
154
raise socket.error(errno.EPIPE, "booyah")
157
# Connection factory whose connection and channel methods are wrapped with
# error_once.
# NOTE(review): the `def new_channel():` line and the `return` lines are
# not visible in this excerpt.
def patching_factory():
158
connection = self.connection_factory()
159
old_channel = connection.channel
162
result = old_channel()
163
result.wait = error_once(result.wait)
164
result.basic_consume = error_once(result.basic_consume)
165
result.basic_cancel = error_once(result.basic_cancel)
166
result.close = error_once(result.close)
168
connection.channel = new_channel
169
connection.close = error_once(connection.close)
171
receiver = Receiver(config, patching_factory, queue.queue_name)
172
# The receiver's sentinel is set to "arhh" and the same value is published
# after the report.
receiver.sentinel = "arhh"
173
channel.basic_publish(
174
amqp.Message("arhh"), queue.exchange_name, routing_key="")
175
receiver.run_forever()
176
# Despite the injected errors, exactly the one report is expected.
self.assertEqual([expected_report], reports)