19
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22
from boto.connection import AWSAuthConnection, AWSQueryConnection
22
from boto.connection import AWSQueryConnection
24
24
from boto.sqs.queue import Queue
25
from boto.sqs.message import Message
25
26
from boto.sqs.attributes import Attributes
26
27
from boto import handler
27
28
from boto.resultset import ResultSet
28
29
from boto.exception import SQSError
30
class SQSQueryConnection(AWSQueryConnection):
31
PERM_ReceiveMessage = 'ReceiveMessage'
32
PERM_SendMessage = 'SendMessage'
33
PERM_FullControl = 'FullControl'
35
AllPermissions = [PERM_ReceiveMessage, PERM_SendMessage, PERM_FullControl]
37
class SQSConnection(AWSQueryConnection):
33
This class uses the Query API (boo!) to SQS to access some of the
34
new features which have not yet been added to the REST api (yeah!).
40
A subclass of the original SQSQueryConnection targeting the 2008-01-01 SQS API.
37
43
DefaultHost = 'queue.amazonaws.com'
38
APIVersion = '2007-05-01'
44
APIVersion = '2008-01-01'
39
45
SignatureVersion = '1'
40
46
DefaultContentType = 'text/plain'
47
ResponseError = SQSError
42
49
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
43
50
is_secure=False, port=None, proxy=None, proxy_port=None,
44
host=DefaultHost, debug=0, https_connection_factory=None):
45
AWSQueryConnection.__init__(self, aws_access_key_id,
46
aws_secret_access_key,
47
is_secure, port, proxy, proxy_port,
51
proxy_user=None, proxy_pass=None, host=DefaultHost, debug=0,
52
https_connection_factory=None):
53
AWSQueryConnection.__init__(self, aws_access_key_id, aws_secret_access_key,
54
is_secure, port, proxy, proxy_port, proxy_user, proxy_pass,
48
55
host, debug, https_connection_factory)
50
def get_queue_attributes(self, queue_url, attribute='All'):
51
params = {'Attribute' : attribute}
52
response = self.make_request('GetQueueAttributes', params, queue_url)
53
body = response.read()
54
if response.status == 200:
56
h = handler.XmlHandler(attrs, self)
57
xml.sax.parseString(body, h)
60
raise SQSError(response.status, response.reason, body)
62
def set_queue_attribute(self, queue_url, attribute, value):
63
params = {'Attribute' : attribute, 'Value' : value}
64
response = self.make_request('SetQueueAttributes', params, queue_url)
65
body = response.read()
66
if response.status == 200:
68
h = handler.XmlHandler(rs, self)
69
xml.sax.parseString(body, h)
72
raise SQSError(response.status, response.reason, body)
74
def change_message_visibility(self, queue_url, message_id, vtimeout):
75
params = {'MessageId' : message_id,
76
'VisibilityTimeout' : vtimeout}
77
response = self.make_request('ChangeMessageVisibility', params,
79
body = response.read()
80
if response.status == 200:
82
h = handler.XmlHandler(rs, self)
83
xml.sax.parseString(body, h)
86
raise SQSError(response.status, response.reason, body)
88
class SQSConnection(AWSAuthConnection):
90
DefaultHost = 'queue.amazonaws.com'
91
DefaultVersion = '2007-05-01'
92
DefaultContentType = 'text/plain'
94
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
             is_secure=False, port=None, proxy=None, proxy_port=None,
             host=DefaultHost, debug=0, https_connection_factory=None):
    """
    Initialise the connection and start with no cached query connection.

    Every argument is forwarded unchanged to AWSAuthConnection.__init__.
    The base initialiser is called with host first and debug after the
    proxy settings, so the forwarding order differs from this signature.
    """
    base_args = (host,
                 aws_access_key_id, aws_secret_access_key,
                 is_secure, port, proxy, proxy_port, debug,
                 https_connection_factory)
    AWSAuthConnection.__init__(self, *base_args)
    # Created on first use by get_query_connection().
    self.query_conn = None
