61
61
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
62
62
input_queue_name=None, output_queue_name=None,
63
63
on_completion='shutdown', notify_email=None,
64
read_userdata=True, working_dir=None, log_queue_name=None):
64
read_userdata=True, working_dir=None, log_queue_name=None,
65
mimetype_files=None, preserve_file_name=False):
65
66
self.meta_data = {}
66
67
self.queue_cache = {}
67
68
self.bucket_cache = {}
72
73
self.log_queue_name = log_queue_name
73
74
self.notify_email = notify_email
74
75
self.on_completion = on_completion
76
self.preserve_file_name = preserve_file_name
75
77
# now override any values with instance user data passed on startup
77
79
self.get_userdata()
78
80
self.create_connections()
79
81
self.create_working_dir(working_dir)
83
mimetypes.init(mimetype_files)
81
85
def get_userdata(self):
82
86
self.meta_data = boto.utils.get_instance_metadata()
149
153
if bucket_name in self.bucket_cache.keys():
150
154
return self.bucket_cache[bucket_name]
152
bucket = self.s3_conn.create_bucket(bucket_name)
156
bucket = self.s3_conn.get_bucket(bucket_name)
153
157
self.bucket_cache[bucket_name] = bucket
156
def key_exists(self, bucket_name, key):
157
bucket = self.get_bucket(bucket_name)
158
return bucket.lookup(key)
160
def create_msg(self, key, params=None):
160
def create_msg(self, key, params=None, bucket_name=None):
161
161
m = self.input_queue.new_message()
176
176
m['FileModifiedDate'] = time.strftime(ISO8601, t)
177
177
t = time.gmtime(s[9])
178
178
m['FileCreateDate'] = time.strftime(ISO8601, t)
180
m['OriginalFileName'] = key.name
181
m['OriginalLocation'] = key.bucket.name
182
m['ContentType'] = key.content_type
179
183
m['Date'] = time.strftime(RFC1123, time.gmtime())
180
184
m['Host'] = gethostname()
181
m['Bucket'] = key.bucket.name
182
m['InputKey'] = key.key
186
m['Bucket'] = bucket_name
188
m['Bucket'] = key.bucket.name
189
m['InputKey'] = key.name
183
190
m['Size'] = key.size
210
221
time.sleep(self.RetryDelay)
212
def get_result(self, path, original_name=False,
213
default_ext='.bin', delete_msg=True, get_file=True):
223
def get_result(self, path, delete_msg=True, get_file=True):
214
224
q = self.get_queue(self.output_queue_name)
218
228
outputs = m['OutputKey'].split(',')
219
229
for output in outputs:
220
230
key_name, type = output.split(';')
221
mime_type = type.split('=')[1]
223
file_name = m.get('OriginalFileName', key_name)
224
file_name, ext = os.path.splitext(file_name)
225
ext = mimetypes.guess_extension(mime_type)
228
file_name = file_name + ext
231
231
bucket = self.get_bucket(m['Bucket'])
232
232
key = bucket.lookup(key_name)
233
print 'retrieving file: %s' % file_name
234
key.get_contents_to_filename(os.path.join(path, file_name))
233
print 'retrieving file: %s' % key_name
234
key.get_contents_to_filename(os.path.join(path, key_name))
236
236
q.delete_message(m)
260
260
# retrieve the source file from S3
261
def get_file(self, bucket_name, key, file_name):
261
def get_file(self, bucket_name, key_name, file_name):
262
262
self.log(method='get_file', bucket_name=bucket_name,
263
key=key, file_name=file_name)
263
key=key_name, file_name=file_name)
264
264
successful = False
266
266
while not successful and num_tries < self.RetryCount:
269
print 'getting file %s.%s' % (bucket_name, key)
269
print 'getting file %s.%s' % (bucket_name, key_name)
270
270
bucket = self.get_bucket(bucket_name)
273
k.get_contents_to_filename(file_name)
273
key.get_contents_to_filename(file_name)
274
274
successful = True
275
275
except S3ResponseError, e:
276
276
print 'caught S3Error[%s]'
284
284
# store result file in S3
285
def put_file(self, bucket_name, file_name):
285
def put_file(self, bucket_name, file_path):
286
286
self.log(method='put_file', bucket_name=bucket_name,
288
288
successful = False
290
290
while not successful and num_tries < self.RetryCount:
293
293
bucket = self.get_bucket(bucket_name)
295
k.set_contents_from_filename(file_name)
296
print 'putting file %s as %s.%s' % (file_name, bucket_name, k.key)
294
if self.preserve_file_name:
295
key_name = os.path.split(file_path)[1]
298
key = bucket.new_key(key_name)
299
key.set_contents_from_filename(file_path)
300
print 'putting file %s as %s.%s' % (file_path, bucket_name,
297
302
successful = True
298
303
except S3ResponseError, e:
299
304
print 'caught S3Error'
301
306
time.sleep(self.RetryDelay)
309
def _write_message(self, queue, message):
312
while not successful and num_tries < self.RetryCount:
318
print 'caught SQSError'
320
time.sleep(self.RetryDelay)
304
322
# write message to each output queue
305
323
def write_message(self, message):
306
self.log(method='write_message')
307
message['Service-Write'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
309
message['Server'] = self.__class__.__name__
310
if os.environ.has_key('HOSTNAME'):
311
message['Host'] = os.environ['HOSTNAME']
313
message['Host'] = 'unknown'
314
queue = self.get_queue(self.output_queue_name)
315
print 'writing message to %s' % queue.id
318
while not successful and num_tries < self.RetryCount:
321
print message.get_body()
325
print 'caught SQSError'
327
time.sleep(self.RetryDelay)
324
if self.output_queue_name:
325
self.log(method='write_message')
326
message['Service-Write'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
328
message['Server'] = self.__class__.__name__
329
if os.environ.has_key('HOSTNAME'):
330
message['Host'] = os.environ['HOSTNAME']
332
message['Host'] = 'unknown'
333
queue = self.get_queue(self.output_queue_name)
334
print 'writing message to %s' % queue.id
335
self._write_message(queue, message)
329
337
# delete message from input queue
330
338
def delete_message(self, message):
367
375
successful_reads += 1
368
376
output_message = MHMessage(None, input_message.get_body())
369
377
in_key = input_message['InputKey']
378
in_file_name = input_message.get('OriginalFileName',
370
380
self.get_file(input_message['Bucket'], in_key,
371
os.path.join(self.working_dir,'in_file'))
381
os.path.join(self.working_dir,in_file_name))
372
382
results = self.process_file(os.path.join(self.working_dir,
376
for file, type in results:
377
key = self.put_file(input_message['Bucket'], file)
378
output_keys.append('%s;type=%s' % (key.key, type))
379
output_message['OutputKey'] = ','.join(output_keys)
380
self.write_message(output_message)
381
self.delete_message(input_message)
387
for file, type in results:
388
if input_message.has_key('OutputBucket'):
389
output_bucket = input_message['OutputBucket']
391
output_bucket = input_message['Bucket']
392
key = self.put_file(output_bucket, file)
393
output_keys.append('%s;type=%s' % (key.name, type))
394
output_message['OutputKey'] = ','.join(output_keys)
395
self.write_message(output_message)
396
self.delete_message(input_message)