~corey.bryant/sahara/2014.2-merge

« back to all changes in this revision

Viewing changes to sahara/service/validations/edp/job_executor.py

  • Committer: Package Import Robot
  • Author(s): Thomas Goirand
  • Date: 2014-10-05 11:16:07 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20141005111607-1i2o10gppr8yos1e
Tags: 2014.2~rc1-1
* New upstream release.
* Added python-mysqldb as dependency.
* Updated (build-)depends for this release.
* Now using templated init script for sysv-rc, generated systemd unit and
  upstart jobs, using openstack-pkg-tools >= 13.
* Removed Use_auth_uri_parameter_from_config.patch applied upstream.
* Added doc-base registration file in sahara-doc.
* Standards-Version is now 3.9.6 (no change).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2013 Mirantis Inc.
2
 
#
3
 
# Licensed under the Apache License, Version 2.0 (the "License");
4
 
# you may not use this file except in compliance with the License.
5
 
# You may obtain a copy of the License at
6
 
#
7
 
#    http://www.apache.org/licenses/LICENSE-2.0
8
 
#
9
 
# Unless required by applicable law or agreed to in writing, software
10
 
# distributed under the License is distributed on an "AS IS" BASIS,
11
 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
 
# implied.
13
 
# See the License for the specific language governing permissions and
14
 
# limitations under the License.
15
 
 
16
 
import sahara.exceptions as ex
17
 
from sahara.service.edp import api
18
 
import sahara.service.validations.base as main_base
19
 
import sahara.service.validations.edp.base as b
20
 
from sahara.utils import edp
21
 
 
22
 
JOB_EXEC_SCHEMA = {
23
 
    "type": "object",
24
 
    "properties": {
25
 
        "input_id": {
26
 
            "type": "string",
27
 
            "format": "uuid",
28
 
        },
29
 
        "output_id": {
30
 
            "type": "string",
31
 
            "format": "uuid",
32
 
        },
33
 
        "cluster_id": {
34
 
            "type": "string",
35
 
            "format": "uuid",
36
 
        },
37
 
        "job_configs": b.job_configs,
38
 
    },
39
 
    "additionalProperties": False,
40
 
    "required": [
41
 
        "cluster_id"
42
 
    ]
43
 
}
44
 
 
45
 
 
46
 
def _is_main_class_present(data):
47
 
    return data and 'edp.java.main_class' in data.get('job_configs',
48
 
                                                      {}).get('configs', {})
49
 
 
50
 
 
51
 
def _streaming_present(data):
52
 
    try:
53
 
        streaming = set(('edp.streaming.mapper',
54
 
                         'edp.streaming.reducer'))
55
 
        configs = set(data['job_configs']['configs'])
56
 
        return streaming.intersection(configs) == streaming
57
 
    except Exception:
58
 
        return False
59
 
 
60
 
 
61
 
def check_job_executor(data, job_id):
62
 
    job = api.get_job(job_id)
63
 
    job_type, subtype = edp.split_job_type(job.type)
64
 
 
65
 
    # Check if cluster contains Oozie service to run job
66
 
    main_base.check_edp_job_support(data['cluster_id'])
67
 
 
68
 
    # All types except Java/Spark require input and output objects
69
 
    # and Java/Spark require main class
70
 
    if job_type in [edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK]:
71
 
        if not _is_main_class_present(data):
72
 
            raise ex.InvalidDataException('%s job must '
73
 
                                          'specify edp.java.main_class'
74
 
                                          % job.type)
75
 
    else:
76
 
        if not ('input_id' in data and 'output_id' in data):
77
 
            raise ex.InvalidDataException("%s job requires 'input_id' "
78
 
                                          "and 'output_id'" % job.type)
79
 
 
80
 
        b.check_data_source_exists(data['input_id'])
81
 
        b.check_data_source_exists(data['output_id'])
82
 
 
83
 
        b.check_data_sources_are_different(data['input_id'], data['output_id'])
84
 
 
85
 
        if job_type == edp.JOB_TYPE_MAPREDUCE and (
86
 
                subtype == edp.JOB_SUBTYPE_STREAMING
87
 
                and not _streaming_present(data)):
88
 
            raise ex.InvalidDataException("%s job "
89
 
                                          "must specify streaming mapper "
90
 
                                          "and reducer" % job.type)
91
 
 
92
 
    main_base.check_cluster_exists(data['cluster_id'])