103
def make_request(self, method, path, headers=None, data=''):
108
if not headers.has_key('AWS-Version'):
109
headers['AWS-Version'] = self.DefaultVersion
111
if not headers.has_key('Content-Type'):
112
headers['Content-Type'] = self.DefaultContentType
114
return AWSAuthConnection.make_request(self, method, path,
117
def get_query_connection(self):
    """
    Return the SQSQueryConnection associated with this connection.

    The query connection is built the first time it is needed, from this
    object's own credentials, security, port, proxy, server, debug and
    HTTPS factory settings, and is then stored on self.query_conn so
    later calls reuse it.
    """
    if self.query_conn:
        return self.query_conn
    settings = (self.aws_access_key_id,
                self.aws_secret_access_key,
                self.is_secure, self.port,
                self.proxy, self.proxy_port,
                self.server, self.debug,
                self.https_connection_factory)
    self.query_conn = SQSQueryConnection(*settings)
    return self.query_conn
127
def get_all_queues(self, prefix=''):
129
path = '/?QueueNamePrefix=%s' % prefix
132
response = self.make_request('GET', path)
133
body = response.read()
134
if response.status >= 300:
135
raise SQSError(response.status, response.reason, body)
136
rs = ResultSet([('QueueUrl', Queue)])
137
h = handler.XmlHandler(rs, self)
138
xml.sax.parseString(body, h)
141
def get_queue(self, queue_name):
143
rs = self.get_all_queues(queue_name)
151
def get_queue_attributes(self, queue_url, attribute='All'):
    """
    Perform a GetQueueAttributes request via the query connection.

    queue_url - the URL of the desired SQS queue
    attribute - All|ApproximateNumberOfMessages|VisibilityTimeout
                Default value is "All"

    Returns whatever the query connection's get_queue_attributes call
    returns; the original documentation describes this as an Attributes
    object, a mapping type holding the requested name/value pairs.
    """
    return self.get_query_connection().get_queue_attributes(queue_url,
                                                            attribute)
167
def set_queue_attribute(self, queue_url, attribute, value):
    """
    Perform a SetQueueAttributes request via the query connection.

    queue_url - The URL of the desired SQS queue
    attribute - The name of the attribute you want to set.  The
                only valid value at this time is: VisibilityTimeout
    value     - The new value for the attribute.
                For VisibilityTimeout the value must be an
                integer number of seconds from 0 to 86400.

    Returns whatever the query connection's set_queue_attribute call
    returns; the original documentation describes this as Boolean True
    if successful, otherwise False.
    """
    return self.get_query_connection().set_queue_attribute(queue_url,
                                                           attribute,
                                                           value)
183
def change_message_visibility(self, queue_url, message_id, vtimeout):
    """
    Change the VisibilityTimeout for an individual message.

    queue_url  - The URL of the desired SQS queue
    message_id - The ID of the message whose timeout will be changed
    vtimeout   - The new VisibilityTimeout value, in seconds

    The request is delegated to the query connection and its result is
    returned as-is; the original documentation describes this as
    Boolean True if successful, otherwise False.
    """
    return self.get_query_connection().change_message_visibility(
        queue_url, message_id, vtimeout)
198
57
def create_queue(self, queue_name, visibility_timeout=None):
202
queue_name - The name of the new queue
203
visibility_timeout - (Optional) The default visibility
204
timeout for the new queue.
206
A new Queue object representing the newly created queue.
208
path = '/?QueueName=%s' % queue_name
58
params = {'QueueName': queue_name}
209
59
if visibility_timeout:
210
path = path + '&DefaultVisibilityTimeout=%d' % visibility_timeout
211
response = self.make_request('POST', path)
212
body = response.read()
213
if response.status >= 300:
214
raise SQSError(response.status, response.reason, body)
216
h = handler.XmlHandler(q, self)
217
xml.sax.parseString(body, h)
60
params['DefaultVisibilityTimeout'] = '%d' % (visibility_timeout,)
61
return self.get_object('CreateQueue', params, Queue)
220
63
def delete_queue(self, queue, force_deletion=False):
234
77
should probably return a Boolean indicating success or
238
path = 'DELETE?ForceDeletion=true'
241
response = self.make_request(path, queue.id)
242
body = response.read()
243
if response.status >= 300:
244
raise SQSError(response.status, response.reason, body)
246
h = handler.XmlHandler(rs, self)
247
xml.sax.parseString(body, h)
80
return self.get_status('DeleteQueue', None, queue.id)
82
def get_queue_attributes(self, queue_url, attribute='All'):
    """
    Issue a GetQueueAttributes request for the given queue.

    queue_url - passed to get_object as the request path
    attribute - sent as the AttributeName parameter; defaults to 'All'

    Returns the result of get_object, which is asked to build an
    Attributes instance from the response.
    """
    return self.get_object('GetQueueAttributes',
                           {'AttributeName': attribute},
                           Attributes,
                           queue_url)
86
def set_queue_attribute(self, queue_url, attribute, value):
    """
    Issue a SetQueueAttributes request for the given queue.

    queue_url - passed to get_status as the request path
    attribute - sent as the Attribute.Name parameter
    value     - sent as the Attribute.Value parameter

    Returns the result of get_status.
    """
    request = {}
    request['Attribute.Name'] = attribute
    request['Attribute.Value'] = value
    return self.get_status('SetQueueAttributes', request, queue_url)
90
def receive_message(self, queue_url, number_messages=1,
                    visibility_timeout=None, message_class=Message):
    """
    Issue a ReceiveMessage request for the given queue.

    queue_url          - passed to get_list as the request path
    number_messages    - sent as MaxNumberOfMessages; defaults to 1
    visibility_timeout - (Optional) sent as VisibilityTimeout.  When
                         left as None the parameter is omitted from
                         the request.
    message_class      - class get_list is asked to use for each
                         'Message' element of the response

    Returns the result of get_list.
    """
    params = {'MaxNumberOfMessages': number_messages}
    # Compare against None rather than relying on truthiness so that an
    # explicit timeout of 0 is still sent instead of being dropped.
    if visibility_timeout is not None:
        params['VisibilityTimeout'] = visibility_timeout
    return self.get_list('ReceiveMessage', params,
                         [('Message', message_class)], queue_url)
97
def delete_message(self, queue_url, message_id, receipt_handle):
    """
    Issue a DeleteMessage request for the given queue.

    queue_url      - passed to get_status as the request path
    message_id     - accepted but not used by this method; only the
                     receipt handle is sent
    receipt_handle - sent as the ReceiptHandle parameter

    Returns the result of get_status.
    """
    request = dict([('ReceiptHandle', receipt_handle)])
    return self.get_status('DeleteMessage', request, queue_url)
101
def send_message(self, queue_url, message_content):
    """
    Issue a SendMessage request for the given queue.

    queue_url       - passed to get_status as the request path
    message_content - sent as the MessageBody parameter

    Returns the result of get_status.
    """
    request = dict([('MessageBody', message_content)])
    return self.get_status('SendMessage', request, queue_url)
105
def get_all_queues(self, prefix=''):
108
params['QueueNamePrefix'] = prefix
109
return self.get_list('ListQueues', params, [('QueueUrl', Queue)])
111
def get_queue(self, queue_name):
112
rs = self.get_all_queues(queue_name)