110
112
:type reducer: str
111
113
:param reducer: The reducer URI
112
114
:type combiner: str
113
:param combiner: The combiner URI. Only works for Hadoop 0.20 and later!
115
:param combiner: The combiner URI. Only works for Hadoop 0.20
114
117
:type action_on_failure: str
115
:param action_on_failure: An action, defined in the EMR docs to take on failure.
118
:param action_on_failure: An action, defined in the EMR docs to
116
120
:type cache_files: list(str)
117
121
:param cache_files: A list of cache files to be bundled with the job
118
122
:type cache_archives: list(str)
119
:param cache_archives: A list of jar archives to be bundled with the job
123
:param cache_archives: A list of jar archives to be bundled with
120
125
:type step_args: list(str)
121
126
:param step_args: A list of arguments to pass to the step
122
127
:type input: str or a list of str
191
197
self.name, self.mapper, self.reducer, self.action_on_failure,
192
198
self.cache_files, self.cache_archives, self.step_args,
193
199
self.input, self.output, self._jar)
202
class ScriptRunnerStep(JarStep):
204
ScriptRunnerJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
206
def __init__(self, name, **kw):
207
JarStep.__init__(self, name, self.ScriptRunnerJar, **kw)
210
class PigBase(ScriptRunnerStep):
212
BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/pig/pig-script',
213
'--base-path', 's3n://us-east-1.elasticmapreduce/libs/pig/']
216
class InstallPigStep(PigBase):
218
Install pig on emr step
221
InstallPigName = 'Install Pig'
223
def __init__(self, pig_versions='latest'):
225
step_args.extend(self.BaseArgs)
226
step_args.extend(['--install-pig'])
227
step_args.extend(['--pig-versions', pig_versions])
228
ScriptRunnerStep.__init__(self, self.InstallPigName, step_args=step_args)
231
class PigStep(PigBase):
236
def __init__(self, name, pig_file, pig_versions='latest', pig_args=[]):
238
step_args.extend(self.BaseArgs)
239
step_args.extend(['--pig-versions', pig_versions])
240
step_args.extend(['--run-pig-script', '--args', '-f', pig_file])
241
step_args.extend(pig_args)
242
ScriptRunnerStep.__init__(self, name, step_args=step_args)
245
class HiveBase(ScriptRunnerStep):
247
BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/hive/hive-script',
248
'--base-path', 's3n://us-east-1.elasticmapreduce/libs/hive/']
251
class InstallHiveStep(HiveBase):
253
Install Hive on EMR step
255
InstallHiveName = 'Install Hive'
257
def __init__(self, hive_versions='latest', hive_site=None):
259
step_args.extend(self.BaseArgs)
260
step_args.extend(['--install-hive'])
261
step_args.extend(['--hive-versions', hive_versions])
262
if hive_site is not None:
263
step_args.extend(['--hive-site=%s' % hive_site])
264
ScriptRunnerStep.__init__(self, self.InstallHiveName,
268
class HiveStep(HiveBase):
273
def __init__(self, name, hive_file, hive_versions='latest',
276
step_args.extend(self.BaseArgs)
277
step_args.extend(['--hive-versions', hive_versions])
278
step_args.extend(['--run-hive-script', '--args', '-f', hive_file])
279
if hive_args is not None:
280
step_args.extend(hive_args)
281
ScriptRunnerStep.__init__(self, name, step_args=step_args)