2
# Copyright (c) 2006-2008 Mitch Garnaat http://garnaat.org/
4
# Permission is hereby granted, free of charge, to any person obtaining a
5
# copy of this software and associated documentation files (the
6
# "Software"), to deal in the Software without restriction, including
7
# without limitation the rights to use, copy, modify, merge, publish, dis-
8
# tribute, sublicense, and/or sell copies of the Software, and to permit
9
# persons to whom the Software is furnished to do so, subject to the fol-
12
# The above copyright notice and this permission notice shall be included
13
# in all copies or substantial portions of the Software.
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22
import queuetools, os, signal, sys
25
from optparse import OptionParser
26
from boto.mapreduce.partitiondb import PartitionDB, Partition, Version
27
from lqs import LQSServer, LQSHandler
28
from boto.exception import SDBPersistenceError
29
from boto.sdb.persist import get_manager
35
Upload partition files to a PartitionDB.
36
Called with no options, all PartitionDB objects defined in your default
37
domain (as specified in the "default_domain" option in the "[Persist]"
38
section of your boto config file) will be listed.
39
When called with a particular PartitionDB name (using -p option) all
40
Version objects of that PartitionDB object will be listed.
41
When called with the -p option and a particular Version name specified
42
(using the -v option) all Partitions in that Version object will be listed.
46
# Constructor taking the name of a work queue; presumably Client.__init__,
# since Client('localhost') is instantiated later in this file (the class
# header itself is not visible in this view).
# It resolves the queue through queuetools.get_queue() and obtains a
# persistence manager from get_manager() with no arguments.
# NOTE(review): the embedded original line numbers jump from 47 to 49, so a
# statement between the two assignments appears to be missing here.
def __init__(self, queue_name):
47
self.q = queuetools.get_queue(queue_name)
49
self.manager = get_manager()
55
# Handles one queue message `m`, read as a dict with keys 'item' and 'args'
# (and 'args' with keys 'v_id' and 'path').
# NOTE(review): the enclosing def and any loop header are not visible in this
# view (embedded line numbers 50-54, 57 and 61-62 are absent) — confirm how
# `m` is fetched and how the loop terminates.
# Build the Version object identified by the message, using this client's manager.
v = Version(m['args']['v_id'], self.manager)
56
# The target bucket name is read from the PartitionDB the version belongs to.
bucket_name = v.pdb.bucket_name
58
print 'Uploading: %s' % m['item']
59
# Register a partition named after the item, then hand the item's path
# (joined under m['args']['path']) and the bucket name to its upload().
p = v.add_partition(name=m['item'])
60
p.upload(os.path.join(m['args']['path'], m['item']), bucket_name)
63
print 'client processing complete'
67
# Constructor taking the directory path, the PartitionDB name, and optional
# bucket and domain names; presumably Server.__init__, since
# Server(input_path, pdb_name, bucket_name, domain_name) is called later in
# this file (the class header is not visible in this view).
# NOTE(review): `path` is not stored in the visible lines although self.path
# is read further down; embedded line number 68 is absent, so that
# assignment is probably missing from this view — confirm.
def __init__(self, path, pdb_name, bucket_name=None, domain_name=None):
69
self.pdb_name = pdb_name
70
self.bucket_name = bucket_name
71
# domain_name is passed straight to get_manager(); None is the default.
self.manager = get_manager(domain_name)
77
# Get-or-create for the PartitionDB: first look it up by name; if that raises
# SDBPersistenceError, construct a new PartitionDB with this object's
# manager, name and bucket name.
# NOTE(review): the `try:` line and the enclosing def are not visible in this
# view (embedded line numbers 72-76 are absent).
self.pdb = PartitionDB.get(name=self.pdb_name)
78
except SDBPersistenceError:
79
self.pdb = PartitionDB(manager=self.manager, name=self.pdb_name, bucket_name=self.bucket_name)
83
# Adds a new version to the PartitionDB and creates an LQSServer that is
# given an iterator over the directory listing of self.path plus an `args` dict.
# NOTE(review): the enclosing def is not visible, and the `args` dict literal
# is cut off (embedded line number 85 is absent); the message consumer reads
# m['args']['v_id'], so the missing entry is presumably the version id — confirm.
v = self.pdb.add_version()
84
args = {'path' : self.path,
86
# os.listdir() returns bare entry names; the consumer joins them with 'path'.
l = os.listdir(self.path)
87
s = LQSServer('', LQSHandler, iter(l), args)
92
# Usage string given to OptionParser (read below as self.Usage).
Usage = "usage: %prog [options] command"
94
# Maps each command name to its one-line description; printed by
# print_command_help().
Commands = {'client' : 'Start an Upload client',
95
'server' : 'Start an Upload server'}
98
# Builds the command-line parser, parses the arguments, and records the
# program name.
# NOTE(review): the enclosing def line is not visible in this view (embedded
# line number 97 is absent).
self.parser = OptionParser(usage=self.Usage)
99
# Flag that requests the list of available commands.
self.parser.add_option("--help-commands", action="store_true", dest="help_commands",
100
help="provides help on the available commands")
101
# No explicit dest: the value is read elsewhere as self.options.domain_name.
self.parser.add_option('-d', '--domain-name', action='store', type='string',
102
help='name of the SimpleDB domain where PDB objects are stored')
103
# Number of client subprocesses to launch; defaults to 2 (set just below).
self.parser.add_option('-n', '--num-processes', action='store', type='int', dest='num_processes',
104
help='the number of client processes launched')
105
self.parser.set_defaults(num_processes=2)
106
# Read elsewhere as self.options.input_path.
self.parser.add_option('-i', '--input-path', action='store', type='string',
107
help='the path to directory to upload')
108
# Read elsewhere as self.options.pdb_name.
self.parser.add_option('-p', '--pdb-name', action='store', type='string',
109
help='name of the PDB in which to store files (will create if necessary)')
110
# Read elsewhere as self.options.bucket_name.
self.parser.add_option('-b', '--bucket-name', action='store', type='string',
111
help='name of S3 bucket (only needed if creating new PDB)')
112
# parse_args() with no arguments parses the process's command line;
# self.args holds the leftover positional arguments (the command, if any).
self.options, self.args = self.parser.parse_args()
113
# Kept so the launcher can re-invoke this same script for server and clients.
self.prog_name = sys.argv[0]
115
# Prints one line per entry of self.Commands: the command name, two tabs,
# then its description.
# NOTE(review): embedded line number 116 is absent, so a statement before
# the loop (possibly a heading line) may be missing from this view.
def print_command_help(self):
117
for key in self.Commands.keys():
118
print ' %s\t\t%s' % (key, self.Commands[key])
121
# Validates the options required to run a server, then constructs a Server.
# Presumably the body of a do_server handler (commands are dispatched to
# 'do_<command>' methods), but the def line is not visible in this view.
# parser.error() prints the message and exits, so each check below acts as
# a guard.
if not self.options.input_path:
122
self.parser.error('No path provided')
123
# The input path must be an existing directory.
if not os.path.isdir(self.options.input_path):
124
self.parser.error('Invalid path (%s)' % self.options.input_path)
125
if not self.options.pdb_name:
126
self.parser.error('No PDB name provided')
127
# bucket_name and domain_name may be None when the options were not given.
s = Server(self.options.input_path, self.options.pdb_name,
128
self.options.bucket_name, self.options.domain_name)
131
# Creates a Client for the queue named 'localhost'. Presumably the body of a
# do_client handler; the def line is not visible in this view.
c = Client('localhost')
134
# Top-level dispatch on the parsed command line:
#   - no positional argument: validate options, then launch one server
#     subprocess and num_processes client subprocesses of this same script;
#   - one positional argument: treat it as a command name and look up the
#     matching do_<command> method;
#   - otherwise: report an error.
# NOTE(review): the enclosing def and several lines are not visible in this
# view (embedded line numbers 136, 151-152, 158, 164-165, 167 and 169 are
# absent), including the `else:` lines and the initialisation of `clients`.
if self.options.help_commands:
135
self.print_command_help()
137
if len(self.args) == 0:
138
# Same three checks as the server path; parser.error() exits on failure.
if not self.options.input_path:
139
self.parser.error('No path provided')
140
if not os.path.isdir(self.options.input_path):
141
self.parser.error('Invalid path (%s)' % self.options.input_path)
142
if not self.options.pdb_name:
143
self.parser.error('No PDB name provided')
144
# Build the shell command lines used to re-invoke this script.
# NOTE(review): values are interpolated unquoted and run with shell=True, so
# a path or name containing spaces or shell metacharacters will break or
# alter the command; the domain-name option is also not forwarded here.
server_command = '%s -p %s -i %s' % (self.prog_name, self.options.pdb_name, self.options.input_path)
145
if self.options.bucket_name:
146
server_command += ' -b %s' % self.options.bucket_name
147
server_command += ' server'
148
client_command = '%s client' % self.prog_name
149
# NOTE(review): `subprocess` is not among the imports visible in this view
# (embedded line numbers 23-24 are absent) — confirm it is imported.
server = subprocess.Popen(server_command, shell=True)
150
print 'server pid: %s' % server.pid
153
# Start the requested number of client processes and remember each handle.
for i in range(0, self.options.num_processes):
154
client = subprocess.Popen(client_command, shell=True)
155
clients.append(client)
156
print 'waiting for clients to finish'
157
# NOTE(review): the loop body is not visible (embedded line 158 is absent);
# presumably it waits on each client — confirm.
for client in clients:
159
# Terminate the server with SIGTERM.
os.kill(server.pid, signal.SIGTERM)
160
elif len(self.args) == 1:
161
self.command = self.args[0]
162
# Commands map onto methods named do_<command> on this object.
if hasattr(self, 'do_%s' % self.command):
163
method = getattr(self, 'do_%s' % self.command)
166
self.parser.error('command (%s) not recognized' % self.command)
168
self.parser.error('unrecognized commands')
170
if __name__ == "__main__":