~0x44/nova/bug838466

« back to all changes in this revision

Viewing changes to vendor/boto/boto/mapreduce/pdb_upload

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
# Copyright (c) 2006-2008 Mitch Garnaat http://garnaat.org/
 
3
#
 
4
# Permission is hereby granted, free of charge, to any person obtaining a
 
5
# copy of this software and associated documentation files (the
 
6
# "Software"), to deal in the Software without restriction, including
 
7
# without limitation the rights to use, copy, modify, merge, publish, dis-
 
8
# tribute, sublicense, and/or sell copies of the Software, and to permit
 
9
# persons to whom the Software is furnished to do so, subject to the fol-
 
10
# lowing conditions:
 
11
#
 
12
# The above copyright notice and this permission notice shall be included
 
13
# in all copies or substantial portions of the Software.
 
14
#
 
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 
16
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
 
17
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 
18
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
 
19
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
20
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 
21
# IN THE SOFTWARE.
 
22
import queuetools, os, signal, sys
 
23
import subprocess
 
24
import time
 
25
from optparse import OptionParser
 
26
from boto.mapreduce.partitiondb import PartitionDB, Partition, Version
 
27
from lqs import LQSServer, LQSHandler
 
28
from boto.exception import SDBPersistenceError
 
29
from boto.sdb.persist import get_manager
 
30
 
 
31
USAGE = """
 
32
  SYNOPSIS
 
33
    %prog [options]
 
34
  DESCRIPTION
 
35
    Upload partition files to a PartitionDB.
 
36
    Called with no options, all PartitionDB objects defined in your default
 
37
    domain (as specified in the "default_domain" option in the "[Persist]"
 
38
    section of your boto config file) will be listed.
 
39
    When called with a particular PartitionDB name (using -p option) all
 
40
    Version objects of that PartitionDB object will be listed.
 
41
    When called with the -p option and a particular Version name specified
 
42
    (using the -v option) all Partitions in that Version object will be listed.
 
43
"""
 
44
class Client:
 
45
 
 
46
    def __init__(self, queue_name):
 
47
        self.q = queuetools.get_queue(queue_name)
 
48
        self.q.connect()
 
49
        self.manager = get_manager()
 
50
        self.process()
 
51
 
 
52
    def process(self):
 
53
        m = self.q.get()
 
54
        if m['item']:
 
55
            v = Version(m['args']['v_id'], self.manager)
 
56
            bucket_name = v.pdb.bucket_name
 
57
        while m['item']:
 
58
            print 'Uploading: %s' % m['item']
 
59
            p = v.add_partition(name=m['item'])
 
60
            p.upload(os.path.join(m['args']['path'], m['item']), bucket_name)
 
61
            self.q.delete(m)
 
62
            m = self.q.get()
 
63
        print 'client processing complete'
 
64
 
 
65
class Server:
 
66
 
 
67
    def __init__(self, path, pdb_name, bucket_name=None, domain_name=None):
 
68
        self.path = path
 
69
        self.pdb_name = pdb_name
 
70
        self.bucket_name = bucket_name
 
71
        self.manager = get_manager(domain_name)
 
72
        self.get_pdb()
 
73
        self.serve()
 
74
 
 
75
    def get_pdb(self):
 
76
        try:
 
77
            self.pdb = PartitionDB.get(name=self.pdb_name)
 
78
        except SDBPersistenceError:
 
79
            self.pdb = PartitionDB(manager=self.manager, name=self.pdb_name, bucket_name=self.bucket_name)
 
80
            self.pdb.save()
 
81
            
 
82
    def serve(self):
 
83
        v = self.pdb.add_version()
 
84
        args = {'path' : self.path,
 
85
                'v_id' : v.id}
 
86
        l = os.listdir(self.path)
 
87
        s = LQSServer('', LQSHandler, iter(l), args)
 
88
        s.serve_forever()
 
89
    
 
90
class Upload:
 
91
 
 
92
    Usage = "usage: %prog [options] command"
 
