~ubuntu-branches/ubuntu/trusty/python-boto/trusty

« back to all changes in this revision

Viewing changes to boto/emr/step.py

  • Committer: Package Import Robot
  • Author(s): Eric Evans
  • Date: 2013-05-10 23:38:14 UTC
  • mfrom: (1.1.10) (14.1.2 experimental)
  • Revision ID: package-import@ubuntu.com-20130510233814-701dvlop7xfh88i7
Tags: 2.9.2-1
New upstream release (Closes: #700743).

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21
21
# IN THE SOFTWARE.
22
22
 
 
23
 
23
24
class Step(object):
24
25
    """
25
26
    Jobflow Step base class
62
63
        :type main_class: str
63
64
        :param main_class: The class to execute in the jar
64
65
        :type action_on_failure: str
65
 
        :param action_on_failure: An action, defined in the EMR docs to take on failure.
 
66
        :param action_on_failure: An action, defined in the EMR docs to
 
67
            take on failure.
66
68
        :type step_args: list(str)
67
69
        :param step_args: A list of arguments to pass to the step
68
70
        """
110
112
        :type reducer: str
111
113
        :param reducer: The reducer URI
112
114
        :type combiner: str
113
 
        :param combiner: The combiner URI. Only works for Hadoop 0.20 and later!
 
115
        :param combiner: The combiner URI. Only works for Hadoop 0.20
 
116
            and later!
114
117
        :type action_on_failure: str
115
 
        :param action_on_failure: An action, defined in the EMR docs to take on failure.
 
118
        :param action_on_failure: An action, defined in the EMR docs to
 
119
            take on failure.
116
120
        :type cache_files: list(str)
117
121
        :param cache_files: A list of cache files to be bundled with the job
118
122
        :type cache_archives: list(str)
119
 
        :param cache_archives: A list of jar archives to be bundled with the job
 
123
        :param cache_archives: A list of jar archives to be bundled with
 
124
            the job
120
125
        :type step_args: list(str)
121
126
        :param step_args: A list of arguments to pass to the step
122
127
        :type input: str or a list of str
124
129
        :type output: str
125
130
        :param output: The output uri
126
131
        :type jar: str
127
 
        :param jar: The hadoop streaming jar. This can be either a local path on the master node, or an s3:// URI.
 
132
        :param jar: The hadoop streaming jar. This can be either a local
 
133
            path on the master node, or an s3:// URI.
128
134
        """
129
135
        self.name = name
130
136
        self.mapper = mapper
180
186
                args.extend(('-cacheFile', cache_file))
181
187
 
182
188
        if self.cache_archives:
183
 
           for cache_archive in self.cache_archives:
 
189
            for cache_archive in self.cache_archives:
184
190
                args.extend(('-cacheArchive', cache_archive))
185
191
 
186
192
        return args
191
197
            self.name, self.mapper, self.reducer, self.action_on_failure,
192
198
            self.cache_files, self.cache_archives, self.step_args,
193
199
            self.input, self.output, self._jar)
 
200
 
 
201
 
 
202
class ScriptRunnerStep(JarStep):
 
203
 
 
204
    ScriptRunnerJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
 
205
 
 
206
    def __init__(self, name, **kw):
 
207
        JarStep.__init__(self, name, self.ScriptRunnerJar, **kw)
 
208
 
 
209
 
 
210
class PigBase(ScriptRunnerStep):
 
211
 
 
212
    BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/pig/pig-script',
 
213
                '--base-path', 's3n://us-east-1.elasticmapreduce/libs/pig/']
 
214
 
 
215
 
 
216
class InstallPigStep(PigBase):
 
217
    """
 
218
    Install pig on emr step
 
219
    """
 
220
 
 
221
    InstallPigName = 'Install Pig'
 
222
 
 
223
    def __init__(self, pig_versions='latest'):
 
224
        step_args = []
 
225
        step_args.extend(self.BaseArgs)
 
226
        step_args.extend(['--install-pig'])
 
227
        step_args.extend(['--pig-versions', pig_versions])
 
228
        ScriptRunnerStep.__init__(self, self.InstallPigName, step_args=step_args)
 
229
 
 
230
 
 
231
class PigStep(PigBase):
 
232
    """
 
233
    Pig script step
 
234
    """
 
235
 
 
236
    def __init__(self, name, pig_file, pig_versions='latest', pig_args=[]):
 
237
        step_args = []
 
238
        step_args.extend(self.BaseArgs)
 
239
        step_args.extend(['--pig-versions', pig_versions])
 
240
        step_args.extend(['--run-pig-script', '--args', '-f', pig_file])
 
241
        step_args.extend(pig_args)
 
242
        ScriptRunnerStep.__init__(self, name, step_args=step_args)
 
243
 
 
244
 
 
245
class HiveBase(ScriptRunnerStep):
 
246
 
 
247
    BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/hive/hive-script',
 
248
                '--base-path', 's3n://us-east-1.elasticmapreduce/libs/hive/']
 
249
 
 
250
 
 
251
class InstallHiveStep(HiveBase):
 
252
    """
 
253
    Install Hive on EMR step
 
254
    """
 
255
    InstallHiveName = 'Install Hive'
 
256
 
 
257
    def __init__(self, hive_versions='latest', hive_site=None):
 
258
        step_args = []
 
259
        step_args.extend(self.BaseArgs)
 
260
        step_args.extend(['--install-hive'])
 
261
        step_args.extend(['--hive-versions', hive_versions])
 
262
        if hive_site is not None:
 
263
            step_args.extend(['--hive-site=%s' % hive_site])
 
264
        ScriptRunnerStep.__init__(self, self.InstallHiveName,
 
265
                                  step_args=step_args)
 
266
 
 
267
 
 
268
class HiveStep(HiveBase):
 
269
    """
 
270
    Hive script step
 
271
    """
 
272
 
 
273
    def __init__(self, name, hive_file, hive_versions='latest',
 
274
                 hive_args=None):
 
275
        step_args = []
 
276
        step_args.extend(self.BaseArgs)
 
277
        step_args.extend(['--hive-versions', hive_versions])
 
278
        step_args.extend(['--run-hive-script', '--args', '-f', hive_file])
 
279
        if hive_args is not None:
 
280
            step_args.extend(hive_args)
 
281
        ScriptRunnerStep.__init__(self, name, step_args=step_args)