# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import mimetypes
import os
import StringIO
import time
import traceback

import boto
import boto.utils
from boto.s3.connection import S3Connection
from boto.sqs.connection import SQSConnection
from boto.ec2.connection import EC2Connection
from boto.s3.key import Key
from boto.sqs.message import MHMessage
from boto.exception import SQSError, S3ResponseError
from socket import gethostname
# Timestamp formats used for message headers.
ISO8601 = '%Y-%m-%dT%H:%M:%SZ'
RFC1123 = '%a, %d %b %Y %X GMT'

# NOTE(review): the values below were lost when this file was garbled;
# only their descriptive comments survived.  The defaults here are the
# conventional boto ones -- confirm against the original source.

# Number of times to retry failed requests to S3 or SQS
RetryCount = 5

# Number of seconds to wait between Retries
RetryDelay = 30

# Number of times to retry queue read when no messages are available
MainLoopRetryCount = 5

# Number of seconds to wait before retrying queue read in main loop
MainLoopDelay = 30

# Time required to process a transaction
ProcessingTime = 60

# Number of successful queue reads before spawning helpers
# NOTE(review): the constant this comment described was lost -- confirm.
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
             input_queue_name=None, output_queue_name=None,
             on_completion='shutdown', notify_email=None,
             read_userdata=True, working_dir=None, log_queue_name=None):
    """Configure the service and connect to AWS.

    :param aws_access_key_id/aws_secret_access_key: AWS credentials.
    :param input_queue_name: SQS queue the service reads work from.
    :param output_queue_name: SQS queue results are written to.
    :param on_completion: 'shutdown' terminates the EC2 instance when done.
    :param notify_email: address status emails are sent to (see notify()).
    :param read_userdata: when True, EC2 instance user data overrides the
        attributes set from the constructor arguments.
    :param working_dir: local scratch directory; defaults to ~/work.
    :param log_queue_name: optional SQS queue for log messages.
    """
    # Caches of queue/bucket objects keyed by name; see get_queue()
    # and get_bucket().  meta_data is filled in by get_userdata() and
    # consulted by shutdown(); start with empty containers so the
    # attributes always exist.
    self.meta_data = {}
    self.queue_cache = {}
    self.bucket_cache = {}
    self.aws_access_key_id = aws_access_key_id
    self.aws_secret_access_key = aws_secret_access_key
    self.input_queue_name = input_queue_name
    self.output_queue_name = output_queue_name
    self.log_queue_name = log_queue_name
    self.notify_email = notify_email
    self.on_completion = on_completion
    # now override any values with instance user data passed on startup
    if read_userdata:
        self.get_userdata()
    self.create_connections()
    self.create_working_dir(working_dir)
def get_userdata(self):
    """Fetch EC2 instance metadata and user data.

    Each key in the '|'-separated user data overrides the attribute of
    the same name on this object (e.g. queue names, credentials).
    """
    self.meta_data = boto.utils.get_instance_metadata()
    d = boto.utils.get_instance_userdata(sep='|')
    if d:
        for key in d:
            # user data silently overrides constructor arguments
            setattr(self, key, d[key])
def create_connections(self):
    """Open the SQS and S3 connections and, when an input queue name was
    configured, resolve it to a queue object."""
    self.sqs_conn = SQSConnection(self.aws_access_key_id,
                                  self.aws_secret_access_key)
    self.s3_conn = S3Connection(self.aws_access_key_id,
                                self.aws_secret_access_key)
    if self.input_queue_name:
        self.input_queue = self.get_queue(self.input_queue_name)
def create_working_dir(self, working_dir):
    """Choose the local scratch directory (defaulting to ~/work), create
    it if necessary, and make it the process working directory."""
    self.log(method='create_working_dir', working_dir=working_dir)
    if working_dir:
        self.working_dir = working_dir
    else:
        self.working_dir = os.path.expanduser('~/work')
    if not os.path.exists(self.working_dir):
        os.mkdir(self.working_dir)
    # downstream code writes files relative to the working dir
    os.chdir(self.working_dir)
def log(self, **params):
    """Write *params* as a timestamped message to the log queue.

    No-op when no log_queue_name was configured.
    """
    if self.log_queue_name is None:
        return
    lq = self.get_queue(self.log_queue_name)
    m = lq.new_message()
    for key in params:
        m[key] = params[key]
    m['Timestamp'] = time.strftime(ISO8601, time.gmtime())
    lq.write(m)
