~ubuntu-branches/ubuntu/trusty/python-boto/trusty

« back to all changes in this revision

Viewing changes to tests/sqs/test_connection.py

  • Committer: Package Import Robot
  • Author(s): Eric Evans
  • Date: 2011-11-13 11:58:40 UTC
  • mfrom: (14.1.1 experimental)
  • Revision ID: package-import@ubuntu.com-20111113115840-ckzyt3h17uh8s41y
Tags: 2.0-2
Promote new upstream to unstable (Closes: #638931).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2006-2010 Mitch Garnaat http://garnaat.org/
 
2
# Copyright (c) 2010, Eucalyptus Systems, Inc.
 
3
# All rights reserved.
 
4
#
 
5
# Permission is hereby granted, free of charge, to any person obtaining a
 
6
# copy of this software and associated documentation files (the
 
7
# "Software"), to deal in the Software without restriction, including
 
8
# without limitation the rights to use, copy, modify, merge, publish, dis-
 
9
# tribute, sublicense, and/or sell copies of the Software, and to permit
 
10
# persons to whom the Software is furnished to do so, subject to the fol-
 
11
# lowing conditions:
 
12
#
 
13
# The above copyright notice and this permission notice shall be included
 
14
# in all copies or substantial portions of the Software.
 
15
#
 
16
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 
17
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
 
18
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 
19
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
 
20
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
21
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 
22
# IN THE SOFTWARE.
 
23
 
 
24
"""
 
25
Some unit tests for the SQSConnection
 
26
"""
 
27
 
 
28
import unittest
 
29
import time
 
30
from boto.sqs.connection import SQSConnection
 
31
from boto.sqs.message import MHMessage
 
32
from boto.exception import SQSError
 
33
 
 
34
class SQSConnectionTest (unittest.TestCase):
 
35
 
 
36
    def test_1_basic(self):
 
37
        print '--- running SQSConnection tests ---'
 
38
        c = SQSConnection()
 
39
        rs = c.get_all_queues()
 
40
        num_queues = 0
 
41
        for q in rs:
 
42
            num_queues += 1
 
43
    
 
44
        # try illegal name
 
45
        try:
 
46
            queue = c.create_queue('bad*queue*name')
 
47
            self.fail('queue name should have been bad')
 
48
        except SQSError:
 
49
            pass
 
50
        
 
51
        # now create one that should work and should be unique (i.e. a new one)
 
52
        queue_name = 'test%d' % int(time.time())
 
53
        timeout = 60
 
54
        queue = c.create_queue(queue_name, timeout)
 
55
        time.sleep(60)
 
56
        rs  = c.get_all_queues()
 
57
        i = 0
 
58
        for q in rs:
 
59
            i += 1
 
60
        assert i == num_queues+1
 
61
        assert queue.count_slow() == 0
 
62
 
 
63
        # check the visibility timeout
 
64
        t = queue.get_timeout()
 
65
        assert t == timeout, '%d != %d' % (t, timeout)
 
66
 
 
67
        # now try to get queue attributes
 
68
        a = q.get_attributes()
 
69
        assert a.has_key('ApproximateNumberOfMessages')
 
70
        assert a.has_key('VisibilityTimeout')
 
71
        a = q.get_attributes('ApproximateNumberOfMessages')
 
72
        assert a.has_key('ApproximateNumberOfMessages')
 
73
        assert not a.has_key('VisibilityTimeout')
 
74
        a = q.get_attributes('VisibilityTimeout')
 
75
        assert not a.has_key('ApproximateNumberOfMessages')
 
76
        assert a.has_key('VisibilityTimeout')
 
77
 
 
78
        # now change the visibility timeout
 
79
        timeout = 45
 
80
        queue.set_timeout(timeout)
 
81
        time.sleep(60)
 
82
        t = queue.get_timeout()
 
83
        assert t == timeout, '%d != %d' % (t, timeout)
 
84
    
 
85
        # now add a message
 
86
        message_body = 'This is a test\n'
 
87
        message = queue.new_message(message_body)
 
88
        queue.write(message)
 
89
        time.sleep(60)
 
90
        assert queue.count_slow() == 1
 
91
        time.sleep(90)
 
92
 
 
93
        # now read the message from the queue with a 10 second timeout
 
94
        message = queue.read(visibility_timeout=10)
 
95
        assert message
 
96
        assert message.get_body() == message_body
 
97
 
 
98
        # now immediately try another read, shouldn't find anything
 
99
        message = queue.read()
 
100
        assert message == None
 
101
 
 
102
        # now wait 30 seconds and try again
 
103
        time.sleep(30)
 
104
        message = queue.read()
 
105
        assert message
 
106
 
 
107
        # now delete the message
 
108
        queue.delete_message(message)
 
109
        time.sleep(30)
 
110
        assert queue.count_slow() == 0
 
111
 
 
112
        # create another queue so we can test force deletion
 
113
        # we will also test MHMessage with this queue
 
114
        queue_name = 'test%d' % int(time.time())
 
115
        timeout = 60
 
116
        queue = c.create_queue(queue_name, timeout)
 
117
        queue.set_message_class(MHMessage)
 
118
        time.sleep(30)
 
119
        
 
120
        # now add a couple of messages
 
121
        message = queue.new_message()
 
122
        message['foo'] = 'bar'
 
123
        queue.write(message)
 
124
        message_body = {'fie' : 'baz', 'foo' : 'bar'}
 
125
        message = queue.new_message(body=message_body)
 
126
        queue.write(message)
 
127
        time.sleep(30)
 
128
 
 
129
        m = queue.read()
 
130
        assert m['foo'] == 'bar'
 
131
 
 
132
        # now delete that queue and messages
 
133
        c.delete_queue(queue, True)
 
134
 
 
135
        print '--- tests completed ---'
 
136