~oubiwann/pika/master

« back to all changes in this revision

Viewing changes to tests/unit/connection_tests.py

  • Committer: Gavin M. Roy
  • Date: 2016-04-01 12:46:22 UTC
  • mfrom: (924.1.2)
  • Revision ID: git-v1:914a73548f288a8508aad47dd890769e9441f17a
Merge pull request #712 from vitaly-krugl/channel-cleanup

Fixed Channel-related bugs in Channel and Connection classes

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
try:
18
18
    import mock
19
19
except ImportError:
20
 
    from unittest import mock
 
20
    from unittest import mock  # pylint: disable=E0611
21
21
 
22
22
import random
23
23
try:
39
39
    pass
40
40
 
41
41
 
42
 
class ConnectionTests(unittest.TestCase):
 
42
class ConnectionTests(unittest.TestCase):  # pylint: disable=R0904
43
43
 
44
44
    def setUp(self):
 
45
        class ChannelTemplate(channel.Channel):
 
46
            channel_number = None
 
47
 
45
48
        with mock.patch('pika.connection.Connection.connect'):
46
49
            self.connection = connection.Connection()
47
50
            self.connection._set_connection_state(
48
51
                connection.Connection.CONNECTION_OPEN)
49
52
 
50
 
        self.channel = mock.Mock(spec=channel.Channel)
 
53
        self.channel = mock.Mock(spec=ChannelTemplate)
 
54
        self.channel.channel_number = 1
51
55
        self.channel.is_open = True
52
 
        self.connection._channels[1] = self.channel
 
56
        self.channel.is_closing = False
 
57
        self.channel.is_closed = False
 
58
        self.connection._channels[self.channel.channel_number] = self.channel
53
59
 
54
60
    def tearDown(self):
55
61
        del self.connection
56
62
        del self.channel
57
63
 
58
 
    @mock.patch('pika.connection.Connection._send_connection_close')
59
 
    def test_close_closes_open_channels(self, send_connection_close):
60
 
        self.connection.close()
61
 
        self.channel.close.assert_called_once_with(200, 'Normal shutdown')
62
 
 
63
 
    @mock.patch('pika.connection.Connection._send_connection_close')
64
 
    def test_close_ignores_closed_channels(self, send_connection_close):
 
64
    @mock.patch('pika.connection.Connection._on_close_ready')
 
65
    def test_close_calls_on_close_ready_when_no_channels(
 
66
        self,
 
67
        on_close_ready_mock):
 
68
        self.connection._channels = dict()
 
69
        self.connection.close()
 
70
        self.assertTrue(on_close_ready_mock.called,
 
71
                        'on_close_ready_mock should have been called')
 
72
 
 
73
    @mock.patch('pika.connection.Connection._on_close_ready')
 
74
    def test_close_closes_open_channels(self, on_close_ready):
 
75
        self.connection.close()
 
76
        self.channel.close.assert_called_once_with(200, 'Normal shutdown')
 
77
        self.assertFalse(on_close_ready.called)
 
78
 
 
79
    @mock.patch('pika.connection.Connection._on_close_ready')
 
80
    def test_close_closes_opening_channels(self, on_close_ready):
 
81
        self.channel.is_open = False
 
82
        self.channel.is_closing = False
 
83
        self.channel.is_closed = False
 
84
        self.connection.close()
 
85
        self.channel.close.assert_called_once_with(200, 'Normal shutdown')
 
86
        self.assertFalse(on_close_ready.called)
 
87
 
 
88
    @mock.patch('pika.connection.Connection._on_close_ready')
 
89
    def test_close_does_not_close_closing_channels(self, on_close_ready):
 
90
        self.channel.is_open = False
 
91
        self.channel.is_closing = True
 
92
        self.channel.is_closed = False
 
93
        self.connection.close()
 
94
        self.assertFalse(self.channel.close.called)
 
95
        self.assertFalse(on_close_ready.called)
 
96
 
 
97
    @mock.patch('pika.connection.Connection._close_channels')
 
98
    def test_close_bails_out_if_already_closed_or_closing(
 
99
            self, close_channels):
65
100
        for closed_state in (self.connection.CONNECTION_CLOSED,
66
101
                             self.connection.CONNECTION_CLOSING):
