~ubuntu-branches/ubuntu/lucid/python-boto/lucid

« back to all changes in this revision

Viewing changes to boto/sqs/connection.py

  • Committer: Bazaar Package Importer
  • Author(s): Eric Evans
  • Date: 2008-04-20 04:41:52 UTC
  • mfrom: (1.1.2 upstream)
  • Revision ID: james.westby@ubuntu.com-20080420044152-trgt1ygxt1yumbg4
Tags: 1.2a-1
New Upstream Version

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20
20
# IN THE SOFTWARE.
21
21
 
22
 
from boto.connection import AWSAuthConnection, AWSQueryConnection
 
22
from boto.connection import AWSQueryConnection
23
23
import xml.sax
24
24
from boto.sqs.queue import Queue
 
25
from boto.sqs.message import Message
25
26
from boto.sqs.attributes import Attributes
26
27
from boto import handler
27
28
from boto.resultset import ResultSet
28
29
from boto.exception import SQSError
29
30
 
30
 
class SQSQueryConnection(AWSQueryConnection):
 
31
PERM_ReceiveMessage = 'ReceiveMessage'
 
32
PERM_SendMessage = 'SendMessage'
 
33
PERM_FullControl = 'FullControl'
 
34
 
 
35
AllPermissions = [PERM_ReceiveMessage, PERM_SendMessage, PERM_FullControl]
 
36
                 
 
37
class SQSConnection(AWSQueryConnection):
31
38
 
32
39
    """
33
 
    This class uses the Query API (boo!) to SQS to access some of the
34
 
    new features which have not yet been added to the REST api (yeah!).
 
40
    A subclass of the original SQSQueryConnection targeting the 2008-01-01 SQS API.
35
41
    """
36
42
    
37
43
    DefaultHost = 'queue.amazonaws.com'
38
 
    APIVersion = '2007-05-01'
 
44
    APIVersion = '2008-01-01'
39
45
    SignatureVersion = '1'
40
46
    DefaultContentType = 'text/plain'
 
47
    ResponseError = SQSError
41
48
    
42
49
    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
43
50
                 is_secure=False, port=None, proxy=None, proxy_port=None,
44
 
                 host=DefaultHost, debug=0, https_connection_factory=None):
45
 
        AWSQueryConnection.__init__(self, aws_access_key_id,
46
 
                                    aws_secret_access_key,
47
 
                                    is_secure, port, proxy, proxy_port,
 
51
                 proxy_user=None, proxy_pass=None, host=DefaultHost, debug=0,
 
52
                 https_connection_factory=None):
 
53
        AWSQueryConnection.__init__(self, aws_access_key_id, aws_secret_access_key,
 
54
                                    is_secure, port, proxy, proxy_port, proxy_user, proxy_pass,
48
55
                                    host, debug, https_connection_factory)
49
56
 
50
 
    def get_queue_attributes(self, queue_url, attribute='All'):
51
 
        params = {'Attribute' : attribute}
52
 
        response = self.make_request('GetQueueAttributes', params, queue_url)
53
 
        body = response.read()
54
 
        if response.status == 200:
55
 
            attrs = Attributes()
56
 
            h = handler.XmlHandler(attrs, self)
57
 
            xml.sax.parseString(body, h)
58
 
            return attrs
59
 
        else:
60
 
            raise SQSError(response.status, response.reason, body)
61
 
 
62
 
    def set_queue_attribute(self, queue_url, attribute, value):
63
 
        params = {'Attribute' : attribute, 'Value' : value}
64
 
        response = self.make_request('SetQueueAttributes', params, queue_url)
65
 
        body = response.read()
66
 
        if response.status == 200:
67
 
            rs = ResultSet()
68
 
            h = handler.XmlHandler(rs, self)
69
 
            xml.sax.parseString(body, h)
70
 
            return rs.status
71
 
        else:
72
 
            raise SQSError(response.status, response.reason, body)
73
 
 
74
 
    def change_message_visibility(self, queue_url, message_id, vtimeout):
75
 
        params = {'MessageId' : message_id,
76
 
                  'VisibilityTimeout' : vtimeout}
77
 
        response = self.make_request('ChangeMessageVisibility', params,
78
 
                                     queue_url)
79
 
        body = response.read()
80
 
        if response.status == 200:
81
 
            rs = ResultSet()
82
 
            h = handler.XmlHandler(rs, self)