def notify(self, msg):
    """Email *msg* to self.notify_email via the local SMTP server.

    No-op when notify_email is unset.
    """
    if self.notify_email:
        import smtplib, socket
        subject = "Message from Server - %s" % self.__class__.__name__
        body = "From: %s\r\n" % self.notify_email
        body = body + "To: %s\r\n" % self.notify_email
        body = body + "Subject: " + subject + '\r\n\r\n'
        body = body + 'Server: %s\n' % self.__class__.__name__
        body = body + 'Host: %s\n' % socket.gethostname()
        # include the caller-supplied message in the body (the msg
        # parameter was otherwise unused)
        body = body + msg
        server = smtplib.SMTP('localhost')
        server.sendmail(self.notify_email, self.notify_email, body)
        # fix: close the SMTP connection instead of leaking it
        server.quit()
if key.find(';') < 0:
134
key, type = key.split(';')
135
label, mtype = type.split('=')
139
def get_queue(self, queue_name):
    """Return the SQS queue named *queue_name*.

    The queue is created on first use (create_queue is idempotent),
    configured to use MHMessage, and cached for subsequent calls.
    """
    if queue_name in self.queue_cache:
        return self.queue_cache[queue_name]
    queue = self.sqs_conn.create_queue(queue_name)
    queue.set_message_class(MHMessage)
    self.queue_cache[queue_name] = queue
    # fix: the freshly created queue was never returned
    return queue
def get_bucket(self, bucket_name):
    """Return the S3 bucket named *bucket_name*.

    The bucket is created on first use (create_bucket is idempotent for
    buckets you own) and cached for subsequent calls.
    """
    if bucket_name in self.bucket_cache:
        return self.bucket_cache[bucket_name]
    bucket = self.s3_conn.create_bucket(bucket_name)
    self.bucket_cache[bucket_name] = bucket
    # fix: the freshly created bucket was never returned
    return bucket
def key_exists(self, bucket_name, key):
    """Return the Key object for *key* in *bucket_name*, or None when it
    does not exist (bucket.lookup's result doubles as a truth value)."""
    return self.get_bucket(bucket_name).lookup(key)
def create_msg(self, key, params=None):
    """Build a message describing an uploaded file.

    :param key: an S3 Key whose ``path`` attribute points at the local
        source file (set by the upload path in submit_file).
    :param params: optional dict of extra headers copied into the message.
    :return: the new (unwritten) message from the input queue's class.
    """
    m = self.input_queue.new_message()
    if params:
        for name in params:
            m[name] = params[name]
    t = os.path.split(key.path)
    m['OriginalLocation'] = t[0]
    m['OriginalFileName'] = t[1]
    mime_type = mimetypes.guess_type(t[1])[0]
    if mime_type is None:
        mime_type = 'application/octet-stream'
    m['Content-Type'] = mime_type
    s = os.stat(key.path)
    # os.stat tuple indices 7/8/9 are st_atime/st_mtime/st_ctime
    t = time.gmtime(s[7])
    m['FileAccessedDate'] = time.strftime(ISO8601, t)
    t = time.gmtime(s[8])
    m['FileModifiedDate'] = time.strftime(ISO8601, t)
    t = time.gmtime(s[9])
    m['FileCreateDate'] = time.strftime(ISO8601, t)
    m['Date'] = time.strftime(RFC1123, time.gmtime())
    m['Host'] = gethostname()
    m['Bucket'] = key.bucket.name
    m['InputKey'] = key.key
    # fix: the built message was never returned (submit_file writes it)
    return m
def submit_file(self, path, bucket_name, metadata=None, cb=None, num_cb=0):
    """Upload the local file *path* to *bucket_name* and write a message
    describing it to the input queue, retrying up to RetryCount times.

    :param metadata: optional dict attached to the S3 key and message.
    :param cb/num_cb: upload progress callback passed through to boto.
    """
    if metadata is None:
        metadata = {}
    bucket = self.get_bucket(bucket_name)
    # NOTE(review): key construction was lost in extraction; Key(bucket)
    # matches the otherwise-unused Key import -- confirm key naming.
    k = Key(bucket)
    k.update_metadata(metadata)
    successful = False
    num_tries = 0
    while not successful and num_tries < self.RetryCount:
        try:
            num_tries += 1
            print('submitting file: %s' % path)
            k.set_contents_from_filename(path, replace=False,
                                         cb=cb, num_cb=num_cb)
            m = self.create_msg(k, metadata)
            self.input_queue.write(m)
            successful = True
        except S3ResponseError as e:
            print('caught S3Error')
            time.sleep(self.RetryDelay)
        except SQSError as e:
            print('caught SQSError')
            time.sleep(self.RetryDelay)
