~ubuntu-branches/ubuntu/trusty/python-boto/trusty

« back to all changes in this revision

Viewing changes to boto/emr/step.py

  • Committer: Package Import Robot
  • Author(s): Eric Evans
  • Date: 2011-11-13 11:58:40 UTC
  • mfrom: (14.1.1 experimental)
  • Revision ID: package-import@ubuntu.com-20111113115840-ckzyt3h17uh8s41y
Tags: 2.0-2
Promote new upstream to unstable (Closes: #638931).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2010 Spotify AB
 
2
# Copyright (c) 2010-2011 Yelp
 
3
#
 
4
# Permission is hereby granted, free of charge, to any person obtaining a
 
5
# copy of this software and associated documentation files (the
 
6
# "Software"), to deal in the Software without restriction, including
 
7
# without limitation the rights to use, copy, modify, merge, publish, dis-
 
8
# tribute, sublicense, and/or sell copies of the Software, and to permit
 
9
# persons to whom the Software is furnished to do so, subject to the fol-
 
10
# lowing conditions:
 
11
#
 
12
# The above copyright notice and this permission notice shall be included
 
13
# in all copies or substantial portions of the Software.
 
14
#
 
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 
16
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
 
17
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 
18
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 
19
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
20
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 
21
# IN THE SOFTWARE.
 
22
 
 
23
class Step(object):
 
24
    """
 
25
    Jobflow Step base class
 
26
    """
 
27
    def jar(self):
 
28
        """
 
29
        :rtype: str
 
30
        :return: URI to the jar
 
31
        """
 
32
        raise NotImplemented()
 
33
 
 
34
    def args(self):
 
35
        """
 
36
        :rtype: list(str)
 
37
        :return: List of arguments for the step
 
38
        """
 
39
        raise NotImplemented()
 
40
 
 
41
    def main_class(self):
 
42
        """
 
43
        :rtype: str
 
44
        :return: The main class name
 
45
        """
 
46
        raise NotImplemented()
 
47
 
 
48
 
 
49
class JarStep(Step):
 
50
    """
 
51
    Custom jar step
 
52
    """
 
53
    def __init__(self, name, jar, main_class=None,
 
54
                 action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
 
55
        """
 
56
        A elastic mapreduce step that executes a jar
 
57
 
 
58
        :type name: str
 
59
        :param name: The name of the step
 
60
        :type jar: str
 
61
        :param jar: S3 URI to the Jar file
 
62
        :type main_class: str
 
63
        :param main_class: The class to execute in the jar
 
64
        :type action_on_failure: str
 
65
        :param action_on_failure: An action, defined in the EMR docs to take on failure.
 
66
        :type step_args: list(str)
 
67
        :param step_args: A list of arguments to pass to the step
 
68
        """
 
69
        self.name = name
 
70
        self._jar = jar
 
71
        self._main_class = main_class
 
72
        self.action_on_failure = action_on_failure
 
73
 
 
74
        if isinstance(step_args, basestring):
 
75
            step_args = [step_args]
 
76
 
 
77
        self.step_args = step_args
 
78
 
 
79
    def jar(self):
 
80
        return self._jar
 
81
 
 
82
    def args(self):
 
83
        args = []
 
84
 
 
85
        if self.step_args:
 
86
            args.extend(self.step_args)
 
87
 
 
88
        return args
 
89
 
 
90
    def main_class(self):
 
91
        return self._main_class
 
92
 
 
93
 
 
94
class StreamingStep(Step):
 
95
    """
 
96
    Hadoop streaming step
 
97
    """
 
98
    def __init__(self, name, mapper, reducer=None,
 
99
                 action_on_failure='TERMINATE_JOB_FLOW',
 
100
                 cache_files=None, cache_archives=None,
 
101
                 step_args=None, input=None, output=None,
 
102
                 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
 
103
        """
 
104
        A hadoop streaming elastic mapreduce step
 
105
 
 
106
        :type name: str
 
107
        :param name: The name of the step
 
108
        :type mapper: str
 
109
        :param mapper: The mapper URI
 
110
        :type reducer: str
 
111
        :param reducer: The reducer URI
 
112
        :type action_on_failure: str
 
113
        :param action_on_failure: An action, defined in the EMR docs to take on failure.
 
114
        :type cache_files: list(str)
 
115
        :param cache_files: A list of cache files to be bundled with the job
 
116
        :type cache_archives: list(str)
 
117
        :param cache_archives: A list of jar archives to be bundled with the job
 
118
        :type step_args: list(str)
 
119
        :param step_args: A list of arguments to pass to the step
 
120
        :type input: str or a list of str
 
121
        :param input: The input uri
 
122
        :type output: str
 
123
        :param output: The output uri
 
124
        :type jar: str
 
125
        :param jar: The hadoop streaming jar. This can be either a local path on the master node, or an s3:// URI.
 
126
        """
 
127
        self.name = name
 
128
        self.mapper = mapper
 
129
        self.reducer = reducer
 
130
        self.action_on_failure = action_on_failure
 
131
        self.cache_files = cache_files
 
132
        self.cache_archives = cache_archives
 
133
        self.input = input
 
134
        self.output = output
 
135
        self._jar = jar
 
136
 
 
137
        if isinstance(step_args, basestring):
 
138
            step_args = [step_args]
 
139
 
 
140
        self.step_args = step_args
 
141
 
 
142
    def jar(self):
 
143
        return self._jar
 
144
 
 
145
    def main_class(self):
 
146
        return None
 
147
 
 
148
    def args(self):
 
149
        args = []
 
150
 
 
151
        # put extra args BEFORE -mapper and -reducer so that e.g. -libjar
 
152
        # will work
 
153
        if self.step_args:
 
154
            args.extend(self.step_args)
 
155
 
 
156
        args.extend(['-mapper', self.mapper])
 
157
 
 
158
        if self.reducer:
 
159
            args.extend(['-reducer', self.reducer])
 
160
        else:
 
161
            args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
 
162
 
 
163
        if self.input:
 
164
            if isinstance(self.input, list):
 
165
                for input in self.input:
 
166
                    args.extend(('-input', input))
 
167
            else:
 
168
                args.extend(('-input', self.input))
 
169
        if self.output:
 
170
            args.extend(('-output', self.output))
 
171
 
 
172
        if self.cache_files:
 
173
            for cache_file in self.cache_files:
 
174
                args.extend(('-cacheFile', cache_file))
 
175
 
 
176
        if self.cache_archives:
 
177
           for cache_archive in self.cache_archives:
 
178
                args.extend(('-cacheArchive', cache_archive))
 
179
 
 
180
        return args
 
181
 
 
182
    def __repr__(self):
 
183
        return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
 
184
            self.__class__.__module__, self.__class__.__name__,
 
185
            self.name, self.mapper, self.reducer, self.action_on_failure,
 
186
            self.cache_files, self.cache_archives, self.step_args,
 
187
            self.input, self.output, self._jar)