67
102
            self.connection.connection_state = closed_state
68
103
            self.connection.close()
69
104
            self.assertFalse(self.channel.close.called)
70
 
 
71
 
    @mock.patch('pika.connection.Connection._on_close_ready')
72
 
    def test_on_close_ready_open_channels(self, on_close_ready):
73
 
        """if open channels _on_close_ready shouldn't be called"""
74
 
        self.connection.close()
75
 
        self.assertFalse(on_close_ready.called,
76
 
                         '_on_close_ready should not have been called')
77
 
 
78
 
    @mock.patch('pika.connection.Connection._on_close_ready')
79
 
    def test_on_close_ready_no_open_channels(self, on_close_ready):
80
 
        self.connection._channels = dict()
81
 
        self.connection.close()
82
 
        self.assertTrue(on_close_ready.called,
83
 
                        '_on_close_ready should have been called')
84
 
 
85
 
    @mock.patch('pika.connection.Connection._on_close_ready')
86
 
    def test_on_channel_cleanup_no_open_channels(self, on_close_ready):
87
 
        """Should call _on_close_ready if connection is closing and there are
88
 
        no open channels
89
 
 
90
 
        """
91
 
        self.connection._channels = dict()
92
 
        self.connection.close()
93
 
        self.assertTrue(on_close_ready.called,
94
 
                        '_on_close_ready should been called')
95
 
 
96
 
    @mock.patch('pika.connection.Connection._on_close_ready')
97
 
    def test_on_channel_cleanup_open_channels(self, on_close_ready):
98
 
        """if connection is closing but channels remain open do not call
 
105
            self.assertEqual(self.connection.connection_state, closed_state)
 
106
 
 
107
    @mock.patch('logging.Logger.critical')
 
108
    def test_deliver_frame_to_channel_with_frame_for_unknown_channel(
 
109
            self,
 
110
            critical_mock):
 
111
        unknown_channel_num = 99
 
112
        self.assertNotIn(unknown_channel_num, self.connection._channels)
 
113
 
 
114
        unexpected_frame = frame.Method(unknown_channel_num, mock.Mock())
 
115
        self.connection._deliver_frame_to_channel(unexpected_frame)
 
116
 
 
117
        critical_mock.assert_called_once_with(
 
118
            'Received %s frame for unregistered channel %i on %s',
 
119
            unexpected_frame.NAME, unknown_channel_num, self.connection)
 
120
 
 
121
    @mock.patch('pika.connection.Connection._on_close_ready')
 
122
    def test_on_channel_cleanup_with_closing_channels(self, on_close_ready):
 
123
        """if connection is closing but closing channels remain, do not call \
99
124
        _on_close_ready
100
125
 
101
126
        """
 
127
        self.channel.is_open = False
 
128
        self.channel.is_closing = True
 
129
        self.channel.is_closed = False
 
130
 
102
131
        self.connection.close()
103
132
        self.assertFalse(on_close_ready.called,
104
133
                         '_on_close_ready should not have been called')
105
134
 
106
135
    @mock.patch('pika.connection.Connection._on_close_ready')
 
136
    def test_on_channel_cleanup_closing_state_last_channel_calls_on_close_ready(
 
137
            self,
 
138
            on_close_ready_mock):
 
139
        self.connection.connection_state = self.connection.CONNECTION_CLOSING
 
140
 
 
141
        self.connection._on_channel_cleanup(self.channel)
 
142
 
 
143
        self.assertTrue(on_close_ready_mock.called,
 
144
                         '_on_close_ready should have been called')
 
145
 
 
146
    @mock.patch('pika.connection.Connection._on_close_ready')
 
147
    def test_on_channel_cleanup_closing_state_more_channels_no_on_close_ready(
 
148
            self,
 
149
            on_close_ready_mock):
 
150
        self.connection.connection_state = self.connection.CONNECTION_CLOSING
 
151
        channel_mock = mock.Mock(channel_number=99, is_closing=True)
 
152
        self.connection._channels[99] = channel_mock
 
153
 
 
154
        self.connection._on_channel_cleanup(self.channel)
 
155
 
 
156
        self.assertFalse(on_close_ready_mock.called,
 
157
                         '_on_close_ready should not have been called')
 
158
 
 
159
    @mock.patch('pika.connection.Connection._on_close_ready')
107
160
    def test_on_channel_cleanup_non_closing_state(self, on_close_ready):
108
161
        """if connection isn't closing _on_close_ready should not be called"""