83
 
            xml.sax.parseString(body, h)
84
 
            return rs.status
85
 
        else:
86
 
            raise SQSError(response.status, response.reason, body)
87
 
        
88
 
class SQSConnection(AWSAuthConnection):
89
 
    
90
 
    DefaultHost = 'queue.amazonaws.com'
91
 
    DefaultVersion = '2007-05-01'
92
 
    DefaultContentType = 'text/plain'
93
 
    
94
 
    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
95
 
                 is_secure=False, port=None, proxy=None, proxy_port=None,
96
 
                 host=DefaultHost, debug=0, https_connection_factory=None):
97
 
        AWSAuthConnection.__init__(self, host,
98
 
                                   aws_access_key_id, aws_secret_access_key,
99
 
                                   is_secure, port, proxy, proxy_port, debug,
100
 
                                   https_connection_factory)
101
 
        self.query_conn = None
102
 
 
103
 
    def make_request(self, method, path, headers=None, data=''):
104
 
        # add auth header
105
 
        if headers == None:
106
 
            headers = {}
107
 
 
108
 
        if not headers.has_key('AWS-Version'):
109
 
            headers['AWS-Version'] = self.DefaultVersion
110
 
 
111
 
        if not headers.has_key('Content-Type'):
112
 
            headers['Content-Type'] = self.DefaultContentType
113
 
 
114
 
        return AWSAuthConnection.make_request(self, method, path,
115
 
                                              headers, data)
116
 
 
117
 
    def get_query_connection(self):
118
 
        if not self.query_conn:
119
 
            self.query_conn = SQSQueryConnection(self.aws_access_key_id,
120
 
                                                 self.aws_secret_access_key,
121
 
                                                 self.is_secure, self.port,
122
 
                                                 self.proxy, self.proxy_port,
123
 
                                                 self.server, self.debug,
124
 
                                                 self.https_connection_factory)
125
 
        return self.query_conn
126
 
 
127
 
    def get_all_queues(self, prefix=''):
128
 
        if prefix:
129
 
            path = '/?QueueNamePrefix=%s' % prefix
130
 
        else:
131
 
            path = '/'
132
 
        response = self.make_request('GET', path)
133
 
        body = response.read()
134
 
        if response.status >= 300:
135
 
            raise SQSError(response.status, response.reason, body)
136
 
        rs = ResultSet([('QueueUrl', Queue)])
137
 
        h = handler.XmlHandler(rs, self)
138
 
        xml.sax.parseString(body, h)
139
 
        return rs
140
 
 
141
 
    def get_queue(self, queue_name):
142
 
        i = 0
143
 
        rs = self.get_all_queues(queue_name)
144
 
        for q in rs:
145
 
            i += 1
146
 
        if i != 1:
147
 
            return None
148
 
        else:
149
 
            return q
150
 
 
151
 
    def get_queue_attributes(self, queue_url, attribute='All'):
152
 
        """
153
 
        Performs a GetQueueAttributes request and returns an Attributes
154
 
        instance (subclass of a Dictionary) holding the requested
155
 
        attribute name/value pairs.
156
 
        Inputs:
157
 
            queue_url - the URL of the desired SQS queue
158
 
            attribute - All|ApproximateNumberOfMessages|VisibilityTimeout
159
 
                        Default value is "All"
160
 
        Returns:
161
 
            An Attribute object which is a mapping type holding the
162
 
            requested name/value pairs
163
 
        """
164
 
        qc = self.get_query_connection()
165
 
        return qc.get_queue_attributes(queue_url, attribute)
166
 
    
167
 
    def set_queue_attribute(self, queue_url, attribute, value):
168
 
        """
169
 
        Performs a SetQueueAttributes request.
170
 
        Inputs:
171
 
            queue_url - The URL of the desired SQS queue
172
 
            attribute - The name of the attribute you want to set.  The
173
 
                        only valid value at this time is: VisibilityTimeout
174
 
                value - The new value for the attribute.
175
 
                        For VisibilityTimeout the value must be an
176
 
                        integer number of seconds from 0 to 86400.
177
 
        Returns:
178
 
            Boolean True if successful, otherwise False.
179
 
        """
180
 
        qc = self.get_query_connection()
181
 
        return qc.set_queue_attribute(queue_url, attribute, value)
182
 
 
183
 
    def change_message_visibility(self, queue_url, message_id, vtimeout):
