1
# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
3
# Permission is hereby granted, free of charge, to any person obtaining a
4
# copy of this software and associated documentation files (the
5
# "Software"), to deal in the Software without restriction, including
6
# without limitation the rights to use, copy, modify, merge, publish, dis-
7
# tribute, sublicense, and/or sell copies of the Software, and to permit
8
# persons to whom the Software is furnished to do so, subject to the fol-
11
# The above copyright notice and this permission notice shall be included
12
# in all copies or substantial portions of the Software.
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23
from boto.services.message import ServiceMessage
24
from boto.services.servicedef import ServiceDef
25
from boto.pyami.scriptbase import ScriptBase
26
from boto.utils import get_ts
32
# Service: a queue-driven worker class built on ScriptBase. Its methods
# read a message from an input queue, download the referenced S3 object,
# process it, upload the results, write an output message and delete the
# input message.
class Service(ScriptBase):
34
# Time required to process a transaction
# NOTE(review): no statement follows this comment in this view. The bare
# numbers interleaved with the code look like leftover original line
# numbers, and 35-36 are skipped. The value actually used at runtime
# appears to be self.processing_time, read in __init__ with a default of
# 60 — confirm the intended target of this comment.
37
def __init__(self, config_file=None, mimetype_files=None):
# Configure the service from its config file.
#   config_file    -- passed to ScriptBase.__init__ and to ServiceDef.
#   mimetype_files -- passed to mimetypes.init().
# NOTE(review): `boto` and `mimetypes` are used here, but no import for
# them is visible in this file — confirm they are imported.
38
ScriptBase.__init__(self, config_file)
39
# The service name defaults to the concrete class name. It is used in
# the start/stop notifications and stamped into output messages as
# 'Server'.
self.name = self.__class__.__name__
40
# Local directory that get_file uses for downloaded input files.
self.working_dir = boto.config.get('Pyami', 'working_dir')
41
self.sd = ServiceDef(config_file)
42
# Loop limit for main(). A negative value keeps main() looping
# indefinitely (see its `while` condition). Default 5.
self.retry_count = self.sd.getint('retry_count', 5)
43
# Seconds that main() passes to time.sleep(). Default 30.
self.loop_delay = self.sd.getint('loop_delay', 30)
44
# Passed to input_queue.read(); presumably the message visibility
# timeout in seconds — TODO confirm. Default 60.
self.processing_time = self.sd.getint('processing_time', 60)
45
# Queue/domain objects built from the service definition. write_message
# skips the output queue or the output domain when it is falsy.
self.input_queue = self.sd.get_obj('input_queue')
46
self.output_queue = self.sd.get_obj('output_queue')
47
self.output_domain = self.sd.get_obj('output_domain')
49
# NOTE(review): original line 48 is missing from this view and may have
# guarded this call. With None, mimetypes.init() uses its default file
# list.
mimetypes.init(mimetype_files)
55
# NOTE(review): fragment. The enclosing `def`, the lines that define
# `key` (original lines 50-54) and the lines that use the results (57+)
# are not visible.
# It parses a "<key>;<label>=<type>" string, the same shape save_results
# builds with '%s;type=%s'. Tuple unpacking raises ValueError unless the
# string holds exactly one ';' and the suffix holds exactly one '='.
# `type` shadows the builtin, and `label` is not used in the visible
# lines.
key, type = key.split(';')
56
label, mtype = type.split('=')
60
def read_message(self):
# Read one message from the input queue, log its body and stamp it with
# a timestamp from get_ts().
# NOTE(review): original lines 63, 65 and 67+ are missing here:
#   - `key` is not defined in the visible lines.
#   - No return is visible, although main() uses the result.
#   - If read() can return None on an empty queue, a guard before
#     get_body() would be on a missing line.
# Confirm all three against the full source.
61
boto.log.info('read_message')
62
# processing_time comes from the service definition (see __init__).
message = self.input_queue.read(self.processing_time)
64
boto.log.info(message.get_body())
66
message[key] = get_ts()
69
# retrieve the source file from S3
70
def get_file(self, message):
# Download the S3 object named by message['Bucket'] / message['InputKey']
# into working_dir, under message['OriginalFileName'] (default
# 'in_file').
# NOTE(review): main() passes this method's result to process_file(),
# but no return is visible (original lines 78-79 are missing). Confirm
# that it returns the local path.
71
bucket_name = message['Bucket']
72
key_name = message['InputKey']
73
file_name = os.path.join(self.working_dir, message.get('OriginalFileName', 'in_file'))
74
boto.log.info('get_file: %s/%s to %s' % (bucket_name, key_name, file_name))
75
bucket = boto.lookup('s3', bucket_name)
76
key = bucket.new_key(key_name)
77
# NOTE(review): file_name already starts with working_dir, so joining it
# again is a no-op only when working_dir is absolute (os.path.join keeps
# the last absolute component). With a relative working_dir the download
# goes to working_dir/working_dir/..., not to the path logged above.
# An absolute OriginalFileName also bypasses working_dir entirely.
key.get_contents_to_filename(os.path.join(self.working_dir, file_name))
80
# process source file, return list of output files
# main() calls this with get_file()'s result and the output message.
# save_results() iterates the result as (file_path, type) pairs and
# uploads each file; the type is written into the output key string as
# 'type=<type>'.
# NOTE(review): the body (original lines 82-83) is not visible here.
81
def process_file(self, in_file_name, msg):
84
# store result file in S3
85
def put_file(self, bucket_name, file_path, key_name=None):
# Upload the local file at file_path to bucket_name under key_name.
# NOTE(review): save_results() reads `.name` from this method's result,
# but no return is visible (original lines 90-91 are missing). Confirm
# that it returns the key.
# What new_key(None) does with the default key_name is up to boto;
# verify before relying on the default.
86
# (The log text joins bucket and key with '.', unlike get_file's '/'.)
boto.log.info('putting file %s as %s.%s' % (file_path, bucket_name, key_name))
87
bucket = boto.lookup('s3', bucket_name)
88
key = bucket.new_key(key_name)
89
key.set_contents_from_filename(file_path)
92
def save_results(self, results, input_message, output_message):
# Upload every (file, type) pair in results and record the uploaded keys
# in output_message['OutputKey']. That value is a comma-separated list of
# '<key name>;type=<type>' entries.
# Files go to input_message['OutputBucket'] when present, otherwise to
# input_message['Bucket'], keyed by the file's basename.
# NOTE(review): original lines 93 and 97 are missing from this view:
#   - `output_keys` is never initialised in the visible lines.
#   - Without an `else:`, the 'Bucket' assignment would always overwrite
#     'OutputBucket'.
# `type` shadows the builtin (and `file` did on Python 2).
94
for file, type in results:
95
# NOTE(review): has_key() does not exist on Python 3 dicts. If
# input_message must work there, `'OutputBucket' in input_message` is the
# portable spelling — confirm that ServiceMessage supports `in`.
if input_message.has_key('OutputBucket'):
96
output_bucket = input_message['OutputBucket']
98
output_bucket = input_message['Bucket']
99
key_name = os.path.split(file)[1]
100
key = self.put_file(output_bucket, file, key_name)
101
output_keys.append('%s;type=%s' % (key.name, type))
102
output_message['OutputKey'] = ','.join(output_keys)
104
# write message to each output queue
# (In the visible code this means at most one SQS queue and one SDB
# domain, each written only if configured.)
105
def write_message(self, message):
# Stamp message with Service-Write (get_ts()), Server (self.name), Host
# and Instance-ID, then write it to output_queue and/or output_domain.
106
message['Service-Write'] = get_ts()
107
message['Server'] = self.name
108
# NOTE(review): os.environ has no has_key() on Python 3; the portable
# check is `'HOSTNAME' in os.environ`. Original line 110 (presumably
# `else:`) is missing, so as shown 'unknown' would always win — confirm.
if os.environ.has_key('HOSTNAME'):
109
message['Host'] = os.environ['HOSTNAME']
111
message['Host'] = 'unknown'
112
# instance_id is not set in the visible __init__; presumably ScriptBase
# provides it — TODO confirm.
message['Instance-ID'] = self.instance_id
113
if self.output_queue:
114
boto.log.info('Writing message to SQS queue: %s' % self.output_queue.id)
115
self.output_queue.write(message)
116
if self.output_domain:
117
boto.log.info('Writing message to SDB domain: %s' % self.output_domain.name)
118
# The item name is '<Service-Write>/<Bucket>/<InputKey>'. For this join
# to succeed, the message must carry 'Bucket' and 'InputKey' with string
# values.
item_name = '/'.join([message['Service-Write'], message['Bucket'], message['InputKey']])
119
self.output_domain.put_attributes(item_name, message)
# delete message from input queue
def delete_message(self, message):
    """Log the input queue's id, then delete *message* from that queue."""
    queue = self.input_queue
    boto.log.info('deleting message from %s' % queue.id)
    queue.delete_message(message)
