~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/boto/boto/sqs/queue.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
 
2
#
 
3
# Permission is hereby granted, free of charge, to any person obtaining a
 
4
# copy of this software and associated documentation files (the
 
5
# "Software"), to deal in the Software without restriction, including
 
6
# without limitation the rights to use, copy, modify, merge, publish, dis-
 
7
# tribute, sublicense, and/or sell copies of the Software, and to permit
 
8
# persons to whom the Software is furnished to do so, subject to the fol-
 
9
# lowing conditions:
 
10
#
 
11
# The above copyright notice and this permission notice shall be included
 
12
# in all copies or substantial portions of the Software.
 
13
#
 
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 
15
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
 
16
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 
17
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
 
18
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 
20
# IN THE SOFTWARE.
 
21
 
 
22
"""
 
23
Represents an SQS Queue
 
24
"""
 
25
 
 
26
import urlparse
 
27
from boto.sqs.message import Message
 
28
 
 
29
 
 
30
class Queue:
 
31
 
 
32
    def __init__(self, connection=None, url=None, message_class=Message):
 
33
        self.connection = connection
 
34
        self.url = url
 
35
        self.message_class = message_class
 
36
        self.visibility_timeout = None
 
37
 
 
38
    def _id(self):
 
39
        if self.url:
 
40
            val = urlparse.urlparse(self.url)[2]
 
41
        else:
 
42
            val = self.url
 
43
        return val
 
44
    id = property(_id)
 
45
 
 
46
    def _name(self):
 
47
        if self.url:
 
48
            val = urlparse.urlparse(self.url)[2].split('/')[2]
 
49
        else:
 
50
            val = self.url
 
51
        return  val
 
52
    name = property(_name)
 
53
 
 
54
    def startElement(self, name, attrs, connection):
 
55
        return None
 
56
 
 
57
    def endElement(self, name, value, connection):
 
58
        if name == 'QueueUrl':
 
59
            self.url = value
 
60
        elif name == 'VisibilityTimeout':
 
61
            self.visibility_timeout = int(value)
 
62
        else:
 
63
            setattr(self, name, value)
 
64
 
 
65
    def set_message_class(self, message_class):
 
66
        """
 
67
        Set the message class that should be used when instantiating messages read
 
68
        from the queue.  By default, the class boto.sqs.message.Message is used but
 
69
        this can be overriden with any class that behaves like a message.
 
70
 
 
71
        :type message_class: Message-like class
 
72
        :param message_class:  The new Message class
 
73
        """
 
74
        self.message_class = message_class
 
75
 
 
76
    def get_attributes(self, attributes='All'):
 
77
        """
 
78
        Retrieves attributes about this queue object and returns
 
79
        them in an Attribute instance (subclass of a Dictionary).
 
80
 
 
81
        :type attributes: string
 
82
        :param attributes: String containing one of:
 
83
                           ApproximateNumberOfMessages,
 
84
                           ApproximateNumberOfMessagesNotVisible,
 
85
                           VisibilityTimeout,
 
86
                           CreatedTimestamp,
 
87
                           LastModifiedTimestamp,
 
88
                           Policy
 
89
        :rtype: Attribute object
 
90
        :return: An Attribute object which is a mapping type holding the
 
91
                 requested name/value pairs
 
92
        """
 
93
        return self.connection.get_queue_attributes(self, attributes)
 
94
 
 
95
    def set_attribute(self, attribute, value):
 
96
        """
 
97
        Set a new value for an attribute of the Queue.
 
98
        
 
99
        :type attribute: String
 
100
        :param attribute: The name of the attribute you want to set.  The
 
101
                           only valid value at this time is: VisibilityTimeout
 
102
        :type value: int
 
103
        :param value: The new value for the attribute.
 
104
                      For VisibilityTimeout the value must be an
 
105
                      integer number of seconds from 0 to 86400.
 
106
 
 
107
        :rtype: bool
 
108
        :return: True if successful, otherwise False.
 
109
        """
 