184
 
        """
185
 
        Change the VisibilityTimeout for an individual message.
186
 
        Inputs:
187
 
            queue_url - The URL of the desired SQS queue
188
 
            message_id - The ID of the message whose timeout will be changed
189
 
            vtimeout - The new VisibilityTimeout value, in seconds
190
 
        Returns:
191
 
            Boolean True if successful, otherwise False
192
 
        Note: This functionality is also available as a method of the
193
 
              Message object.
194
 
        """
195
 
        qc = self.get_query_connection()
196
 
        return qc.change_message_visibility(queue_url, message_id, vtimeout)
197
 
    
198
57
    def create_queue(self, queue_name, visibility_timeout=None):
199
 
        """
200
 
        Create a new queue.
201
 
        Inputs:
202
 
            queue_name - The name of the new queue
203
 
            visibility_timeout - (Optional) The default visibility
204
 
                                 timeout for the new queue.
205
 
        Returns:
206
 
            A new Queue object representing the newly created queue.
207
 
        """
208
 
        path = '/?QueueName=%s' % queue_name
 
58
        params = {'QueueName': queue_name}
209
59
        if visibility_timeout:
210
 
            path = path + '&DefaultVisibilityTimeout=%d' % visibility_timeout
211
 
        response = self.make_request('POST', path)
212
 
        body = response.read()
213
 
        if response.status >= 300:
214
 
            raise SQSError(response.status, response.reason, body)
215
 
        q = Queue(self)
216
 
        h = handler.XmlHandler(q, self)
217
 
        xml.sax.parseString(body, h)
218
 
        return q
 
60
            params['DefaultVisibilityTimeout'] = '%d' % (visibility_timeout,)
 
61
        return self.get_object('CreateQueue', params, Queue)
219
62
 
220
63
    def delete_queue(self, queue, force_deletion=False):
221
64
        """
234
77
            should probably return a Boolean indicating success or
235
78
            failure.
236
79
        """
237
 
        if force_deletion:
238
 
            path = 'DELETE?ForceDeletion=true'
239
 
        else:
240
 
            path = 'DELETE'
241
 
        response = self.make_request(path, queue.id)
242
 
        body = response.read()
243
 
        if response.status >= 300:
244
 
            raise SQSError(response.status, response.reason, body)
245
 
        rs = ResultSet()
246
 
        h = handler.XmlHandler(rs, self)
247
 
        xml.sax.parseString(body, h)
248
 
        return rs
 
80
        return self.get_status('DeleteQueue', None, queue.id)
 
81
 
 
82
    def get_queue_attributes(self, queue_url, attribute='All'):
 
83
        params = {'AttributeName' : attribute}
 
84
        return self.get_object('GetQueueAttributes', params, Attributes, queue_url)
 
85
 
 
86
    def set_queue_attribute(self, queue_url, attribute, value):
 
87
        params = {'Attribute.Name' : attribute, 'Attribute.Value' : value}
 
88
        return self.get_status('SetQueueAttributes', params, queue_url)
 
89
 
 
90
    def receive_message(self, queue_url, number_messages=1,
 
91
                        visibility_timeout=None, message_class=Message):
 
92
        params = {'MaxNumberOfMessages' : number_messages}
 
93
        if visibility_timeout:
 
94
            params['VisibilityTimeout'] = visibility_timeout
 
95
        return self.get_list('ReceiveMessage', params, [('Message', message_class)], queue_url)
 
96
 
 
97
    def delete_message(self, queue_url, message_id, receipt_handle):
 
98
        params = {'ReceiptHandle' : receipt_handle}
 
99
        return self.get_status('DeleteMessage', params, queue_url)
 
100
 
 
101
    def send_message(self, queue_url, message_content):
 
102
        params = {'MessageBody' : message_content}
 
103
        return self.get_status('SendMessage', params, queue_url)
 
104
 
 
105
    def get_all_queues(self, prefix=''):
 
106
        params = {}
 
107
        if prefix:
 
108
            params['QueueNamePrefix'] = prefix
 
109
        return self.get_list('ListQueues', params, [('QueueUrl', Queue)])
 
110
        
 
111
    def get_queue(self, queue_name):
 
112
        rs = self.get_all_queues(queue_name)
 
113
        if len(rs) == 1:
 
114
            return rs[0]
 
115
        return None
 
116
 
 
117
    lookup = get_queue
249
118