1
# Copyright (c) 2013 Hortonworks, Inc.
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
16
from sahara.i18n import _
17
from sahara.openstack.common import jsonutils as json
18
from sahara.openstack.common import log as logging
19
from sahara.plugins.general import exceptions as ex
20
from sahara.plugins.hdp.versions import versionhandlerfactory as vhf
23
LOG = logging.getLogger(__name__)
27
def __init__(self, config, version='1.3.2'):
28
self._config_template = config
30
self.configurations = {}
32
self.version = version
33
self.user_input_handlers = {}
35
cluster_template = json.loads(config)
36
self._parse_services(cluster_template)
37
self._parse_configurations(cluster_template)
38
self._process_node_groups(template_json=cluster_template)
40
def create_operational_config(self, cluster, user_inputs,
42
if scaled_groups is None:
44
self._determine_deployed_services(cluster)
45
self._process_node_groups(cluster=cluster)
47
for ng_id in scaled_groups:
48
existing = next(group for group in self.node_groups.values()
50
existing.count = scaled_groups[ng_id]
52
self.validate_node_groups(cluster)
53
self._finalize_ng_components()
54
self._parse_configurations(json.loads(self._config_template))
55
self._process_user_inputs(user_inputs)
56
self._replace_config_tokens()
58
def scale(self, updated_groups):
59
for ng_id in updated_groups:
60
existing = next(group for group in self.node_groups.values()
62
existing.count = updated_groups[ng_id]
64
def validate_node_groups(self, cluster):
65
for service in self.services:
67
service.validate(self, cluster)
68
elif service.is_mandatory():
69
raise ex.RequiredServiceMissingException(service.name)
71
def get_deployed_configurations(self):
73
for service in self.services:
75
configs |= service.configurations
79
def determine_component_hosts(self, component):
81
for ng in self.node_groups.values():
82
if component in ng.components:
88
return NormalizedClusterConfig(self)
90
def get_deployed_node_group_count(self, name):
92
for ng in self.get_node_groups_containing_component(name):
97
def get_node_groups_containing_component(self, component):
    """Return the node groups that contain the given component.

    :param component: value tested (with ``in``) against each node
        group's ``components`` collection
    :returns: a new list holding the matching values of
        ``self.node_groups``, in that mapping's iteration order; the
        list is empty when no node group matches
    """
    return [ng for ng in self.node_groups.values()
            if component in ng.components]
105
def get_components_for_type(self, type):
107
for service in self.services:
108
for component in service.components:
109
if component.type == type:
110
components.add(component.name)
114
def _parse_services(self, template_json):
115
handler = (vhf.VersionHandlerFactory.get_instance().
116
get_version_handler(self.version))
117
sp = handler.get_services_processor()
118
for s in template_json['services']:
120
service = sp.create_service(name)
122
self.services.append(service)
123
for c in s['components']:
124
component = Component(c['name'], c['type'], c['cardinality'])
125
service.add_component(component)
129
user = User(u['name'], u['password'], u['groups'])
130
service.add_user(user)
132
configs = self._parse_configurations(s)
133
for config in configs:
134
service.add_configuration(config)
136
def _parse_configurations(self, template_json):
138
for config in template_json['configurations']:
140
name = config['name']
141
config_names.append(name)
142
if name in self.configurations:
143
config_props = self.configurations[name]
145
self.configurations[name] = config_props
147
if 'properties' in config:
148
for prop in config['properties']:
149
config_props[prop['name']] = prop['value']
153
def _process_node_groups(self, template_json=None, cluster=None):
154
# get node_groups from config
155
if template_json and not cluster:
156
for group in template_json['host_role_mappings']:
157
node_group = NodeGroup(group['name'].lower())
158
for component in group['components']:
159
node_group.add_component(component['name'])
160
for host in group['hosts']:
161
if 'predicate' in host:
162
node_group.predicate = host['predicate']
163
if 'cardinality' in host:
164
node_group.cardinality = host['cardinality']
165
if 'default_count' in host:
166
node_group.count = host['default_count']
167
self.node_groups[node_group.name] = node_group
170
self.node_groups = {}
171
node_groups = cluster.node_groups
172
for ng in node_groups:
173
node_group = NodeGroup(ng.name.lower())
174
node_group.count = ng.count
175
node_group.id = ng.id
176
node_group.components = ng.node_processes[:]
177
node_group.ng_storage_paths = ng.storage_paths()
178
for instance in ng.instances:
179
node_group.instances.add(Instance(instance))
180
self.node_groups[node_group.name] = node_group
182
def _determine_deployed_services(self, cluster):
183
for ng in cluster.node_groups:
184
for service in self.services:
187
for sc in service.components:
188
if sc.name in ng.node_processes:
189
service.deployed = True
190
service.register_user_input_handlers(
191
self.user_input_handlers)
194
def _process_user_inputs(self, user_inputs):
    """Dispatch every user input to the handler registered for it.

    For each input, a handler is looked up in
    ``self.user_input_handlers`` and called with the input and
    ``self.configurations``. Inputs without a registered handler go to
    ``self._default_user_input_handler``.

    :param user_inputs: iterable of objects exposing ``config.tag`` and
        ``config.name``
    """
    for ui in user_inputs:
        # Handlers are keyed by "<config tag>/<config name>".
        handler_key = '{0}/{1}'.format(ui.config.tag, ui.config.name)
        user_input_handler = self.user_input_handlers.get(
            handler_key, self._default_user_input_handler)

        user_input_handler(ui, self.configurations)
202
def _replace_config_tokens(self):
203
for service in self.services:
205
service.finalize_configuration(self)
207
def _finalize_ng_components(self):
208
for service in self.services:
210
service.finalize_ng_components(self)
212
def _default_user_input_handler(self, user_input, configurations):
    """Store a user-supplied value directly in the matching configuration.

    :param user_input: object exposing ``config.tag``, ``config.name``
        and ``value``
    :param configurations: mapping indexed by config tag whose values
        are mutable mappings of property name to value; the entry for
        ``user_input.config.tag`` is updated in place
    """
    # The tag is looked up with plain indexing, so it must already exist
    # in ``configurations``; no entry is created for an unknown tag.
    config_map = configurations[user_input.config.tag]
    config_map[user_input.config.name] = user_input.value
218
def __init__(self, name, component_type, cardinality):
220
self.type = component_type
221
self.cardinality = cardinality
225
def __init__(self, name):
229
self.predicate = None
230
self.cardinality = None
232
self.instances = set()
233
self.ng_storage_paths = []
235
def add_component(self, component):
    """Append ``component`` to this object's ``components`` list.

    :param component: item to append; it is stored as given, with no
        de-duplication
    """
    self.components.append(component)
238
def storage_paths(self):
    """Return the object stored in ``self.ng_storage_paths``.

    The attribute itself is returned, not a copy.
    """
    return self.ng_storage_paths
243
def __init__(self, name, password, groups):
245
self.password = password
250
def __init__(self, sahara_instance):
    """Capture identifying data from a Sahara instance.

    :param sahara_instance: object exposing a callable ``fqdn`` plus
        ``management_ip`` and ``internal_ip`` attributes; a reference to
        it is kept in ``self.sahara_instance``
    """
    # fqdn() is called once here and its result stored on the wrapper.
    self.inst_fqdn = sahara_instance.fqdn()
    self.management_ip = sahara_instance.management_ip
    self.internal_ip = sahara_instance.internal_ip
    self.sahara_instance = sahara_instance
257
return self.inst_fqdn
260
return self.sahara_instance.remote()
263
return hash(self.fqdn())
265
def __eq__(self, other):
    """Compare two objects by the value their ``fqdn()`` methods return.

    :param other: object exposing a callable ``fqdn``
    :returns: result of comparing ``self.fqdn()`` with ``other.fqdn()``
    """
    return self.fqdn() == other.fqdn()
269
class NormalizedClusterConfig():
270
def __init__(self, cluster_spec):
    """Build the normalized view of a cluster specification.

    :param cluster_spec: object exposing ``version``,
        ``configurations`` and ``node_groups``
    """
    self.hadoop_version = cluster_spec.version
    self.cluster_configs = []
    self.node_groups = []
    # The handler is resolved from the spec's version before parsing
    # starts; _get_property_target reads self.handler.
    handler_factory = vhf.VersionHandlerFactory.get_instance()
    self.handler = handler_factory.get_version_handler(self.hadoop_version)

    self._parse_configurations(cluster_spec.configurations)
    self._parse_node_groups(cluster_spec.node_groups)
280
def _parse_configurations(self, configurations):
281
for config_name, properties in configurations.items():
282
for prop, value in properties.items():
283
target = self._get_property_target(prop)
285
prop_type = self._get_property_type(prop, value)
286
# TODO(sdpeidel): should we supply a scope?
287
self.cluster_configs.append(
288
NormalizedConfigEntry(NormalizedConfig(
289
prop, prop_type, value, target, 'cluster'),
292
def _parse_node_groups(self, node_groups):
    """Wrap each node group and collect the wrappers.

    :param node_groups: mapping whose values are the node groups to
        wrap; one ``NormalizedNodeGroup`` per value is appended to
        ``self.node_groups`` in the mapping's iteration order
    """
    for node_group in node_groups.values():
        self.node_groups.append(NormalizedNodeGroup(node_group))
296
def _get_property_target(self, prop):
    """Return the applicable target reported by the version handler.

    :param prop: property name passed unchanged to
        ``self.handler.get_applicable_target``
    :returns: whatever the handler returns for ``prop``
    """
    return self.handler.get_applicable_target(prop)
299
def _get_property_type(self, prop, value):
300
# TODO(jspeidel): seems that all numeric prop values in default config
301
# are encoded as strings. This may be incorrect.
302
# TODO(jspeidel): should probably analyze string value to determine if
# it is numeric
304
# TODO(jspeidel): would then need to know whether Ambari expects a
305
# string or a numeric value
306
prop_type = type(value).__name__
307
# print 'Type: {0}'.format(prop_type)
308
if prop_type == 'str' or prop_type == 'unicode' or value == '':
310
elif prop_type == 'int':
312
elif prop_type == 'bool':
316
_("Could not determine property type for property "
317
"'%(property)s' with value: %(value)s") %
318
{"property": prop, "value": value})
321
class NormalizedConfig():
322
def __init__(self, name, config_type, default_value, target, scope):
324
self.description = None
325
self.type = config_type
326
self.default_value = default_value
327
self.is_optional = False
328
self.applicable_target = target
332
class NormalizedConfigEntry():
333
def __init__(self, config, value):
338
class NormalizedNodeGroup():
339
def __init__(self, node_group):
340
self.name = node_group.name
341
self.node_processes = node_group.components
342
self.node_configs = None
343
# TODO(jpseidel): should not have to specify img/flavor
345
# TODO(jmaron) the flavor will be set via an ambari blueprint setting,
346
# but that setting doesn't exist yet. It will be addressed by a bug
349
self.count = node_group.count
350
self.id = node_group.id