~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/boto/boto/tests/test_sqsconnection.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
 
 
3
# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
 
4
#
 
5
# Permission is hereby granted, free of charge, to any person obtaining a
 
6
# copy of this software and associated documentation files (the
 
7
# "Software"), to deal in the Software without restriction, including
 
8
# without limitation the rights to use, copy, modify, merge, publish, dis-
 
9
# tribute, sublicense, and/or sell copies of the Software, and to permit
 
10
# persons to whom the Software is furnished to do so, subject to the fol-
 
11
# lowing conditions:
 
12
#
 
13
# The above copyright notice and this permission notice shall be included
 
14
# in all copies or substantial portions of the Software.
 
15
#
 
16
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 
17
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
 
18
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 
19
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
 
20
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
21
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 
22
# IN THE SOFTWARE.
 
23
 
 
24
"""
 
25
Some unit tests for the SQSConnection
 
26
"""
 
27
 
 
28
import unittest
 
29
import time
 
30
from boto.sqs.connection import SQSConnection
 
31
from boto.sqs.message import MHMessage
 
32
from boto.exception import SQSError
 
33
 
 
34
class SQSConnectionTest (unittest.TestCase):
 
35
 
 
36
    def test_1_basic(self):
 
37
        print '--- running SQSConnection tests ---'
 
38
        c = SQSConnection()
 
39
        rs = c.get_all_queues()
 
40
        num_queues = 0
 
41
        for q in rs:
 
42
            num_queues += 1
 
43
    
 
44
        # try illegal name
 
45
        try:
 
46
            queue = c.create_queue('bad_queue_name')
 
47
        except SQSError:
 
48
            pass
 
49
        
 
50
        # now create one that should work and should be unique (i.e. a new one)
 
51
        queue_name = 'test%d' % int(time.time())
 
52
        timeout = 60
 
53
        queue = c.create_queue(queue_name, timeout)
 
54
        time.sleep(60)
 
55
        rs  = c.get_all_queues()
 
56
        i = 0
 
57
        for q in rs:
 
58
            i += 1
 
59
        assert i == num_queues+1
 
60
        assert queue.count_slow() == 0
 
61
 
 
62
        # check the visibility timeout
 
63
        t = queue.get_timeout()
 
64
        assert t == timeout, '%d != %d' % (t, timeout)
 
65
 
 
66
        # now try to get queue attributes
 
67
        a = q.get_attributes()
 
68
        assert a.has_key('ApproximateNumberOfMessages')
 
69
        assert a.has_key('VisibilityTimeout')
 
70
        a = q.get_attributes('ApproximateNumberOfMessages')
 
71
        assert a.has_key('ApproximateNumberOfMessages')
 
72
        assert not a.has_key('VisibilityTimeout')
 
73
        a = q.get_attributes('VisibilityTimeout')
 
74
        assert not a.has_key('ApproximateNumberOfMessages')
 
75
        assert a.has_key('VisibilityTimeout')
 
76
 
 
77
        # now change the visibility timeout
 
78
        timeout = 45
 
79
        queue.set_timeout(timeout)
 
80
        time.sleep(60)
 
81
        t = queue.get_timeout()
 
82
        assert t == timeout, '%d != %d' % (t, timeout)
 
83
    
 
84
        # now add a message
 
85
        message_body = 'This is a test\n'
 
86
        message = queue.new_message(message_body)
 
87
        queue.write(message)
 
88
        time.sleep(30)
 
89
        assert queue.count_slow() == 1
 
90
        time.sleep(30)
 
91
 
 
92
        # now read the message from the queue with a 10 second timeout
 
93
        message = queue.read(visibility_timeout=10)
 
94
        assert message
 
95
        assert message.get_body() == message_body
 
96
 
 
97
        # now immediately try another read, shouldn't find anything
 
98
        message = queue.read()
 
99
        assert message == None
 
100
 
 
101
        # now wait 30 seconds and try again
 
102
        time.sleep(30)
 
103
        message = queue.read()
 
104
        assert message
 
105
 
 
106
        if c.APIVersion == '2007-05-01':
 
107
            # now terminate the visibility timeout for this message
 
108
            message.change_visibility(0)
 
109
            # now see if we can read it in the queue
 
110
            message = queue.read()
 
111
            assert message
 
112
 
 
113
        # now delete the message
 
114
        queue.delete_message(message)
 
115
        time.sleep(30)
 
116
        assert queue.count_slow() == 0
 
117
 
 
118
        # create another queue so we can test force deletion
 
119
        # we will also test MHMessage with this queue
 
120
        queue_name = 'test%d' % int(time.time())
 
121
        timeout = 60
 
122
        queue = c.create_queue(queue_name, timeout)
 
123
        queue.set_message_class(MHMessage)
 
124
        time.sleep(30)
 
125
        
 
126
        # now add a couple of messages
 
127
        message = queue.new_message()
 
128
        message['foo'] = 'bar'
 
129
        queue.write(message)
 
130
        message_body = {'fie' : 'baz', 'foo' : 'bar'}
 
131
        message = queue.new_message(body=message_body)
 
132
        queue.write(message)
 
133
        time.sleep(30)
 
134
 
 
135
        m = queue.read()
 
136
        assert m['foo'] == 'bar'
 
137
 
 
138
        # now delete that queue and messages
 
139
        c.delete_queue(queue, True)
 
140
 
 
141
        print '--- tests completed ---'
 
142