~0x44/nova/bug838466

« back to all changes in this revision

Viewing changes to vendor/boto/boto/emr/step.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2010 Spotify AB
 
2
#
 
3
# Permission is hereby granted, free of charge, to any person obtaining a
 
4
# copy of this software and associated documentation files (the
 
5
# "Software"), to deal in the Software without restriction, including
 
6
# without limitation the rights to use, copy, modify, merge, publish, dis-
 
7
# tribute, sublicense, and/or sell copies of the Software, and to permit
 
8
# persons to whom the Software is furnished to do so, subject to the fol-
 
9
# lowing conditions:
 
10
#
 
11
# The above copyright notice and this permission notice shall be included
 
12
# in all copies or substantial portions of the Software.
 
13
#
 
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 
15
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
 
16
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
 
17
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 
18
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
19
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 
20
# IN THE SOFTWARE.
 
21
 
 
22
class Step(object):
 
23
    """
 
24
    Jobflow Step base class
 
25
    """
 
26
    def jar(self):
 
27
        """
 
28
        :rtype: str
 
29
        :return: URI to the jar
 
30
        """
 
31
        raise NotImplemented()
 
32
 
 
33
    def args(self):
 
34
        """
 
35
        :rtype: list(str)
 
36
        :return: List of arguments for the step
 
37
        """
 
38
        raise NotImplemented()
 
39
 
 
40
    def main_class(self):
 
41
        """
 
42
        :rtype: str
 
43
        :return: The main class name
 
44
        """
 
45
        raise NotImplemented()
 
46
 
 
47
 
 
48
class JarStep(Step):
 
49
    """
 
50
    Custom jar step
 
51
    """
 
52
    def __init__(self, name, jar, main_class,
 
53
                 action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
 
54
        """
 
55
        A elastic mapreduce step that executes a jar
 
56
 
 
57
        :type name: str
 
58
        :param name: The name of the step
 
59
        :type jar: str
 
60
        :param jar: S3 URI to the Jar file
 
61
        :type main_class: str
 
62
        :param main_class: The class to execute in the jar
 
63
        :type action_on_failure: str
 
64
        :param action_on_failure: An action, defined in the EMR docs to take on failure.
 
65
        :type step_args: list(str)
 
66
        :param step_args: A list of arguments to pass to the step
 
67
        """
 
68
        self.name = name
 
69
        self._jar = jar
 
70
        self._main_class = main_class
 
71
        self.action_on_failure = action_on_failure
 
72
 
 
73
        if isinstance(step_args, basestring):
 
74
            step_args = [step_args]
 
75
 
 
76
        self.step_args = step_args
 
77
 
 
78
    def jar(self):
 
79
        return self._jar
 
80
 
 
81
    def args(self):
 
82
        args = []
 
83
 
 
84
        if self.step_args:
 
85
            args.extend(self.step_args)
 
86
 
 
87
        return args
 
88
 
 
89
    def main_class(self):
 
90
        return self._main_class
 
91
 
 
92
 
 
93
class StreamingStep(Step):
 
94
    """
 
95
    Hadoop streaming step
 
96
    """
 
97
    def __init__(self, name, mapper, reducer,
 
98
                 action_on_failure='TERMINATE_JOB_FLOW',
 
99
                 cache_files=None, cache_archives=None,
 
100
                 step_args=None, input=None, output=None):
 
101
        """
 
102
        A hadoop streaming elastic mapreduce step
 
103
 
 
104
        :type name: str
 
105
        :param name: The name of the step
 
106
        :type mapper: str
 
107
        :param mapper: The mapper URI
 
108
        :type reducer: str
 
109
        :param reducer: The reducer URI
 
110
        :type action_on_failure: str
 
111
        :param action_on_failure: An action, defined in the EMR docs to take on failure.
 
112
        :type cache_files: list(str)
 
113
        :param cache_files: A list of cache files to be bundled with the job
 
114
        :type cache_archives: list(str)
 
115
        :param cache_archives: A list of jar archives to be bundled with the job
 
116
        :type step_args: list(str)
 
117
        :param step_args: A list of arguments to pass to the step
 
118
        :type input: str
 
119
        :param input: The input uri
 
120
        :type output: str
 
121
        :param output: The output uri
 
122
        """
 
123
        self.name = name
 
124
        self.mapper = mapper
 
125
        self.reducer = reducer
 
126
        self.action_on_failure = action_on_failure
 
127
        self.cache_files = cache_files
 
128
        self.cache_archives = cache_archives
 
129
        self.input = input
 
130
        self.output = output
 
131
 
 
132
        if isinstance(step_args, basestring):
 
133
            step_args = [step_args]
 
134
 
 
135
        self.step_args = step_args
 
136
 
 
137
    def jar(self):
 
138
        return '/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar'
 
139
 
 
140
    def main_class(self):
 
141
        return None
 
142
 
 
143
    def args(self):
 
144
        args = ['-mapper', self.mapper,
 
145
                '-reducer', self.reducer]
 
146
 
 
147
        if self.input:
 
148
            if isinstance(self.input, list):
 
149
                for input in self.input:
 
150
                    args.extend(('-input', input))
 
151
            else:
 
152
                args.extend(('-input', self.input))
 
153
        if self.output:
 
154
            args.extend(('-output', self.output))
 
155
 
 
156
        if self.cache_files:
 
157
            for cache_file in self.cache_files:
 
158
                args.extend(('-cacheFile', cache_file))
 
159
 
 
160
        if self.cache_archives:
 
161
           for cache_archive in self.cache_archives:
 
162
                args.extend(('-cacheArchive', cache_archive))
 
163
 
 
164
        if self.step_args:
 
165
            args.extend(self.step_args)
 
166
 
 
167
        return args
 
168