1
# Copyright (c) 2013 Mirantis Inc.
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
14
# limitations under the License.
16
import sahara.exceptions as ex
17
from sahara.service.edp import api
18
import sahara.service.validations.base as main_base
19
import sahara.service.validations.edp.base as b
20
from sahara.utils import edp
37
"job_configs": b.job_configs,
39
"additionalProperties": False,
46
def _is_main_class_present(data):
47
return data and 'edp.java.main_class' in data.get('job_configs',
48
{}).get('configs', {})
51
def _streaming_present(data):
53
streaming = set(('edp.streaming.mapper',
54
'edp.streaming.reducer'))
55
configs = set(data['job_configs']['configs'])
56
return streaming.intersection(configs) == streaming
61
def check_job_executor(data, job_id):
    """Validate a job execution request for the given job.

    :param data: request body; 'cluster_id' is always read, and
                 'input_id' / 'output_id' / 'job_configs' are read
                 depending on the job type.
    :param job_id: id passed to api.get_job() to load the job whose
                   type drives the checks.
    :raises ex.InvalidDataException: if a Java/Spark job lacks
        'edp.java.main_class', if any other job type lacks 'input_id'
        or 'output_id', or if a streaming MapReduce job lacks the
        streaming mapper/reducer configs. The delegated checks in
        main_base and b presumably raise their own errors -- confirm
        against those modules.
    """
    job = api.get_job(job_id)
    job_type, subtype = edp.split_job_type(job.type)

    # Check if cluster contains Oozie service to run job
    main_base.check_edp_job_support(data['cluster_id'])

    # All types except Java/Spark require input and output objects
    # and Java/Spark require main class
    if job_type in (edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK):
        if not _is_main_class_present(data):
            raise ex.InvalidDataException('%s job must '
                                          'specify edp.java.main_class'
                                          % job.type)
    else:
        if not ('input_id' in data and 'output_id' in data):
            raise ex.InvalidDataException("%s job requires 'input_id' "
                                          "and 'output_id'" % job.type)

        b.check_data_source_exists(data['input_id'])
        b.check_data_source_exists(data['output_id'])

        b.check_data_sources_are_different(data['input_id'],
                                           data['output_id'])

        # Streaming MapReduce additionally needs both the mapper and
        # the reducer to be named in the job configs.
        is_streaming = (job_type == edp.JOB_TYPE_MAPREDUCE
                        and subtype == edp.JOB_SUBTYPE_STREAMING)
        if is_streaming and not _streaming_present(data):
            raise ex.InvalidDataException("%s job "
                                          "must specify streaming mapper "
                                          "and reducer" % job.type)

    main_base.check_cluster_exists(data['cluster_id'])