~ubuntu-branches/ubuntu/trusty/python-boto/trusty

« back to all changes in this revision

Viewing changes to tests/integration/sqs/test_connection.py

  • Committer: Package Import Robot
  • Author(s): Eric Evans
  • Date: 2013-05-10 23:38:14 UTC
  • mfrom: (1.1.10) (14.1.2 experimental)
  • Revision ID: package-import@ubuntu.com-20130510233814-701dvlop7xfh88i7
Tags: 2.9.2-1
New upstream release (Closes: #700743).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2006-2010 Mitch Garnaat http://garnaat.org/
 
2
# Copyright (c) 2010, Eucalyptus Systems, Inc.
 
3
# All rights reserved.
 
4
#
 
5
# Permission is hereby granted, free of charge, to any person obtaining a
 
6
# copy of this software and associated documentation files (the
 
7
# "Software"), to deal in the Software without restriction, including
 
8
# without limitation the rights to use, copy, modify, merge, publish, dis-
 
9
# tribute, sublicense, and/or sell copies of the Software, and to permit
 
10
# persons to whom the Software is furnished to do so, subject to the fol-
 
11
# lowing conditions:
 
12
#
 
13
# The above copyright notice and this permission notice shall be included
 
14
# in all copies or substantial portions of the Software.
 
15
#
 
16
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 
17
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
 
18
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 
19
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 
20
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
21
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 
22
# IN THE SOFTWARE.
 
23
 
 
24
"""
 
25
Some unit tests for the SQSConnection
 
26
"""
 
27
 
 
28
import time
 
29
from threading import Timer
 
30
from tests.unit import unittest
 
31
 
 
32
from boto.sqs.connection import SQSConnection
 
33
from boto.sqs.message import Message
 
34
from boto.sqs.message import MHMessage
 
35
from boto.exception import SQSError
 
36
 
 
37
 
 
38
class SQSConnectionTest(unittest.TestCase):
 
39
 
 
40
    sqs = True
 
41
 
 
42
    def test_1_basic(self):
 
43
        print '--- running SQSConnection tests ---'
 
44
        c = SQSConnection()
 
45
        rs = c.get_all_queues()
 
46
        num_queues = 0
 
47
        for q in rs:
 
48
            num_queues += 1
 
49
 
 
50
        # try illegal name
 
51
        try:
 
52
            queue = c.create_queue('bad*queue*name')
 
53
            self.fail('queue name should have been bad')
 
54
        except SQSError:
 
55
            pass
 
56
 
 
57
        # now create one that should work and should be unique (i.e. a new one)
 
58
        queue_name = 'test%d' % int(time.time())
 
59
        timeout = 60
 
60
        queue_1 = c.create_queue(queue_name, timeout)
 
61
        self.addCleanup(c.delete_queue, queue_1, True)
 
62
        time.sleep(60)
 
63
        rs = c.get_all_queues()
 
64
        i = 0
 
65
        for q in rs:
 
66
            i += 1
 
67
        assert i == num_queues + 1
 
68
        assert queue_1.count_slow() == 0
 
69
 
 
70
        # check the visibility timeout
 
71
        t = queue_1.get_timeout()
 
72
        assert t == timeout, '%d != %d' % (t, timeout)
 
73
 
 
74
        # now try to get queue attributes
 
75
        a = q.get_attributes()
 
76
        assert 'ApproximateNumberOfMessages' in a
 
77
        assert 'VisibilityTimeout' in a
 
78
        a = q.get_attributes('ApproximateNumberOfMessages')
 
79
        assert 'ApproximateNumberOfMessages' in a
 
80
        assert 'VisibilityTimeout' not in a
 
81
        a = q.get_attributes('VisibilityTimeout')
 
82
        assert 'ApproximateNumberOfMessages' not in a
 
83
        assert 'VisibilityTimeout' in a
 
84
 
 
85
        # now change the visibility timeout
 
86
        timeout = 45
 
87
        queue_1.set_timeout(timeout)
 
88
        time.sleep(60)
 
89
        t = queue_1.get_timeout()
 
90
        assert t == timeout, '%d != %d' % (t, timeout)
 
91
 
 
92
        # now add a message
 
93
        message_body = 'This is a test\n'
 
94
        message = queue_1.new_message(message_body)
 
95
        queue_1.write(message)
 
96
        time.sleep(60)
 
97
        assert queue_1.count_slow() == 1
 
98
        time.sleep(90)
 
99
 
 
100
        # now read the message from the queue with a 10 second timeout
 
101
        message = queue_1.read(visibility_timeout=10)
 
102
        assert message
 
103
        assert message.get_body() == message_body
 
104
 
 
105
        # now immediately try another read, shouldn't find anything
 
106
        message = queue_1.read()
 
107
        assert message == None
 
108
 
 
109
        # now wait 30 seconds and try again
 
110
        time.sleep(30)
 
111
        message = queue_1.read()
 
112
        assert message
 
113
 
 
114
        # now delete the message
 
115
        queue_1.delete_message(message)
 
116
        time.sleep(30)
 
117
        assert queue_1.count_slow() == 0
 
118
 
 
119
        # try a batch write
 
120
        num_msgs = 10
 
121
        msgs = [(i, 'This is message %d' % i, 0) for i in range(num_msgs)]
 
122
        queue_1.write_batch(msgs)
 
123
 
 
124
        # try to delete all of the messages using batch delete
 
125
        deleted = 0
 
126
        while deleted < num_msgs:
 
127
            time.sleep(5)
 
128
            msgs = queue_1.get_messages(num_msgs)
 
129
            if msgs:
 
130
                br = queue_1.delete_message_batch(msgs)
 
131
                deleted += len(br.results)
 
132
 
 
133
        # create another queue so we can test force deletion
 
134
        # we will also test MHMessage with this queue
 
135
        queue_name = 'test%d' % int(time.time())
 
136
        timeout = 60
 
137
        queue_2 = c.create_queue(queue_name, timeout)
 
138
        self.addCleanup(c.delete_queue, queue_2, True)
 
139
        queue_2.set_message_class(MHMessage)
 
140
        time.sleep(30)
 
141
 
 
142
        # now add a couple of messages
 
143
        message = queue_2.new_message()
 
144
        message['foo'] = 'bar'
 
145
        queue_2.write(message)
 
146
        message_body = {'fie': 'baz', 'foo': 'bar'}
 
147
        message = queue_2.new_message(body=message_body)
 
148
        queue_2.write(message)
 
149
        time.sleep(30)
 
150
 
 
151
        m = queue_2.read()
 
152
        assert m['foo'] == 'bar'
 
153
 
 
154
        print '--- tests completed ---'
 
155
 
 
156
    def test_sqs_timeout(self):
 
157
        c = SQSConnection()
 
158
        queue_name = 'test_sqs_timeout_%s' % int(time.time())
 
159
        queue = c.create_queue(queue_name)
 
160
        self.addCleanup(c.delete_queue, queue, True)
 
161
        start = time.time()
 
162
        poll_seconds = 2
 
163
        response = queue.read(visibility_timeout=None,
 
164
                              wait_time_seconds=poll_seconds)
 
165
        total_time = time.time() - start
 
166
        self.assertTrue(total_time > poll_seconds,
 
167
                        "SQS queue did not block for at least %s seconds: %s" %
 
168
                        (poll_seconds, total_time))
 
169
        self.assertIsNone(response)
 
170
 
 
171
        # Now that there's an element in the queue, we should not block for 2
 
172
        # seconds.
 
173
        c.send_message(queue, 'test message')
 
174
        start = time.time()
 
175
        poll_seconds = 2
 
176
        message = c.receive_message(
 
177
            queue, number_messages=1,
 
178
            visibility_timeout=None, attributes=None,
 
179
            wait_time_seconds=poll_seconds)[0]
 
180
        total_time = time.time() - start
 
181
        self.assertTrue(total_time < poll_seconds,
 
182
                        "SQS queue blocked longer than %s seconds: %s" %
 
183
                        (poll_seconds, total_time))
 
184
        self.assertEqual(message.get_body(), 'test message')
 
185
 
 
186
        attrs = c.get_queue_attributes(queue, 'ReceiveMessageWaitTimeSeconds')
 
187
        self.assertEqual(attrs['ReceiveMessageWaitTimeSeconds'], '0')
 
188
 
 
189
    def test_sqs_longpoll(self):
 
190
        c = SQSConnection()
 
191
        queue_name = 'test_sqs_longpoll_%s' % int(time.time())
 
192
        queue = c.create_queue(queue_name)
 
193
        self.addCleanup(c.delete_queue, queue, True)
 
194
        messages = []
 
195
 
 
196
        # The basic idea is to spawn a timer thread that will put something
 
197
        # on the queue in 5 seconds and verify that our long polling client
 
198
        # sees the message after waiting for approximately that long.
 
199
        def send_message():
 
200
            messages.append(
 
201
                queue.write(queue.new_message('this is a test message')))
 
202
 
 
203
        t = Timer(5.0, send_message)
 
204
        t.start()
 
205
        self.addCleanup(t.join)
 
206
 
 
207
        start = time.time()
 
208
        response = queue.read(wait_time_seconds=10)
 
209
        end = time.time()
 
210
 
 
211
        t.join()
 
212
        self.assertEqual(response.id, messages[0].id)
 
213
        self.assertEqual(response.get_body(), messages[0].get_body())
 
214
        # The timer thread should send the message in 5 seconds, so
 
215
        # we're giving +- .5 seconds for the total time the queue
 
216
        # was blocked on the read call.
 
217
        self.assertTrue(4.5 <= (end - start) <= 5.5)
 
218
 
 
219
    def test_queue_deletion_affects_full_queues(self):
 
220
        conn = SQSConnection()
 
221
        initial_count = len(conn.get_all_queues())
 
222
 
 
223
        empty = conn.create_queue('empty%d' % int(time.time()))
 
224
        full = conn.create_queue('full%d' % int(time.time()))
 
225
        time.sleep(60)
 
226
        # Make sure they're both around.
 
227
        self.assertEqual(len(conn.get_all_queues()), initial_count + 2)
 
228
 
 
229
        # Put a message in the full queue.
 
230
        m1 = Message()
 
231
        m1.set_body('This is a test message.')
 
232
        full.write(m1)
 
233
        self.assertEqual(full.count(), 1)
 
234
 
 
235
        self.assertTrue(conn.delete_queue(empty))
 
236
        # Here's the regression for the docs. SQS will delete a queue with
 
237
        # messages in it, no ``force_deletion`` needed.
 
238
        self.assertTrue(conn.delete_queue(full))
 
239
        # Wait long enough for SQS to finally remove the queues.
 
240
        time.sleep(90)
 
241
        self.assertEqual(len(conn.get_all_queues()), initial_count)