1
# Copyright (c) 2010 Spotify AB
2
# Copyright (c) 2010-2011 Yelp
4
# Permission is hereby granted, free of charge, to any person obtaining a
5
# copy of this software and associated documentation files (the
6
# "Software"), to deal in the Software without restriction, including
7
# without limitation the rights to use, copy, modify, merge, publish, dis-
8
# tribute, sublicense, and/or sell copies of the Software, and to permit
9
# persons to whom the Software is furnished to do so, subject to the fol-
12
# The above copyright notice and this permission notice shall be included
13
# in all copies or substantial portions of the Software.
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
25
Jobflow Step base class
30
:return: URI to the jar
32
# NotImplemented is a non-callable comparison sentinel, so calling it raised
# TypeError; NotImplementedError is the exception for an unimplemented hook.
raise NotImplementedError()
37
:return: List of arguments for the step
39
# NotImplemented is a non-callable comparison sentinel, so calling it raised
# TypeError; NotImplementedError is the exception for an unimplemented hook.
raise NotImplementedError()
44
:return: The main class name
46
# NotImplemented is a non-callable comparison sentinel, so calling it raised
# TypeError; NotImplementedError is the exception for an unimplemented hook.
raise NotImplementedError()
53
def __init__(self, name, jar, main_class=None,
54
action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
56
An Elastic MapReduce step that executes a jar
59
:param name: The name of the step
61
:param jar: S3 URI to the Jar file
63
:param main_class: The class to execute in the jar
64
:type action_on_failure: str
65
:param action_on_failure: An action, defined in the EMR docs, to take on failure.
66
:type step_args: list(str)
67
:param step_args: A list of arguments to pass to the step
71
self._main_class = main_class
72
self.action_on_failure = action_on_failure
74
if isinstance(step_args, basestring):
75
step_args = [step_args]
77
self.step_args = step_args
86
args.extend(self.step_args)
91
return self._main_class
94
class StreamingStep(Step):
98
def __init__(self, name, mapper, reducer=None,
99
action_on_failure='TERMINATE_JOB_FLOW',
100
cache_files=None, cache_archives=None,
101
step_args=None, input=None, output=None,
102
jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
104
A Hadoop streaming Elastic MapReduce step
107
:param name: The name of the step
109
:param mapper: The mapper URI
111
:param reducer: The reducer URI
112
:type action_on_failure: str
113
:param action_on_failure: An action, defined in the EMR docs, to take on failure.
114
:type cache_files: list(str)
115
:param cache_files: A list of cache files to be bundled with the job
116
:type cache_archives: list(str)
117
:param cache_archives: A list of jar archives to be bundled with the job
118
:type step_args: list(str)
119
:param step_args: A list of arguments to pass to the step
120
:type input: str or a list of str
121
:param input: The input uri
123
:param output: The output uri
125
:param jar: The hadoop streaming jar. This can be either a local path on the master node, or an s3:// URI.
129
self.reducer = reducer
130
self.action_on_failure = action_on_failure
131
self.cache_files = cache_files
132
self.cache_archives = cache_archives
137
if isinstance(step_args, basestring):
138
step_args = [step_args]
140
self.step_args = step_args
145
def main_class(self):
151
# put extra args BEFORE -mapper and -reducer so that e.g. -libjar
154
args.extend(self.step_args)
156
args.extend(['-mapper', self.mapper])
159
args.extend(['-reducer', self.reducer])
161
args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
164
if isinstance(self.input, list):
165
for input in self.input:
166
args.extend(('-input', input))
168
args.extend(('-input', self.input))
170
args.extend(('-output', self.output))
173
for cache_file in self.cache_files:
174
args.extend(('-cacheFile', cache_file))
176
if self.cache_archives:
177
for cache_archive in self.cache_archives:
178
args.extend(('-cacheArchive', cache_archive))
183
return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
184
self.__class__.__module__, self.__class__.__name__,
185
self.name, self.mapper, self.reducer, self.action_on_failure,
186
self.cache_files, self.cache_archives, self.step_args,
187
self.input, self.output, self._jar)