126
# to clean up any files, etc. after each iteration
# NOTE(review): the definition this comment describes (original lines
# 127-130) is not visible in this view.
131
# Fragment: the enclosing `def` is not visible. This code reads
# 'on_completion' from the service definition (default 'shutdown'). When
# the value is 'shutdown', it terminates the EC2 instance whose id is
# self.instance_id.
# NOTE(review): original lines 133-134, between the check and the EC2
# calls, are missing and may add further conditions. Confirm before
# relying on when termination happens.
on_completion = self.sd.get('on_completion', 'shutdown')
132
if on_completion == 'shutdown':
135
c = boto.connect_ec2()
136
c.terminate_instances([self.instance_id])
138
def main(self, notify=False):
# Run the service loop. Each pass reads an input message, builds an
# output ServiceMessage from its body, fetches and processes the input
# file, uploads the results, writes the output message and deletes the
# input message. Notifications are sent at start-up and shut-down.
# NOTE(review): many original lines (140, 142, 144-145, 152-154, 156,
# 158) are missing from this view:
#   - `empty_reads` is neither initialised nor updated in the visible
#     lines.
#   - The `notify` parameter is not used in them.
#   - No import of `time` is visible in this file.
# Confirm against the full source.
139
self.notify('Service: %s Starting' % self.name)
141
# A negative retry_count loops forever. Otherwise the loop ends once
# empty_reads reaches retry_count.
while self.retry_count < 0 or empty_reads < self.retry_count:
143
input_message = self.read_message()
146
output_message = ServiceMessage(None, input_message.get_body())
147
input_file = self.get_file(input_message)
148
results = self.process_file(input_file, output_message)
149
self.save_results(results, input_message, output_message)
150
self.write_message(output_message)
151
# The input message is deleted only after the output has been written,
# so an exception raised in any earlier step skips the delete.
self.delete_message(input_message)
155
time.sleep(self.loop_delay)
157
# NOTE(review): if boto.log is a stdlib logging.Logger, exception() is
# meant to be called from an `except` handler. The enclosing try/except
# is not visible here.
boto.log.exception('Service Failed')
159
self.notify('Service: %s Shutting Down' % self.name)