~cbehrens/nova/lp844160-build-works-with-zones

« back to all changes in this revision

Viewing changes to vendor/boto/boto/services/service.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
 
2
#
 
3
# Permission is hereby granted, free of charge, to any person obtaining a
 
4
# copy of this software and associated documentation files (the
 
5
# "Software"), to deal in the Software without restriction, including
 
6
# without limitation the rights to use, copy, modify, merge, publish, dis-
 
7
# tribute, sublicense, and/or sell copies of the Software, and to permit
 
8
# persons to whom the Software is furnished to do so, subject to the fol-
 
9
# lowing conditions:
 
10
#
 
11
# The above copyright notice and this permission notice shall be included
 
12
# in all copies or substantial portions of the Software.
 
13
#
 
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 
15
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
 
16
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 
17
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
 
18
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 
20
# IN THE SOFTWARE.
 
21
 
 
22
import boto
 
23
from boto.services.message import ServiceMessage
 
24
from boto.services.servicedef import ServiceDef
 
25
from boto.pyami.scriptbase import ScriptBase
 
26
from boto.utils import get_ts
 
27
import time
 
28
import os
 
29
import mimetypes
 
30
 
 
31
 
 
32
class Service(ScriptBase):
 
33
 
 
34
    # Time required to process a transaction
 
35
    ProcessingTime = 60
 
36
 
 
37
    def __init__(self, config_file=None, mimetype_files=None):
 
38
        ScriptBase.__init__(self, config_file)
 
39
        self.name = self.__class__.__name__
 
40
        self.working_dir = boto.config.get('Pyami', 'working_dir')
 
41
        self.sd = ServiceDef(config_file)
 
42
        self.retry_count = self.sd.getint('retry_count', 5)
 
43
        self.loop_delay = self.sd.getint('loop_delay', 30)
 
44
        self.processing_time = self.sd.getint('processing_time', 60)
 
45
        self.input_queue = self.sd.get_obj('input_queue')
 
46
        self.output_queue = self.sd.get_obj('output_queue')
 
47
        self.output_domain = self.sd.get_obj('output_domain')
 
48
        if mimetype_files:
 
49
            mimetypes.init(mimetype_files)
 
50
 
 
51
    def split_key(key):
 
52
        if key.find(';') < 0:
 
53
            t = (key, '')
 
54
        else:
 
55
            key, type = key.split(';')
 
56
            label, mtype = type.split('=')
 
57
            t = (key, mtype)
 
58
        return t
 
59
 
 
60
    def read_message(self):
 
61
        boto.log.info('read_message')
 
62
        message = self.input_queue.read(self.processing_time)
 
63
        if message:
 
64
            boto.log.info(message.get_body())
 
65
            key = 'Service-Read'
 
66
            message[key] = get_ts()
 
67
        return message
 
68
 
 
69
    # retrieve the source file from S3
 
70
    def get_file(self, message):
 
71
        bucket_name = message['Bucket']
 
72
        key_name = message['InputKey']
 
73
        file_name = os.path.join(self.working_dir, message.get('OriginalFileName', 'in_file'))
 
74
        boto.log.info('get_file: %s/%s to %s' % (bucket_name, key_name, file_name))
 
75
        bucket = boto.lookup('s3', bucket_name)
 
76
        key = bucket.new_key(key_name)
 
77
        key.get_contents_to_filename(os.path.join(self.working_dir, file_name))
 
78
        return file_name
 
79
 
 
80
    # process source file, return list of output files
 
81
    def process_file(self, in_file_name, msg):
 
82
        return []
 
83
 
 
84
    # store result file in S3
 
85
    def put_file(self, bucket_name, file_path, key_name=None):
 
86
        boto.log.info('putting file %s as %s.%s' % (file_path, bucket_name, key_name))
 
87
        bucket = boto.lookup('s3', bucket_name)
 
88
        key = bucket.new_key(key_name)
 
89
        key.set_contents_from_filename(file_path)
 
90
        return key
 
91
 
 
92
    def save_results(self, results, input_message, output_message):
 
93
        output_keys = []
 
94
        for file, type in results:
 
95
            if input_message.has_key('OutputBucket'):
 
96
                output_bucket = input_message['OutputBucket']
 
97
            else:
 
98
                output_bucket = input_message['Bucket']
 
99
            key_name = os.path.split(file)[1]
 
100
            key = self.put_file(output_bucket, file, key_name)
 
101
            output_keys.append('%s;type=%s' % (key.name, type))
 
102
        output_message['OutputKey'] = ','.join(output_keys)
 
103
            
 
104
    # write message to each output queue
 
105
    def write_message(self, message):
 
106
        message['Service-Write'] = get_ts()
 
107
        message['Server'] = self.name
 
108
        if os.environ.has_key('HOSTNAME'):
 
109
            message['Host'] = os.environ['HOSTNAME']
 
110
        else:
 
111
            message['Host'] = 'unknown'
 
112
        message['Instance-ID'] = self.instance_id
 
113
        if self.output_queue:
 
114
            boto.log.info('Writing message to SQS queue: %s' % self.output_queue.id)
 
115
            self.output_queue.write(message)
 
116
        if self.output_domain:
 
117
            boto.log.info('Writing message to SDB domain: %s' % self.output_domain.name)
 
118
            item_name = '/'.join([message['Service-Write'], message['Bucket'], message['InputKey']])
 
119
            self.output_domain.put_attributes(item_name, message)
 
120
 
 
121
    # delete message from input queue
 
122
    def delete_message(self, message):
 
123
        boto.log.info('deleting message from %s' % self.input_queue.id)
 
124
        self.input_queue.delete_message(message)
 
125
 
 
126
    # to clean up any files, etc. after each iteration
 
127
    def cleanup(self):
 
128
        pass
 
129
 
 
130
    def shutdown(self):
 
131
        on_completion = self.sd.get('on_completion', 'shutdown')
 
132
        if on_completion == 'shutdown':
 
133
            if self.instance_id:
 
134
                time.sleep(60)
 
135
                c = boto.connect_ec2()
 
136
                c.terminate_instances([self.instance_id])
 
137
 
 
138
    def main(self, notify=False):
 
139
        self.notify('Service: %s Starting' % self.name)
 
140
        empty_reads = 0
 
141
        while self.retry_count < 0 or empty_reads < self.retry_count:
 
142
            try:
 
143
                input_message = self.read_message()
 
144
                if input_message:
 
145
                    empty_reads = 0
 
146
                    output_message = ServiceMessage(None, input_message.get_body())
 
147
                    input_file = self.get_file(input_message)
 
148
                    results = self.process_file(input_file, output_message)
 
149
                    self.save_results(results, input_message, output_message)
 
150
                    self.write_message(output_message)
 
151
                    self.delete_message(input_message)
 
152
                    self.cleanup()
 
153
                else:
 
154
                    empty_reads += 1
 
155
                    time.sleep(self.loop_delay)
 
156
            except Exception:
 
157
                boto.log.exception('Service Failed')
 
158
                empty_reads += 1
 
159
        self.notify('Service: %s Shutting Down' % self.name)
 
160
        self.shutdown()
 
161