1
# Copyright (c) 2010 Spotify AB
3
# Permission is hereby granted, free of charge, to any person obtaining a
4
# copy of this software and associated documentation files (the
5
# "Software"), to deal in the Software without restriction, including
6
# without limitation the rights to use, copy, modify, merge, publish, dis-
7
# tribute, sublicense, and/or sell copies of the Software, and to permit
8
# persons to whom the Software is furnished to do so, subject to the fol-
11
# The above copyright notice and this permission notice shall be included
12
# in all copies or substantial portions of the Software.
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
24
Jobflow Step base class
29
:return: URI to the jar
31
raise NotImplemented()
36
:return: List of arguments for the step
38
raise NotImplemented()
43
:return: The main class name
45
raise NotImplemented()
52
def __init__(self, name, jar, main_class,
53
action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
55
A elastic mapreduce step that executes a jar
58
:param name: The name of the step
60
:param jar: S3 URI to the Jar file
62
:param main_class: The class to execute in the jar
63
:type action_on_failure: str
64
:param action_on_failure: An action, defined in the EMR docs to take on failure.
65
:type step_args: list(str)
66
:param step_args: A list of arguments to pass to the step
70
self._main_class = main_class
71
self.action_on_failure = action_on_failure
73
if isinstance(step_args, basestring):
74
step_args = [step_args]
76
self.step_args = step_args
85
args.extend(self.step_args)
90
return self._main_class
93
class StreamingStep(Step):
97
def __init__(self, name, mapper, reducer,
98
action_on_failure='TERMINATE_JOB_FLOW',
99
cache_files=None, cache_archives=None,
100
step_args=None, input=None, output=None):
102
A hadoop streaming elastic mapreduce step
105
:param name: The name of the step
107
:param mapper: The mapper URI
109
:param reducer: The reducer URI
110
:type action_on_failure: str
111
:param action_on_failure: An action, defined in the EMR docs to take on failure.
112
:type cache_files: list(str)
113
:param cache_files: A list of cache files to be bundled with the job
114
:type cache_archives: list(str)
115
:param cache_archives: A list of jar archives to be bundled with the job
116
:type step_args: list(str)
117
:param step_args: A list of arguments to pass to the step
119
:param input: The input uri
121
:param output: The output uri
125
self.reducer = reducer
126
self.action_on_failure = action_on_failure
127
self.cache_files = cache_files
128
self.cache_archives = cache_archives
132
if isinstance(step_args, basestring):
133
step_args = [step_args]
135
self.step_args = step_args
138
return '/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar'
140
def main_class(self):
144
args = ['-mapper', self.mapper,
145
'-reducer', self.reducer]
148
if isinstance(self.input, list):
149
for input in self.input:
150
args.extend(('-input', input))
152
args.extend(('-input', self.input))
154
args.extend(('-output', self.output))
157
for cache_file in self.cache_files:
158
args.extend(('-cacheFile', cache_file))
160
if self.cache_archives:
161
for cache_archive in self.cache_archives:
162
args.extend(('-cacheArchive', cache_archive))
165
args.extend(self.step_args)