~ubuntu-branches/ubuntu/vivid/sahara/vivid-proposed

« back to all changes in this revision

Viewing changes to sahara/service/validations/edp/job_executor.py

  • Committer: Package Import Robot
  • Author(s): Thomas Goirand
  • Date: 2014-09-24 16:34:46 UTC
  • Revision ID: package-import@ubuntu.com-20140924163446-8gu3zscu5e3n9lr2
Tags: upstream-2014.2~b3
ImportĀ upstreamĀ versionĀ 2014.2~b3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2013 Mirantis Inc.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
#    http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
12
# implied.
 
13
# See the License for the specific language governing permissions and
 
14
# limitations under the License.
 
15
 
 
16
import sahara.exceptions as ex
 
17
from sahara.service.edp import api
 
18
import sahara.service.validations.base as main_base
 
19
import sahara.service.validations.edp.base as b
 
20
from sahara.utils import edp
 
21
 
 
22
JOB_EXEC_SCHEMA = {
 
23
    "type": "object",
 
24
    "properties": {
 
25
        "input_id": {
 
26
            "type": "string",
 
27
            "format": "uuid",
 
28
        },
 
29
        "output_id": {
 
30
            "type": "string",
 
31
            "format": "uuid",
 
32
        },
 
33
        "cluster_id": {
 
34
            "type": "string",
 
35
            "format": "uuid",
 
36
        },
 
37
        "job_configs": b.job_configs,
 
38
    },
 
39
    "additionalProperties": False,
 
40
    "required": [
 
41
        "cluster_id"
 
42
    ]
 
43
}
 
44
 
 
45
 
 
46
def _is_main_class_present(data):
 
47
    return data and 'edp.java.main_class' in data.get('job_configs',
 
48
                                                      {}).get('configs', {})
 
49
 
 
50
 
 
51
def _streaming_present(data):
 
52
    try:
 
53
        streaming = set(('edp.streaming.mapper',
 
54
                         'edp.streaming.reducer'))
 
55
        configs = set(data['job_configs']['configs'])
 
56
        return streaming.intersection(configs) == streaming
 
57
    except Exception:
 
58
        return False
 
59
 
 
60
 
 
61
def check_job_executor(data, job_id):
 
62
    job = api.get_job(job_id)
 
63
    job_type, subtype = edp.split_job_type(job.type)
 
64
 
 
65
    # Check if cluster contains Oozie service to run job
 
66
    main_base.check_edp_job_support(data['cluster_id'])
 
67
 
 
68
    # All types except Java/Spark require input and output objects
 
69
    # and Java/Spark require main class
 
70
    if job_type in [edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK]:
 
71
        if not _is_main_class_present(data):
 
72
            raise ex.InvalidDataException('%s job must '
 
73
                                          'specify edp.java.main_class'
 
74
                                          % job.type)
 
75
    else:
 
76
        if not ('input_id' in data and 'output_id' in data):
 
77
            raise ex.InvalidDataException("%s job requires 'input_id' "
 
78
                                          "and 'output_id'" % job.type)
 
79
 
 
80
        b.check_data_source_exists(data['input_id'])
 
81
        b.check_data_source_exists(data['output_id'])
 
82
 
 
83
        b.check_data_sources_are_different(data['input_id'], data['output_id'])
 
84
 
 
85
        if job_type == edp.JOB_TYPE_MAPREDUCE and (
 
86
                subtype == edp.JOB_SUBTYPE_STREAMING
 
87
                and not _streaming_present(data)):
 
88
            raise ex.InvalidDataException("%s job "
 
89
                                          "must specify streaming mapper "
 
90
                                          "and reducer" % job.type)
 
91
 
 
92
    main_base.check_cluster_exists(data['cluster_id'])