110
        return self.connection.set_queue_attribute(self, attribute, value)
 
111
 
 
112
    def get_timeout(self):
 
113
        """
 
114
        Get the visibility timeout for the queue.
 
115
        
 
116
        :rtype: int
 
117
        :return: The number of seconds as an integer.
 
118
        """
 
119
        a = self.get_attributes('VisibilityTimeout')
 
120
        return int(a['VisibilityTimeout'])
 
121
 
 
122
    def set_timeout(self, visibility_timeout):
 
123
        """
 
124
        Set the visibility timeout for the queue.
 
125
 
 
126
        :type visibility_timeout: int
 
127
        :param visibility_timeout: The desired timeout in seconds
 
128
        """
 
129
        retval = self.set_attribute('VisibilityTimeout', visibility_timeout)
 
130
        if retval:
 
131
            self.visibility_timeout = visibility_timeout
 
132
        return retval
 
133
 
 
134
    def add_permission(self, label, aws_account_id, action_name):
 
135
        """
 
136
        Add a permission to a queue.
 
137
 
 
138
        :type label: str or unicode
 
139
        :param label: A unique identification of the permission you are setting.
 
140
                      Maximum of 80 characters ``[0-9a-zA-Z_-]``
 
141
                      Example, AliceSendMessage
 
142
 
 
143
        :type aws_account_id: str or unicode
 
144
        :param principal_id: The AWS account number of the principal who will be given
 
145
                             permission.  The principal must have an AWS account, but
 
146
                             does not need to be signed up for Amazon SQS. For information
 
147
                             about locating the AWS account identification.
 
148
 
 
149
        :type action_name: str or unicode
 
150
        :param action_name: The action.  Valid choices are:
 
151
                            \*|SendMessage|ReceiveMessage|DeleteMessage|
 
152
                            ChangeMessageVisibility|GetQueueAttributes
 
153
 
 
154
        :rtype: bool
 
155
        :return: True if successful, False otherwise.
 
156
 
 
157
        """
 
158
        return self.connection.add_permission(self, label, aws_account_id, action_name)
 
159
 
 
160
    def remove_permission(self, label):
 
161
        """
 
162
        Remove a permission from a queue.
 
163
 
 
164
        :type label: str or unicode
 
165
        :param label: The unique label associated with the permission being removed.
 
166
 
 
167
        :rtype: bool
 
168
        :return: True if successful, False otherwise.
 
169
        """
 
170
        return self.connection.remove_permission(self, label)
 
171
    
 
172
    def read(self, visibility_timeout=None):
 
173
        """
 
174
        Read a single message from the queue.
 
175
        
 
176
        :type visibility_timeout: int
 
177
        :param visibility_timeout: The timeout for this message in seconds
 
178
 
 
179
        :rtype: :class:`boto.sqs.message.Message`
 
180
        :return: A single message or None if queue is empty
 
181
        """
 
182
        rs = self.get_messages(1, visibility_timeout)
 
183
        if len(rs) == 1:
 
184
            return rs[0]
 
185
        else:
 
186
            return None
 
187
 
 
188
    def write(self, message):
 
189
        """
 
190
        Add a single message to the queue.
 
191
 
 
192
        :type message: Message
 
193
        :param message: The message to be written to the queue
 
194
 
 
195
        :rtype: :class:`boto.sqs.message.Message`
 
196
        :return: The :class:`boto.sqs.message.Message` object that was written.
 
197
        """
 
198
        new_msg = self.connection.send_message(self, message.get_body_encoded())
 
199
        message.id = new_msg.id
 
200
        message.md5 = new_msg.md5
 
201
        return message
 
202
 
 
203
    def new_message(self, body=''):
 
204
        """
 
205
        Create new message of appropriate class.
 
206
 
 
207
        :type body: message body
 
208
        :param body: The body of the newly created message (optional).
 
209
 
 
210
        :rtype: :class:`boto.sqs.message.Message`
 
211
        :return: A new Message object
 
212
        """
 
