~lifeless/python-oops-amqp/0.0.3

« back to all changes in this revision

Viewing changes to oops_amqp/tests/test_receiver.py

  • Committer: Robert Collins
  • Date: 2011-10-18 03:48:34 UTC
  • mfrom: (2.1.11 0.0.2)
  • Revision ID: robertc@robertcollins.net-20111018034834-hu936k8vmpz3djwl
0.0.2
-----

* Fix documentation warts from initial release, updated to 0.0.2 and prepare
  for making the receiver deal with interrupted services.
  (Robert Collins, #875976, #875984)

* Fix Receiver.run_forever to actually run forever. (Robert Collins)

* Change API for constructing Receiver to take a connection factory rather than
  a channel. This will permit handling transient faults internally rather than
  forcing a restart. (Robert Collins)

* Implement resiliency for the Receiver: automatically reconnect if a socket
  error is received from rabbit, for up to two minutes of downtime.
  (Robert Collins)

* The publisher can honour any existing oops id if desired (set
  inherit_id=True). This is useful if using a chain of publishers where
  AMQP is the last stage rather than the primary mechanism.
  (Robert Collins)

* Receiver.sentinel may be set and used to cause the run_forever loop to exit
  when a crafted message is received. This is useful for testing, and defaults
  to None (so cannot be used to shutdown a normally running service).
  (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Tests for AMQP receiving."""
18
18
 
 
19
import errno
 
20
import socket
 
21
 
19
22
from amqplib import client_0_8 as amqp
20
23
import bson
21
24
from oops import Config
22
25
 
23
26
from oops_amqp import Receiver
24
27
from oops_amqp.tests import (
 
28
    ChannelFixture,
25
29
    QueueFixture,
26
30
    TestCase,
27
31
    )
29
33
 
30
34
class TestReceiver(TestCase):
31
35
 
32
 
    def test_receive_one(self):
33
 
        # Receiving a message from AMQP should republish it unaltered.
34
 
        reports = []
35
 
        def capture(report):
36
 
            reports.append(report)
37
 
            return report['id']
38
 
        expected_report = {'id': 'foo', 'otherkey': 42}
39
 
        message = amqp.Message(bson.dumps(expected_report))
40
 
        connection = self.connection_factory()
41
 
        self.addCleanup(connection.close)
42
 
        channel = connection.channel()
43
 
        self.addCleanup(channel.close)
44
 
        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
45
 
        channel.basic_publish(message, queue.exchange_name, routing_key="")
46
 
        config = Config()
47
 
        config.publishers.append(capture)
48
 
        receiver_channel = connection.channel()
49
 
        self.addCleanup(receiver_channel.close)
50
 
        receiver = Receiver(config, receiver_channel, queue.queue_name)
 
36
    def test_stop_on_sentinel(self):
 
37
        # A sentinel can be used to stop the receiver (useful for testing).
 
38
        reports = []
 
39
        def capture(report):
 
40
            reports.append(report)
 
41
            return report['id']
 
42
        expected_report = {'id': 'foo', 'otherkey': 42}
 
43
        message = amqp.Message(bson.dumps(expected_report))
 
44
        channel = self.useFixture(
 
45
            ChannelFixture(self.connection_factory)).channel
 
46
        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
 
47
        channel.basic_publish(
 
48
            message, queue.exchange_name, routing_key="")
 
49
        sentinel = "xxx"
 
50
        channel.basic_publish(
 
51
            amqp.Message(sentinel), queue.exchange_name, routing_key="")
 
52
        config = Config()
 
53
        config.publishers.append(capture)
 
54
        receiver = Receiver(config, self.connection_factory, queue.queue_name)
 
55
        receiver.sentinel = sentinel
 
56
        receiver.run_forever()
 
57
        self.assertEqual([expected_report], reports)
 
58
 
 
59
    def test_stop_via_stopping(self):
 
60
        # Setting the stopping field should stop the run_forever loop.
 
61
        reports = []
 
62
        def capture(report):
 
63
            reports.append(report)
 
64
            return report['id']
 
65
        expected_report = {'id': 'foo', 'otherkey': 42}
 
66
        message = amqp.Message(bson.dumps(expected_report))
 
67
        channel = self.useFixture(
 
68
            ChannelFixture(self.connection_factory)).channel
 
69
        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
 
70
        channel.basic_publish(
 
71
            message, queue.exchange_name, routing_key="")
 
72
        config = Config()
 
73
        config.publishers.append(capture)
51
74
        # We don't want to loop forever: patch the channel so that after one
52
75
        # call to wait (which will get our injected message) the loop will shut
53
 
        # down. This also checks we use the consume_tag correctly.
54
 
        old_wait = receiver_channel.wait
55
 
        def new_wait(allowed_methods=None):
56
 
            receiver.stopping = True
57
 
            return old_wait(allowed_methods=allowed_methods)
58
 
        receiver_channel.wait = new_wait
 
76
        # down.
 
77
        def patching_factory():
 
78
            connection = self.connection_factory()
 
79
            old_channel = connection.channel
 
80
            def new_channel():
 
81
                result = old_channel()
 
82
                old_wait = result.wait
 
83
                def new_wait(allowed_methods=None):
 
84
                    receiver.stopping = True
 
85
                    return old_wait(allowed_methods=allowed_methods)
 
86
                result.wait = new_wait
 
87
                return result
 
88
            connection.channel = new_channel
 
89
            return connection
 
90
        receiver = Receiver(config, patching_factory, queue.queue_name)
59
91
        receiver.run_forever()
60
92
        self.assertEqual([expected_report], reports)
61
93
 
62
94
    def test_run_forever(self):
63
 
        # run_forever subscribes and then calls wait.
 
95
        # run_forever subscribes and then calls wait in a loop.
64
96
        config = None
 
97
        calls = []
65
98
        class FakeChannel:
66
 
            def __init__(self):
67
 
                self.calls = []
 
99
            def __init__(self, calls):
 
100
                self.calls = calls
68
101
            def basic_consume(self, queue_name, callback=None):
69
102
                self.calls.append(('basic_consume', queue_name, callback))
 
103
                return 'tag'
70
104
            def wait(self):
71
105
                self.calls.append(('wait',))
72
 
        channel = FakeChannel()
73
 
        receiver = Receiver(None, channel, 'foo')
 
106
                if len(self.calls) > 2:
 
107
                    receiver.stopping = True
 
108
            def basic_cancel(self, tag):
 
109
                self.calls.append(('basic_cancel', tag))
 
110
            def close(self):
 
111
                pass
 
112
        class FakeConnection:
 
113
            def channel(self):
 
114
                return FakeChannel(calls)
 
115
            def close(self):
 
116
                pass
 
117
        receiver = Receiver(None, FakeConnection, 'foo')
74
118
        receiver.run_forever()
75
119
        self.assertEqual(
76
 
            [('basic_consume', 'foo', receiver.handle_report), ('wait',)],
77
 
            channel.calls)
 
120
            [('basic_consume', 'foo', receiver.handle_report),
 
121
             ('wait',),
 
122
             ('wait',),
 
123
             ('basic_cancel', 'tag')],
 
124
            calls)
 
125
 
 
126
    def test_tolerates_amqp_trouble(self):
 
127
        # If the AMQP server is unavailable for a short period, the receiver
 
128
        # will automatically reconnect.
 
129
        # Break a connection to raise socket.error (which we know from the 
 
130
        # publisher tests is what leaks through when rabbit is shutdown).
 
131
        # We raise it the first time on each amqp method call.
 
132
        reports = []
 
133
        def capture(report):
 
134
            reports.append(report)
 
135
            return report['id']
 
136
        expected_report = {'id': 'foo', 'otherkey': 42}
 
137
        message = amqp.Message(bson.dumps(expected_report))
 
138
        channel = self.useFixture(
 
139
            ChannelFixture(self.connection_factory)).channel
 
140
        queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
 
141
        channel.basic_publish(message, queue.exchange_name, routing_key="")
 
142
        config = Config()
 
143
        config.publishers.append(capture)
 
144
        state = {}
 
145
        def error_once(func):
 
146
            def wrapped(*args, **kwargs):
 
147
                func_ref = func.func_code
 
148
                if func_ref in state:
 
149
                    return func(*args, **kwargs)
 
150
                else:
 
151
                    state[func_ref] = True
 
152
                    # Use EPIPE because the close() code checks that (though
 
153
                    # the rest doesn't)
 
154
                    raise socket.error(errno.EPIPE, "booyah")
 
155
            return wrapped
 
156
        @error_once
 
157
        def patching_factory():
 
158
            connection = self.connection_factory()
 
159
            old_channel = connection.channel
 
160
            @error_once
 
161
            def new_channel():
 
162
                result = old_channel()
 
163
                result.wait = error_once(result.wait)
 
164
                result.basic_consume = error_once(result.basic_consume)
 
165
                result.basic_cancel = error_once(result.basic_cancel)
 
166
                result.close = error_once(result.close)
 
167
                return result
 
168
            connection.channel = new_channel
 
169
            connection.close = error_once(connection.close)
 
170
            return connection
 
171
        receiver = Receiver(config, patching_factory, queue.queue_name)
 
172
        receiver.sentinel = "arhh"
 
173
        channel.basic_publish(
 
174
            amqp.Message("arhh"), queue.exchange_name, routing_key="")
 
175
        receiver.run_forever()
 
176
        self.assertEqual([expected_report], reports)