93
 
 
94
    Commands = {'client' : 'Start an Upload client',
 
95
                'server' : 'Start an Upload server'}
 
96
 
 
97
    def __init__(self):
 
98
        self.parser = OptionParser(usage=self.Usage)
 
99
        self.parser.add_option("--help-commands", action="store_true", dest="help_commands",
 
100
                               help="provides help on the available commands")
 
101
        self.parser.add_option('-d', '--domain-name', action='store', type='string',
 
102
                               help='name of the SimpleDB domain where PDB objects are stored')
 
103
        self.parser.add_option('-n', '--num-processes', action='store', type='int', dest='num_processes',
 
104
                               help='the number of client processes launched')
 
105
        self.parser.set_defaults(num_processes=2)
 
106
        self.parser.add_option('-i', '--input-path', action='store', type='string',
 
107
                               help='the path to directory to upload')
 
108
        self.parser.add_option('-p', '--pdb-name', action='store', type='string',
 
109
                               help='name of the PDB in which to store files (will create if necessary)')
 
110
        self.parser.add_option('-b', '--bucket-name', action='store', type='string',
 
111
                               help='name of S3 bucket (only needed if creating new PDB)')
 
112
        self.options, self.args = self.parser.parse_args()
 
113
        self.prog_name = sys.argv[0]
 
114
 
 
115
    def print_command_help(self):
 
116
        print '\nCommands:'
 
117
        for key in self.Commands.keys():
 
118
            print '  %s\t\t%s' % (key, self.Commands[key])
 
119
 
 
120
    def do_server(self):
 
121
        if not self.options.input_path:
 
122
            self.parser.error('No path provided')
 
123
        if not os.path.isdir(self.options.input_path):
 
124
            self.parser.error('Invalid path (%s)' % self.options.input_path)
 
125
        if not self.options.pdb_name:
 
126
            self.parser.error('No PDB name provided')
 
127
        s = Server(self.options.input_path, self.options.pdb_name,
 
128
                   self.options.bucket_name, self.options.domain_name)
 
129
 
 
130
    def do_client(self):
 
131
        c = Client('localhost')
 
132
        
 
133
    def main(self):
 
134
        if self.options.help_commands:
 
135
            self.print_command_help()
 
136
            sys.exit(0)
 
137
        if len(self.args) == 0:
 
138
            if not self.options.input_path:
 
139
                self.parser.error('No path provided')
 
140
            if not os.path.isdir(self.options.input_path):
 
141
                self.parser.error('Invalid path (%s)' % self.options.input_path)
 
142
            if not self.options.pdb_name:
 
143
                self.parser.error('No PDB name provided')
 
144
            server_command = '%s -p %s -i %s' % (self.prog_name, self.options.pdb_name, self.options.input_path)
 
145
            if self.options.bucket_name:
 
146
                server_command += ' -b %s' % self.options.bucket_name
 
147
            server_command += ' server'
 
148
            client_command = '%s client' % self.prog_name
 
149
            server = subprocess.Popen(server_command, shell=True)
 
150
            print 'server pid: %s' % server.pid
 
151
            time.sleep(5)
 
152
            clients = []
 
153
            for i in range(0, self.options.num_processes):
 
154
                client = subprocess.Popen(client_command, shell=True)
 
155
                clients.append(client)
 
156
            print 'waiting for clients to finish'
 
157
            for client in clients:
 
158
                client.wait()
 
159
            os.kill(server.pid, signal.SIGTERM)
 
160
        elif len(self.args) == 1:
 
161
            self.command = self.args[0]
 
162
            if hasattr(self, 'do_%s' % self.command):
 
163
                method = getattr(self, 'do_%s' % self.command)
 
164
                method()
 
165
            else:
 
166
                self.parser.error('command (%s) not recognized' % self.command)
 
167
        else:
 
168
            self.parser.error('unrecognized commands')
 
169
            
 
170
if __name__ == "__main__":
 
171
    upload = Upload()
 
172
    upload.main()