213
        m = self.message_class(self, body)
 
214
        m.queue = self
 
215
        return m
 
216
 
 
217
    # get a variable number of messages, returns a list of messages
 
218
    def get_messages(self, num_messages=1, visibility_timeout=None,
 
219
                     attributes=None):
 
220
        """
 
221
        Get a variable number of messages.
 
222
 
 
223
        :type num_messages: int
 
224
        :param num_messages: The maximum number of messages to read from the queue.
 
225
        
 
226
        :type visibility_timeout: int
 
227
        :param visibility_timeout: The VisibilityTimeout for the messages read.
 
228
 
 
229
        :type attributes: str
 
230
        :param attributes: The name of additional attribute to return with response
 
231
                           or All if you want all attributes.  The default is to
 
232
                           return no additional attributes.  Valid values:
 
233
                           All
 
234
                           SenderId
 
235
                           SentTimestamp
 
236
                           ApproximateReceiveCount
 
237
                           ApproximateFirstReceiveTimestamp
 
238
                           
 
239
        :rtype: list
 
240
        :return: A list of :class:`boto.sqs.message.Message` objects.
 
241
        """
 
242
        return self.connection.receive_message(self, number_messages=num_messages,
 
243
                                               visibility_timeout=visibility_timeout,
 
244
                                               attributes=attributes)
 
245
 
 
246
    def delete_message(self, message):
 
247
        """
 
248
        Delete a message from the queue.
 
249
 
 
250
        :type message: :class:`boto.sqs.message.Message`
 
251
        :param message: The :class:`boto.sqs.message.Message` object to delete.
 
252
 
 
253
        :rtype: bool
 
254
        :return: True if successful, False otherwise
 
255
        """
 
256
        return self.connection.delete_message(self, message)
 
257
 
 
258
    def delete(self):
 
259
        """
 
260
        Delete the queue.
 
261
        """
 
262
        return self.connection.delete_queue(self)
 
263
 
 
264
    def clear(self, page_size=10, vtimeout=10):
 
265
        """Utility function to remove all messages from a queue"""
 
266
        n = 0
 
267
        l = self.get_messages(page_size, vtimeout)
 
268
        while l:
 
269
            for m in l:
 
270
                self.delete_message(m)
 
271
                n += 1
 
272
            l = self.get_messages(page_size, vtimeout)
 
273
        return n
 
274
 
 
275
    def count(self, page_size=10, vtimeout=10):
 
276
        """
 
277
        Utility function to count the number of messages in a queue.
 
278
        Note: This function now calls GetQueueAttributes to obtain
 
279
        an 'approximate' count of the number of messages in a queue.
 
280
        """
 
281
        a = self.get_attributes('ApproximateNumberOfMessages')
 
282
        return int(a['ApproximateNumberOfMessages'])
 
283
    
 
284
    def count_slow(self, page_size=10, vtimeout=10):
 
285
        """
 
286
        Deprecated.  This is the old 'count' method that actually counts
 
287
        the messages by reading them all.  This gives an accurate count but
 
288
        is very slow for queues with non-trivial number of messasges.
 
289
        Instead, use get_attribute('ApproximateNumberOfMessages') to take
 
290
        advantage of the new SQS capability.  This is retained only for
 
291
        the unit tests.
 
292
        """
 
293
        n = 0
 
294
        l = self.get_messages(page_size, vtimeout)
 
295
        while l:
 
296
            for m in l:
 
297
                n += 1
 
298
            l = self.get_messages(page_size, vtimeout)
 
299
        return n
 
300
    
 
301
    def dump_(self, file_name, page_size=10, vtimeout=10, sep='\n'):
 
302
        """Utility function to dump the messages in a queue to a file
 
303
        NOTE: Page size must be < 10 else SQS errors"""
 
304
        fp = open(file_name, 'wb')
 