109
162
        self.connection._on_channel_cleanup(mock.Mock())
116
169
        self.connection.heartbeat = heartbeat
117
170
        self.connection._adapter_disconnect = mock.Mock()
118
171
 
119
 
        self.connection._on_terminate(0, 'Undefined')
 
172
        self.connection._on_terminate(-1, 'Undefined')
120
173
 
121
174
        heartbeat.stop.assert_called_once_with()
122
175
        self.connection._adapter_disconnect.assert_called_once_with()
123
176
 
124
 
        self.assertTrue(self.channel._on_close.called,
125
 
                        'channel._on_close should have been called')
126
 
        method_frame = self.channel._on_close.call_args[0][0]
127
 
        self.assertEqual(method_frame.method.reply_code, 0)
128
 
        self.assertEqual(method_frame.method.reply_text, 'Undefined')
 
177
        self.channel._on_close_meta.assert_called_once_with(-1, 'Undefined')
129
178
 
130
179
        self.assertTrue(self.connection.is_closed)
131
180
 
150
199
                mock.ANY)
151
200
 
152
201
    def test_on_terminate_invokes_protocol_on_connection_error_and_closed(self):
153
 
        """_on_terminate invokes `ON_CONNECTION_ERROR` with `IncompatibleProtocolError` and `ON_CONNECTION_CLOSED` callbacks"""
 
202
        """_on_terminate invokes `ON_CONNECTION_ERROR` with \
 
203
        `IncompatibleProtocolError` and `ON_CONNECTION_CLOSED` callbacks"""
154
204
        with mock.patch.object(self.connection.callbacks, 'process'):
155
205
 
156
206
            self.connection._adapter_disconnect = mock.Mock()
177
227
                1, 'error text')
178
228
 
179
229
    def test_on_terminate_invokes_auth_on_connection_error_and_closed(self):
180
 
        """_on_terminate invokes `ON_CONNECTION_ERROR` with `ProbableAuthenticationError` and `ON_CONNECTION_CLOSED` callbacks"""
 
230
        """_on_terminate invokes `ON_CONNECTION_ERROR` with \
 
231
        `ProbableAuthenticationError` and `ON_CONNECTION_CLOSED` callbacks"""
181
232
        with mock.patch.object(self.connection.callbacks, 'process'):
182
233
 
183
234
            self.connection._adapter_disconnect = mock.Mock()
206
257
 
207
258
    def test_on_terminate_invokes_access_denied_on_connection_error_and_closed(
208
259
            self):
209
 
        """_on_terminate invokes `ON_CONNECTION_ERROR` with `ProbableAccessDeniedError` and `ON_CONNECTION_CLOSED` callbacks"""
 
260
        """_on_terminate invokes `ON_CONNECTION_ERROR` with \
 
261
        `ProbableAccessDeniedError` and `ON_CONNECTION_CLOSED` callbacks"""
210
262
        with mock.patch.object(self.connection.callbacks, 'process'):
211
263
 
212
264
            self.connection._adapter_disconnect = mock.Mock()
292
344
        self.connection._add_channel_callbacks.assert_called_once_with(42)
293
345
        test_channel.open.assert_called_once_with()
294
346
 
 
347
    def test_channel_on_closed_connection_raises_connection_closed(self):
 
348
        self.connection.connection_state = self.connection.CONNECTION_CLOSED
 
349
        with self.assertRaises(exceptions.ConnectionClosed):
 
350
            self.connection.channel(lambda *args: None)
 
351
 
 
352
    def test_channel_on_closing_connection_raises_connection_closed(self):
 
353
        self.connection.connection_state = self.connection.CONNECTION_CLOSING
 
354
        with self.assertRaises(exceptions.ConnectionClosed):
 
355
            self.connection.channel(lambda *args: None)
 
356
 
 
357
    def test_channel_on_init_connection_raises_connection_closed(self):
 
358
        self.connection.connection_state = self.connection.CONNECTION_INIT
 
