~ubuntu-branches/ubuntu/karmic/python-boto/karmic

Viewing changes to boto/services/service.py

  • Committer: Bazaar Package Importer
  • Author(s): Eric Evans
  • Date: 2007-11-24 17:12:40 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20071124171240-11841t56xlqco4pw
Tags: 0.9d-1
New upstream release.

=== modified file 'boto/services/service.py'

     def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                  input_queue_name=None, output_queue_name=None,
                  on_completion='shutdown', notify_email=None,
-                 read_userdata=True, working_dir=None, log_queue_name=None):
+                 read_userdata=True, working_dir=None, log_queue_name=None,
+                 mimetype_files=None, preserve_file_name=False):
         self.meta_data = {}
         self.queue_cache = {}
         self.bucket_cache = {}
...
         self.log_queue_name = log_queue_name
         self.notify_email = notify_email
         self.on_completion = on_completion
+        self.preserve_file_name = preserve_file_name
         # now override any values with instance user data passed on startup
         if read_userdata:
             self.get_userdata()
         self.create_connections()
         self.create_working_dir(working_dir)
+        if mimetype_files:
+            mimetypes.init(mimetype_files)
 
     def get_userdata(self):
         self.meta_data = boto.utils.get_instance_metadata()
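
For orientation, a minimal sketch of how the two new constructor arguments might be used. The keyword arguments come from the new signature above; the class name Service, the import path, the queue names and the mime.types path are assumptions made for the example.

from boto.services.service import Service   # class name assumed; module path from this diff

svc = Service(input_queue_name='render-input',        # placeholder queue names
              output_queue_name='render-output',
              preserve_file_name=True,                 # keep original file names as S3 key names
              mimetype_files=['/etc/mime.types'])      # extra map files passed to mimetypes.init()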
...
         if bucket_name in self.bucket_cache.keys():
             return self.bucket_cache[bucket_name]
         else:
-            bucket = self.s3_conn.create_bucket(bucket_name)
+            bucket = self.s3_conn.get_bucket(bucket_name)
             self.bucket_cache[bucket_name] = bucket
             return bucket
 
-    def key_exists(self, bucket_name, key):
-        bucket = self.get_bucket(bucket_name)
-        return bucket.lookup(key)
-
-    def create_msg(self, key, params=None):
+    def create_msg(self, key, params=None, bucket_name=None):
         m = self.input_queue.new_message()
         if params:
             m.update(params)
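
Switching from create_bucket() to get_bucket() means the service now expects its S3 buckets to exist already instead of creating them on demand. A rough sketch of the difference, assuming boto's usual S3Connection behaviour and a placeholder bucket name:

from boto.s3.connection import S3Connection

conn = S3Connection()                     # credentials read from boto config / environment
bucket = conn.create_bucket('my-bucket')  # old behaviour: creates the bucket if it is missing
bucket = conn.get_bucket('my-bucket')     # new behaviour: looks up an existing bucket and
                                          # raises S3ResponseError if it does not exist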
...
             m['FileModifiedDate'] = time.strftime(ISO8601, t)
             t = time.gmtime(s[9])
             m['FileCreateDate'] = time.strftime(ISO8601, t)
+        else:
+            m['OriginalFileName'] = key.name
+            m['OriginalLocation'] = key.bucket.name
+            m['ContentType'] = key.content_type
         m['Date'] = time.strftime(RFC1123, time.gmtime())
         m['Host'] = gethostname()
-        m['Bucket'] = key.bucket.name
-        m['InputKey'] = key.key
+        if bucket_name:
+            m['Bucket'] = bucket_name
+        else:
+            m['Bucket'] = key.bucket.name
+        m['InputKey'] = key.name
         m['Size'] = key.size
         return m
 
...
         if not metadata:
             metadata = {}
         bucket = self.get_bucket(bucket_name)
-        k = bucket.new_key()
+        if self.preserve_file_name:
+            key_name = os.path.split(path)[1]
+        else:
+            key_name = None
+        k = bucket.new_key(key_name)
         k.update_metadata(metadata)
         successful = False
         num_tries = 0
...
                 print e
                 time.sleep(self.RetryDelay)
 
...
-    def get_result(self, path, original_name=False,
-                   default_ext='.bin', delete_msg=True, get_file=True):
+    def get_result(self, path, delete_msg=True, get_file=True):
         q = self.get_queue(self.output_queue_name)
         m = q.read()
         if m:
...
                 outputs = m['OutputKey'].split(',')
                 for output in outputs:
                     key_name, type = output.split(';')
-                    mime_type = type.split('=')[1]
-                    if original_name:
-                        file_name = m.get('OriginalFileName', key_name)
-                        file_name, ext = os.path.splitext(file_name)
-                        ext = mimetypes.guess_extension(mime_type)
-                        if not ext:
-                            ext = default_ext
-                        file_name = file_name + ext
-                    else:
-                        file_name = key_name
                     bucket = self.get_bucket(m['Bucket'])
                     key = bucket.lookup(key_name)
-                    print 'retrieving file: %s' % file_name
-                    key.get_contents_to_filename(os.path.join(path, file_name))
+                    print 'retrieving file: %s' % key_name
+                    key.get_contents_to_filename(os.path.join(path, key_name))
             if delete_msg:
                 q.delete_message(m)
         return m
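
A hedged sketch of the simplified get_result() call, reusing the hypothetical svc object from the constructor example; the path and key name are placeholders:

msg = svc.get_result('/tmp/results')      # reads one output-queue message and downloads
if msg:                                   # every key listed in its 'OutputKey' field
    print msg['OutputKey']                # e.g. 'out_file;type=image/jpeg' (format built by put_file loop)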
...
         return message
 
     # retrieve the source file from S3
-    def get_file(self, bucket_name, key, file_name):
+    def get_file(self, bucket_name, key_name, file_name):
         self.log(method='get_file', bucket_name=bucket_name,
-                 key=key, file_name=file_name)
+                 key=key_name, file_name=file_name)
         successful = False
         num_tries = 0
         while not successful and num_tries < self.RetryCount:
             try:
                 num_tries += 1
-                print 'getting file %s.%s' % (bucket_name, key)
+                print 'getting file %s.%s' % (bucket_name, key_name)
                 bucket = self.get_bucket(bucket_name)
-                k = Key(bucket)
-                k.key = key
-                k.get_contents_to_filename(file_name)
+                key = Key(bucket)
+                key.name = key_name
+                key.get_contents_to_filename(file_name)
                 successful = True
             except S3ResponseError, e:
                 print 'caught S3Error[%s]'
...
         return []
 
     # store result file in S3
-    def put_file(self, bucket_name, file_name):
+    def put_file(self, bucket_name, file_path):
         self.log(method='put_file', bucket_name=bucket_name,
-                 file_name=file_name)
+                 file_path=file_path)
         successful = False
         num_tries = 0
         while not successful and num_tries < self.RetryCount:
             try:
                 num_tries += 1
                 bucket = self.get_bucket(bucket_name)
-                k = bucket.new_key()
-                k.set_contents_from_filename(file_name)
-                print 'putting file %s as %s.%s' % (file_name, bucket_name, k.key)
+                if self.preserve_file_name:
+                    key_name = os.path.split(file_path)[1]
+                else:
+                    key_name = None
+                key = bucket.new_key(key_name)
+                key.set_contents_from_filename(file_path)
+                print 'putting file %s as %s.%s' % (file_path, bucket_name,
+                                                    key.name)
                 successful = True
             except S3ResponseError, e:
                 print 'caught S3Error'
                 print e
                 time.sleep(self.RetryDelay)
-        return k
+        return key
+
+    def _write_message(self, queue, message):
+        successful = False
+        num_tries = 0
+        while not successful and num_tries < self.RetryCount:
+            try:
+                num_tries += 1
+                queue.write(message)
+                successful = True
+            except SQSError, e:
+                print 'caught SQSError'
+                print e
+                time.sleep(self.RetryDelay)
 
     # write message to each output queue
     def write_message(self, message):
-        self.log(method='write_message')
-        message['Service-Write'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
-                                                 time.gmtime())
-        message['Server'] = self.__class__.__name__
-        if os.environ.has_key('HOSTNAME'):
-            message['Host'] = os.environ['HOSTNAME']
-        else:
-            message['Host'] = 'unknown'
-        queue = self.get_queue(self.output_queue_name)
-        print 'writing message to %s' % queue.id
-        successful = False
-        num_tries = 0
-        while not successful and num_tries < self.RetryCount:
-            try:
-                num_tries += 1
-                print message.get_body()
-                queue.write(message)
-                successful = True
-            except SQSError, e:
-                print 'caught SQSError'
-                print e
-                time.sleep(self.RetryDelay)
+        if self.output_queue_name:
+            self.log(method='write_message')
+            message['Service-Write'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
+                                                     time.gmtime())
+            message['Server'] = self.__class__.__name__
+            if os.environ.has_key('HOSTNAME'):
+                message['Host'] = os.environ['HOSTNAME']
+            else:
+                message['Host'] = 'unknown'
+            queue = self.get_queue(self.output_queue_name)
+            print 'writing message to %s' % queue.id
+            self._write_message(queue, message)
 
     # delete message from input queue
     def delete_message(self, message):
...
                     successful_reads += 1
                     output_message = MHMessage(None, input_message.get_body())
                     in_key = input_message['InputKey']
+                    in_file_name = input_message.get('OriginalFileName',
+                                                     'in_file')
                     self.get_file(input_message['Bucket'], in_key,
-                                  os.path.join(self.working_dir,'in_file'))
+                                  os.path.join(self.working_dir,in_file_name))
                     results = self.process_file(os.path.join(self.working_dir,
-                                                              'in_file'),
+                                                              in_file_name),
                                                 output_message)
-                    output_keys = []
-                    for file, type in results:
-                        key = self.put_file(input_message['Bucket'], file)
-                        output_keys.append('%s;type=%s' % (key.key, type))
-                    output_message['OutputKey'] = ','.join(output_keys)
-                    self.write_message(output_message)
-                    self.delete_message(input_message)
+                    if results != None:
+                        output_keys = []
+                        for file, type in results:
+                            if input_message.has_key('OutputBucket'):
+                                output_bucket = input_message['OutputBucket']
+                            else:
+                                output_bucket = input_message['Bucket']
+                            key = self.put_file(output_bucket, file)
+                            output_keys.append('%s;type=%s' % (key.name, type))
+                        output_message['OutputKey'] = ','.join(output_keys)
+                        self.write_message(output_message)
+                        self.delete_message(input_message)
                     self.cleanup()
                 else:
                     empty_reads += 1
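
To tie the main-loop changes together, a sketch of the message fields the loop now consumes; the key names ('Bucket', 'InputKey', 'OriginalFileName', 'OutputBucket') come from the code above, the values are placeholders:

input_fields = {
    'Bucket': 'uploads-bucket',           # bucket holding the input key
    'InputKey': 'photos/cat.jpg',         # key downloaded before processing
    'OriginalFileName': 'cat.jpg',        # local working-file name; defaults to 'in_file' if absent
    'OutputBucket': 'results-bucket',     # optional; results fall back to 'Bucket' when missing
}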