~smoser/ubuntu/lucid/python-boto/debian-1.9b-merge

Viewing changes to boto/services/service.py

  • Committer: Bazaar Package Importer
  • Author(s): Eric Evans
  • Date: 2007-07-16 17:17:48 UTC
  • Revision ID: james.westby@ubuntu.com-20070716171748-bsw9rlyu0yuui9lb
Tags: upstream-0.9b
Import upstream version 0.9b

 
# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

from boto.s3.connection import S3Connection
from boto.sqs.connection import SQSConnection
from boto.ec2.connection import EC2Connection
from boto.s3.key import Key
from boto.sqs.message import MHMessage
from boto.exception import SQSError, S3ResponseError
import boto.utils
import StringIO
import time
import os
import sys, traceback
import md5
from socket import gethostname
import mimetypes

# Timestamp formats
ISO8601 = '%Y-%m-%dT%H:%M:%SZ'
RFC1123 = '%a, %d %b %Y %X GMT'

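# Base class for a queue-driven service: read a work message from an input
# SQS queue, fetch the referenced file from S3, process it, store the
# results back in S3 and write a result message to an output queue.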
class Service:

    # Number of times to retry failed requests to S3 or SQS
    RetryCount = 5

    # Number of seconds to wait between retries
    RetryDelay = 5

    # Number of times to retry queue read when no messages are available
    MainLoopRetryCount = 5

    # Number of seconds to wait before retrying queue read in main loop
    MainLoopDelay = 30

    # Number of seconds a message stays invisible while a transaction is
    # processed (passed to the queue read as the visibility timeout)
    ProcessingTime = 60

    # Number of successful queue reads before spawning helpers
    SpawnCount = 10

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 input_queue_name=None, output_queue_name=None,
                 on_completion='shutdown', notify_email=None,
                 read_userdata=True, working_dir=None, log_queue_name=None):
        self.meta_data = {}
        self.queue_cache = {}
        self.bucket_cache = {}
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.input_queue_name = input_queue_name
        self.output_queue_name = output_queue_name
        self.log_queue_name = log_queue_name
        self.notify_email = notify_email
        self.on_completion = on_completion
        # now override any values with instance user data passed on startup
        if read_userdata:
            self.get_userdata()
        self.create_connections()
        self.create_working_dir(working_dir)

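    # Load EC2 instance metadata, then override attributes set above with
    # any pipe-separated name=value pairs supplied as instance user data.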
    def get_userdata(self):
        self.meta_data = boto.utils.get_instance_metadata()
        d = boto.utils.get_instance_userdata(sep='|')
        if d:
            for key in d.keys():
                setattr(self, key, d[key])

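    # Open the SQS and S3 connections, and look up the input queue if one
    # was named.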
    def create_connections(self):
        self.sqs_conn = SQSConnection(self.aws_access_key_id,
                                      self.aws_secret_access_key)
        if self.input_queue_name:
            self.input_queue = self.get_queue(self.input_queue_name)
        self.s3_conn = S3Connection(self.aws_access_key_id,
                                    self.aws_secret_access_key)

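    # Create the working directory if necessary and make it the current
    # directory.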
    def create_working_dir(self, working_dir):
        self.log(method='create_working_dir', working_dir=working_dir)
        if working_dir:
            self.working_dir = working_dir
        else:
            self.working_dir = os.path.expanduser('~/work')
        if not os.path.exists(self.working_dir):
            os.mkdir(self.working_dir)
        os.chdir(self.working_dir)

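    # Write a timestamped message carrying the given parameters to the log
    # queue; a no-op when no log queue is configured.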
    def log(self, **params):
        if self.log_queue_name is None:
            return
        lq = self.get_queue(self.log_queue_name)
        m = lq.new_message()
        m['Timestamp'] = time.strftime(ISO8601, time.gmtime())
        for key in params:
            m[key] = params[key]
        lq.write(m)

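    # Email a status message to notify_email via a local SMTP server; a
    # no-op when notify_email is not set.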
    def notify(self, msg):
        if self.notify_email:
            import smtplib, socket
            subject = "Message from Server - %s" % self.__class__.__name__
            body = "From: %s\r\n" % self.notify_email
            body = body + "To: %s\r\n" % self.notify_email
            body = body + "Subject: " + subject + '\r\n\r\n'
            body = body + 'Server: %s\n' % self.__class__.__name__
            body = body + 'Host: %s\n' % socket.gethostname()
            body = body + msg
            server = smtplib.SMTP('localhost')
            server.sendmail(self.notify_email, self.notify_email, body)
            server.quit()

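    # Split a 'key;type=mime/type' output-key entry into a (key, mime_type)
    # tuple; the mime type is '' when no type annotation is present.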
    def split_key(self, key):
        if key.find(';') < 0:
            t = (key, '')
        else:
            key, type_part = key.split(';')
            label, mtype = type_part.split('=')
            t = (key, mtype)
        return t

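    # Return the named SQS queue, creating it (with MHMessage as the
    # message class) and caching it on first use.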
    def get_queue(self, queue_name):
        if queue_name in self.queue_cache:
            return self.queue_cache[queue_name]
        else:
            queue = self.sqs_conn.create_queue(queue_name)
            queue.set_message_class(MHMessage)
            self.queue_cache[queue_name] = queue
            return queue

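    # Return the named S3 bucket, creating and caching it on first use.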
    def get_bucket(self, bucket_name):
        if bucket_name in self.bucket_cache:
            return self.bucket_cache[bucket_name]
        else:
            bucket = self.s3_conn.create_bucket(bucket_name)
            self.bucket_cache[bucket_name] = bucket
            return bucket

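    # Return the Key object if the key exists in the bucket, None otherwise.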
    def key_exists(self, bucket_name, key):
        bucket = self.get_bucket(bucket_name)
        return bucket.lookup(key)

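    # Build an MHMessage describing an uploaded key: original file name and
    # location, content type, file timestamps, size, bucket and key name.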
    def create_msg(self, key, params=None):
        m = self.input_queue.new_message()
        if params:
            m.update(params)
        if key.path:
            t = os.path.split(key.path)
            m['OriginalLocation'] = t[0]
            m['OriginalFileName'] = t[1]
            mime_type = mimetypes.guess_type(t[1])[0]
            if mime_type is None:
                mime_type = 'application/octet-stream'
            m['Content-Type'] = mime_type
            s = os.stat(key.path)
            t = time.gmtime(s[7])    # st_atime
            m['FileAccessedDate'] = time.strftime(ISO8601, t)
            t = time.gmtime(s[8])    # st_mtime
            m['FileModifiedDate'] = time.strftime(ISO8601, t)
            t = time.gmtime(s[9])    # st_ctime
            m['FileCreateDate'] = time.strftime(ISO8601, t)
        m['Date'] = time.strftime(RFC1123, time.gmtime())
        m['Host'] = gethostname()
        m['Bucket'] = key.bucket.name
        m['InputKey'] = key.key
        m['Size'] = key.size
        return m

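    # Upload a local file to S3 and write a message describing it to the
    # input queue, retrying on S3 and SQS errors.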
    def submit_file(self, path, bucket_name, metadata=None, cb=None, num_cb=0):
        if not metadata:
            metadata = {}
        bucket = self.get_bucket(bucket_name)
        k = bucket.new_key()
        k.update_metadata(metadata)
        successful = False
        num_tries = 0
        while not successful and num_tries < self.RetryCount:
            try:
                num_tries += 1
                print 'submitting file: %s' % path
                k.set_contents_from_filename(path, replace=False,
                                             cb=cb, num_cb=num_cb)
                m = self.create_msg(k, metadata)
                self.input_queue.write(m)
                successful = True
            except S3ResponseError, e:
                print 'caught S3Error'
                print e
                time.sleep(self.RetryDelay)
            except SQSError, e:
                print 'caught SQSError'
                print e
                time.sleep(self.RetryDelay)

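    # Read one message from the output queue and, optionally, download the
    # files named in its OutputKey field to the given path.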
    def get_result(self, path, original_name=False,
                   default_ext='.bin', delete_msg=True, get_file=True):
        q = self.get_queue(self.output_queue_name)
        m = q.read()
        if m:
            if get_file:
                outputs = m['OutputKey'].split(',')
                for output in outputs:
                    key_name, type_part = output.split(';')
                    mime_type = type_part.split('=')[1]
                    if original_name:
                        file_name = m.get('OriginalFileName', key_name)
                        file_name, ext = os.path.splitext(file_name)
                        ext = mimetypes.guess_extension(mime_type)
                        if not ext:
                            ext = default_ext
                        file_name = file_name + ext
                    else:
                        file_name = key_name
                    bucket = self.get_bucket(m['Bucket'])
                    key = bucket.lookup(key_name)
                    print 'retrieving file: %s' % file_name
                    key.get_contents_to_filename(os.path.join(path, file_name))
            if delete_msg:
                q.delete_message(m)
        return m

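    # Read one message from the input queue and stamp it with a
    # Service-Read time; ProcessingTime serves as the visibility timeout.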
    def read_message(self):
        self.log(method='read_message')
        message = None
        successful = False
        num_tries = 0
        while not successful and num_tries < self.RetryCount:
            try:
                num_tries += 1
                message = self.input_queue.read(self.ProcessingTime)
                if message:
                    print message.get_body()
                    key = 'Service-Read'
                    message[key] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
                                                 time.gmtime())
                successful = True
            except SQSError, e:
                print 'caught SQSError'
                print e
                time.sleep(self.RetryDelay)
        return message

    # retrieve the source file from S3
    def get_file(self, bucket_name, key, file_name):
        self.log(method='get_file', bucket_name=bucket_name,
                 key=key, file_name=file_name)
        successful = False
        num_tries = 0
        while not successful and num_tries < self.RetryCount:
            try:
                num_tries += 1
                print 'getting file %s.%s' % (bucket_name, key)
                bucket = self.get_bucket(bucket_name)
                k = Key(bucket)
                k.key = key
                k.get_contents_to_filename(file_name)
                successful = True
            except S3ResponseError, e:
                print 'caught S3Error'
                print e
                time.sleep(self.RetryDelay)

    # process the source file; return a list of (file_name, mime_type)
    # tuples describing the output files (subclasses override this)
    def process_file(self, in_file_name, msg):
        return []

    # store result file in S3
    def put_file(self, bucket_name, file_name):
        self.log(method='put_file', bucket_name=bucket_name,
                 file_name=file_name)
        successful = False
        num_tries = 0
        while not successful and num_tries < self.RetryCount:
            try:
                num_tries += 1
                bucket = self.get_bucket(bucket_name)
                k = bucket.new_key()
                k.set_contents_from_filename(file_name)
                print 'putting file %s as %s.%s' % (file_name, bucket_name, k.key)
                successful = True
            except S3ResponseError, e:
                print 'caught S3Error'
                print e
                time.sleep(self.RetryDelay)
        return k

    # write message to the output queue
    def write_message(self, message):
        self.log(method='write_message')
        message['Service-Write'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
                                                 time.gmtime())
        message['Server'] = self.__class__.__name__
        if os.environ.has_key('HOSTNAME'):
            message['Host'] = os.environ['HOSTNAME']
        else:
            message['Host'] = 'unknown'
        queue = self.get_queue(self.output_queue_name)
        print 'writing message to %s' % queue.id
        successful = False
        num_tries = 0
        while not successful and num_tries < self.RetryCount:
            try:
                num_tries += 1
                print message.get_body()
                queue.write(message)
                successful = True
            except SQSError, e:
                print 'caught SQSError'
                print e
                time.sleep(self.RetryDelay)

    # delete message from input queue
    def delete_message(self, message):
        self.log(method='delete_message')
        print 'deleting message from %s' % self.input_queue.id
        successful = False
        num_tries = 0
        while not successful and num_tries < self.RetryCount:
            try:
                num_tries += 1
                self.input_queue.delete_message(message)
                successful = True
            except SQSError, e:
                print 'caught SQSError'
                print e
                time.sleep(self.RetryDelay)

    # hook for subclasses to clean up any files, etc. after each iteration
    def cleanup(self):
        pass

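    # Terminate this EC2 instance when on_completion is 'shutdown' and we
    # are running on EC2 (an instance-id is present in the metadata).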
    def shutdown(self):
        if self.on_completion == 'shutdown':
            if self.meta_data.has_key('instance-id'):
                time.sleep(60)
                c = EC2Connection(self.aws_access_key_id,
                                  self.aws_secret_access_key)
                c.terminate_instances([self.meta_data['instance-id']])

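    # Main loop: read a message, fetch the input file from S3, process it,
    # store the results in S3, write an output message and delete the input
    # message.  Exits after MainLoopRetryCount consecutive empty reads;
    # errors also count as empty reads.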
    def run(self, notify=False):
        self.notify('Service Starting')
        successful_reads = 0
        empty_reads = 0
        while empty_reads < self.MainLoopRetryCount:
            try:
                input_message = self.read_message()
                if input_message:
                    empty_reads = 0
                    successful_reads += 1
                    output_message = MHMessage(None, input_message.get_body())
                    in_key = input_message['InputKey']
                    self.get_file(input_message['Bucket'], in_key,
                                  os.path.join(self.working_dir, 'in_file'))
                    results = self.process_file(os.path.join(self.working_dir,
                                                             'in_file'),
                                                output_message)
                    output_keys = []
                    for file_name, mime_type in results:
                        key = self.put_file(input_message['Bucket'], file_name)
                        output_keys.append('%s;type=%s' % (key.key, mime_type))
                    output_message['OutputKey'] = ','.join(output_keys)
                    self.write_message(output_message)
                    self.delete_message(input_message)
                    self.cleanup()
                else:
                    empty_reads += 1
                    successful_reads = 0
                    time.sleep(self.MainLoopDelay)
            except Exception, e:
                empty_reads += 1
                successful_reads = 0
                fp = StringIO.StringIO()
                traceback.print_exc(None, fp)
                s = fp.getvalue()
                self.notify('Service failed\n%s' % s)
                self.create_connections()
        self.notify('Service Shutting Down')
        self.shutdown()
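
For context, a minimal sketch of how this base class is meant to be used. It is not part of this revision: the EchoService class, queue names and credentials below are illustrative placeholders. The one required hook is process_file(), which must return a list of (file_name, mime_type) tuples for run() to upload.

from boto.services.service import Service

class EchoService(Service):

    # Copy the input file to an output file, unchanged.
    def process_file(self, in_file_name, msg):
        out_file_name = in_file_name + '.out'
        fp_in = open(in_file_name, 'rb')
        fp_out = open(out_file_name, 'wb')
        fp_out.write(fp_in.read())
        fp_in.close()
        fp_out.close()
        return [(out_file_name, 'application/octet-stream')]

if __name__ == '__main__':
    # Any on_completion value other than 'shutdown' leaves the EC2
    # instance running when the main loop exits; read_userdata=False
    # skips the EC2 metadata fetch when testing off-instance.
    service = EchoService(aws_access_key_id='my-access-key',
                          aws_secret_access_key='my-secret-key',
                          input_queue_name='echo-input',
                          output_queue_name='echo-output',
                          on_completion='nothing',
                          read_userdata=False)
    service.run()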