359
        with self.assertRaises(exceptions.ConnectionClosed):
 
360
            self.connection.channel(lambda *args: None)
 
361
 
 
362
    def test_channel_on_start_connection_raises_connection_closed(self):
 
363
        self.connection.connection_state = self.connection.CONNECTION_START
 
364
        with self.assertRaises(exceptions.ConnectionClosed):
 
365
            self.connection.channel(lambda *args: None)
 
366
 
 
367
    def test_channel_on_protocol_connection_raises_connection_closed(self):
 
368
        self.connection.connection_state = self.connection.CONNECTION_PROTOCOL
 
369
        with self.assertRaises(exceptions.ConnectionClosed):
 
370
            self.connection.channel(lambda *args: None)
 
371
 
 
372
    def test_channel_on_tune_connection_raises_connection_closed(self):
 
373
        self.connection.connection_state = self.connection.CONNECTION_TUNE
 
374
        with self.assertRaises(exceptions.ConnectionClosed):
 
375
            self.connection.channel(lambda *args: None)
 
376
 
295
377
    @mock.patch('pika.frame.ProtocolHeader')
296
378
    def test_connect(self, frame_protocol_header):
297
379
        """make sure the connect method sets the state and sends a frame"""
342
424
 
343
425
    def test_set_backpressure_multiplier(self):
344
426
        """test setting the backpressure multiplier"""
345
 
        self.connection._backpressure = None
 
427
        self.connection._backpressure_multiplier = None
346
428
        self.connection.set_backpressure_multiplier(value=5)
347
 
        self.assertEqual(5, self.connection._backpressure)
 
429
        self.assertEqual(5, self.connection._backpressure_multiplier)
348
430
 
349
431
    def test_close_channels(self):
350
432
        """test closing all channels"""
351
433
        self.connection.connection_state = self.connection.CONNECTION_OPEN
352
434
        self.connection.callbacks = mock.Mock(spec=self.connection.callbacks)
353
 
        open_channel = mock.Mock(is_open=True)
354
 
        closed_channel = mock.Mock(is_open=False)
355
 
        self.connection._channels = {'oc': open_channel, 'cc': closed_channel}
356
 
        self.connection._close_channels('reply code', 'reply text')
357
 
        open_channel.close.assert_called_once_with('reply code', 'reply text')
358
 
        self.assertTrue('oc' in self.connection._channels)
359
 
        self.assertTrue('cc' not in self.connection._channels)
360
 
        self.connection.callbacks.cleanup.assert_called_once_with('cc')
361
 
        #Test on closed channel
 
435
 
 
436
        opening_channel = mock.Mock(is_open=False,
 
437
                                    is_closed=False,
 
438
                                    is_closing=False)
 
439
        open_channel = mock.Mock(is_open=True,
 
440
                                 is_closed=False,
 
441
                                 is_closing=False)
 
442
        closing_channel = mock.Mock(is_open=False,
 
443
                                    is_closed=False,
 
444
                                    is_closing=True)
 
445
        self.connection._channels = {
 
446
            'openingc': opening_channel,
 
447
            'openc': open_channel,
 
448
            'closingc': closing_channel}
 
449
 
 
450
        self.connection._close_channels(400, 'reply text')
 
451
 
 
452
        opening_channel.close.assert_called_once_with(400, 'reply text')
 
453
        open_channel.close.assert_called_once_with(400, 'reply text')
 
454
        self.assertFalse(closing_channel.close.called)
 
455
 
 
456
        self.assertTrue('openingc' in self.connection._channels)
 
457
        self.assertTrue('openc' in self.connection._channels)
 
458
        self.assertTrue('closingc' in self.connection._channels)
 
459
 
 
460
        self.assertFalse(self.connection.callbacks.cleanup.called)
 
461
 
 
462
        # Test on closed connection
362
463
        self.connection.connection_state = self.connection.CONNECTION_CLOSED
363
 
        self.connection._close_channels('reply code', 'reply text')
364
 
        self.assertEqual({}, self.connection._channels)
 
464
        with self.assertRaises(AssertionError):
 
465
            self.connection._close_channels(200, 'reply text')
365
466
 
366
467
    def test_on_connection_start(self):
367
468
        """make sure starting a connection sets the correct class vars"""