305
        n = 0
 
306
        l = self.get_messages(page_size, vtimeout)
 
307
        while l:
 
308
            for m in l:
 
309
                fp.write(m.get_body())
 
310
                if sep:
 
311
                    fp.write(sep)
 
312
                n += 1
 
313
            l = self.get_messages(page_size, vtimeout)
 
314
        fp.close()
 
315
        return n
 
316
 
 
317
    def save_to_file(self, fp, sep='\n'):
 
318
        """
 
319
        Read all messages from the queue and persist them to file-like object.
 
320
        Messages are written to the file and the 'sep' string is written
 
321
        in between messages.  Messages are deleted from the queue after
 
322
        being written to the file.
 
323
        Returns the number of messages saved.
 
324
        """
 
325
        n = 0
 
326
        m = self.read()
 
327
        while m:
 
328
            n += 1
 
329
            fp.write(m.get_body())
 
330
            if sep:
 
331
                fp.write(sep)
 
332
            self.delete_message(m)
 
333
            m = self.read()
 
334
        return n
 
335
    
 
336
    def save_to_filename(self, file_name, sep='\n'):
 
337
        """
 
338
        Read all messages from the queue and persist them to local file.
 
339
        Messages are written to the file and the 'sep' string is written
 
340
        in between messages.  Messages are deleted from the queue after
 
341
        being written to the file.
 
342
        Returns the number of messages saved.
 
343
        """
 
344
        fp = open(file_name, 'wb')
 
345
        n = self.save_to_file(fp, sep)
 
346
        fp.close()
 
347
        return n
 
348
 
 
349
    # for backwards compatibility
 
350
    save = save_to_filename
 
351
 
 
352
    def save_to_s3(self, bucket):
 
353
        """
 
354
        Read all messages from the queue and persist them to S3.
 
355
        Messages are stored in the S3 bucket using a naming scheme of::
 
356
        
 
357
            <queue_id>/<message_id>
 
358
        
 
359
        Messages are deleted from the queue after being saved to S3.
 
360
        Returns the number of messages saved.
 
361
        """
 
362
        n = 0
 
363
        m = self.read()
 
364
        while m:
 
365
            n += 1
 
366
            key = bucket.new_key('%s/%s' % (self.id, m.id))
 
367
            key.set_contents_from_string(m.get_body())
 
368
            self.delete_message(m)
 
369
            m = self.read()
 
370
        return n
 
371
 
 
372
    def load_from_s3(self, bucket, prefix=None):
 
373
        """
 
374
        Load messages previously saved to S3.
 
375
        """
 
376
        n = 0
 
377
        if prefix:
 
378
            prefix = '%s/' % prefix
 
379
        else:
 
380
            prefix = '%s/' % self.id[1:]
 
381
        rs = bucket.list(prefix=prefix)
 
382
        for key in rs:
 
383
            n += 1
 
384
            m = self.new_message(key.get_contents_as_string())
 
385
            self.write(m)
 
386
        return n
 
387
 
 
388
    def load_from_file(self, fp, sep='\n'):
 
389
        """Utility function to load messages from a file-like object to a queue"""
 
390
        n = 0
 
391
        body = ''
 
392
        l = fp.readline()
 
393
        while l:
 
394
            if l == sep:
 
395
                m = Message(self, body)
 
396
                self.write(m)
 
397
                n += 1
 
398
                print 'writing message %d' % n
 
399
                body = ''
 
400
            else:
 
401
                body = body + l
 
402
            l = fp.readline()
 
403
        return n
 
404
    
 
405
    def load_from_filename(self, file_name, sep='\n'):
 
406
        """Utility function to load messages from a local filename to a queue"""
 
407
        fp = open(file_name, 'rb')
 
408
        n = self.load_file_file(fp, sep)
 
409
        fp.close()
 
410
        return n
 
411
 
 
412
    # for backward compatibility
 
413
    load = load_from_filename
 
414