def get_result(self, path, original_name=False,
               default_ext='.bin', delete_msg=True, get_file=True):
    """Read one result message from the output queue and optionally
    download each referenced output file into directory *path*.

    :param original_name: name downloaded files after OriginalFileName
        (with an extension guessed from the mime type) instead of the
        S3 key name.
    :param delete_msg: delete the message from the queue after handling.
    :param get_file: actually download the files (vs. just reading).
    :return: the message read, or None when the queue was empty.

    NOTE(review): the read/branch scaffolding was reconstructed from a
    garbled view -- confirm against the original source.
    """
    q = self.get_queue(self.output_queue_name)
    m = q.read()
    if m:
        if get_file:
            outputs = m['OutputKey'].split(',')
            for output in outputs:
                # each entry looks like "keyname;type=mime/type"
                key_name, type_part = output.split(';')
                mime_type = type_part.split('=')[1]
                if original_name:
                    file_name = m.get('OriginalFileName', key_name)
                    file_name, ext = os.path.splitext(file_name)
                    ext = mimetypes.guess_extension(mime_type)
                    if not ext:
                        ext = default_ext
                    file_name = file_name + ext
                else:
                    file_name = key_name
                bucket = self.get_bucket(m['Bucket'])
                key = bucket.lookup(key_name)
                print('retrieving file: %s' % file_name)
                key.get_contents_to_filename(os.path.join(path, file_name))
        if delete_msg:
            q.delete_message(m)
    return m
def read_message(self):
240
self.log(method='read_message')
244
while not successful and num_tries < self.RetryCount:
247
message = self.input_queue.read(self.ProcessingTime)
249
print message.get_body()
251
message[key] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
255
print 'caught SQSError'
257
time.sleep(self.RetryDelay)
260
# retrieve the source file from S3
def get_file(self, bucket_name, key, file_name):
    """Download s3://bucket_name/key into the local file *file_name*,
    retrying up to RetryCount times on S3ResponseError."""
    self.log(method='get_file', bucket_name=bucket_name,
             key=key, file_name=file_name)
    successful = False
    num_tries = 0
    while not successful and num_tries < self.RetryCount:
        try:
            num_tries += 1
            print('getting file %s.%s' % (bucket_name, key))
            bucket = self.get_bucket(bucket_name)
            k = Key(bucket)
            k.key = key
            k.get_contents_to_filename(file_name)
            successful = True
        except S3ResponseError as e:
            # fix: the original printed the literal 'caught S3Error[%s]'
            # with no format argument; supply the error status
            print('caught S3Error[%s]' % e.status)
            time.sleep(self.RetryDelay)
# process source file, return list of output files
def process_file(self, in_file_name, msg):
    """Subclass hook: process the downloaded input file and return a list
    of (file_name, type) tuples for the output files produced (consumed
    by run()).

    NOTE(review): the original body was lost in extraction; returning an
    empty list keeps run() working as a safe default -- confirm.
    """
    return []
# store result file in S3
def put_file(self, bucket_name, file_name):
    """Upload the local file *file_name* to *bucket_name*, retrying up
    to RetryCount times on S3ResponseError; returns the S3 Key written
    (run() reads key.key from it).

    NOTE(review): key construction was lost in extraction;
    bucket.new_key() is the conventional boto idiom -- confirm naming.
    """
    self.log(method='put_file', bucket_name=bucket_name,
             file_name=file_name)
    successful = False
    num_tries = 0
    k = None
    while not successful and num_tries < self.RetryCount:
        try:
            num_tries += 1
            bucket = self.get_bucket(bucket_name)
            k = bucket.new_key()
            k.set_contents_from_filename(file_name)
            print('putting file %s as %s.%s' % (file_name, bucket_name, k.key))
            successful = True
        except S3ResponseError as e:
            print('caught S3Error')
            time.sleep(self.RetryDelay)
    return k
# write message to each output queue
def write_message(self, message):
    """Stamp *message* with write time/server/host headers and write it
    to the output queue, retrying up to RetryCount times on SQSError."""
    self.log(method='write_message')
    message['Service-Write'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
                                             time.gmtime())
    message['Server'] = self.__class__.__name__
    # py2/py3-safe replacement for os.environ.has_key('HOSTNAME')
    if 'HOSTNAME' in os.environ:
        message['Host'] = os.environ['HOSTNAME']
    else:
        message['Host'] = 'unknown'
    queue = self.get_queue(self.output_queue_name)
    print('writing message to %s' % queue.id)
    successful = False
    num_tries = 0
    while not successful and num_tries < self.RetryCount:
        try:
            num_tries += 1
            print(message.get_body())
            queue.write(message)
            successful = True
        except SQSError:
            print('caught SQSError')
            time.sleep(self.RetryDelay)
# delete message from input queue
def delete_message(self, message):
    """Delete *message* from the input queue, retrying up to RetryCount
    times on SQSError."""
    self.log(method='delete_message')
    print('deleting message from %s' % self.input_queue.id)
    successful = False
    num_tries = 0
    while not successful and num_tries < self.RetryCount:
        try:
            num_tries += 1
            self.input_queue.delete_message(message)
            successful = True
        except SQSError:
            print('caught SQSError')
            time.sleep(self.RetryDelay)
# to clean up any files, etc. after each iteration
def cleanup(self):
    """Per-iteration cleanup hook for subclasses.

    NOTE(review): the def line and body were lost in extraction; only the
    comment above survived.  A no-op default is assumed -- confirm.
    """
    pass

def shutdown(self):
    """When configured with on_completion='shutdown' and running on an
    EC2 instance (instance-id present in metadata), terminate the
    instance via the EC2 API.

    NOTE(review): the def line was lost in extraction; the name is
    implied by on_completion == 'shutdown' -- confirm.
    """
    if self.on_completion == 'shutdown':
        # py2/py3-safe replacement for meta_data.has_key('instance-id')
        if 'instance-id' in self.meta_data:
            c = EC2Connection(self.aws_access_key_id,
                              self.aws_secret_access_key)
            c.terminate_instances([self.meta_data['instance-id']])
def run(self, notify=False):
    """Main service loop.

    Repeatedly read a message from the input queue, download the
    referenced S3 file, process it, upload the results and write an
    output message, until MainLoopRetryCount consecutive reads come
    back empty.  On any error, email the traceback and rebuild the AWS
    connections, then keep looping.

    NOTE(review): loop counters, the empty-read branch and the exception
    handler were reconstructed from a garbled view, and the *notify*
    parameter is unused in the visible code -- confirm against the
    original source.
    """
    self.notify('Service Starting')
    successful_reads = 0
    empty_reads = 0
    while empty_reads < self.MainLoopRetryCount:
        try:
            input_message = self.read_message()
            if input_message:
                empty_reads = 0
                successful_reads += 1
                # carry all input headers over into the output message
                output_message = MHMessage(None, input_message.get_body())
                in_key = input_message['InputKey']
                self.get_file(input_message['Bucket'], in_key,
                              os.path.join(self.working_dir, 'in_file'))
                results = self.process_file(os.path.join(self.working_dir,
                                                         'in_file'),
                                            output_message)
                # each result is a (file_name, type) pair; upload each
                # and record a "keyname;type=..." entry
                output_keys = []
                for out_file, out_type in results:
                    key = self.put_file(input_message['Bucket'], out_file)
                    output_keys.append('%s;type=%s' % (key.key, out_type))
                output_message['OutputKey'] = ','.join(output_keys)
                self.write_message(output_message)
                self.delete_message(input_message)
            else:
                # no message available: count the miss and back off
                empty_reads += 1
                successful_reads = 0
                time.sleep(self.MainLoopDelay)
        except Exception:
            empty_reads += 1
            successful_reads = 0
            fp = StringIO.StringIO()
            traceback.print_exc(None, fp)
            s = fp.getvalue()
            self.notify('Service failed\n%s' % s)
            self.create_connections()
    self.notify('Service Shutting Down')