~ubuntu-branches/ubuntu/vivid/sahara/vivid-proposed

« back to all changes in this revision

Viewing changes to sahara/plugins/hdp/clusterspec.py

  • Committer: Package Import Robot
  • Author(s): Thomas Goirand
  • Date: 2014-09-24 16:34:46 UTC
  • Revision ID: package-import@ubuntu.com-20140924163446-8gu3zscu5e3n9lr2
Tags: upstream-2014.2~b3
ImportĀ upstreamĀ versionĀ 2014.2~b3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2013 Hortonworks, Inc.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
#    http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
12
# implied.
 
13
# See the License for the specific language governing permissions and
 
14
# limitations under the License.
 
15
 
 
16
from sahara.i18n import _
 
17
from sahara.openstack.common import jsonutils as json
 
18
from sahara.openstack.common import log as logging
 
19
from sahara.plugins.general import exceptions as ex
 
20
from sahara.plugins.hdp.versions import versionhandlerfactory as vhf
 
21
 
 
22
 
 
23
LOG = logging.getLogger(__name__)
 
24
 
 
25
 
 
26
class ClusterSpec():
 
27
    def __init__(self, config, version='1.3.2'):
 
28
        self._config_template = config
 
29
        self.services = []
 
30
        self.configurations = {}
 
31
        self.node_groups = {}
 
32
        self.version = version
 
33
        self.user_input_handlers = {}
 
34
 
 
35
        cluster_template = json.loads(config)
 
36
        self._parse_services(cluster_template)
 
37
        self._parse_configurations(cluster_template)
 
38
        self._process_node_groups(template_json=cluster_template)
 
39
 
 
40
    def create_operational_config(self, cluster, user_inputs,
 
41
                                  scaled_groups=None):
 
42
        if scaled_groups is None:
 
43
            scaled_groups = {}
 
44
        self._determine_deployed_services(cluster)
 
45
        self._process_node_groups(cluster=cluster)
 
46
 
 
47
        for ng_id in scaled_groups:
 
48
            existing = next(group for group in self.node_groups.values()
 
49
                            if group.id == ng_id)
 
50
            existing.count = scaled_groups[ng_id]
 
51
 
 
52
        self.validate_node_groups(cluster)
 
53
        self._finalize_ng_components()
 
54
        self._parse_configurations(json.loads(self._config_template))
 
55
        self._process_user_inputs(user_inputs)
 
56
        self._replace_config_tokens()
 
57
 
 
58
    def scale(self, updated_groups):
 
59
        for ng_id in updated_groups:
 
60
            existing = next(group for group in self.node_groups.values()
 
61
                            if group.id == ng_id)
 
62
            existing.count = updated_groups[ng_id]
 
63
 
 
64
    def validate_node_groups(self, cluster):
 
65
        for service in self.services:
 
66
            if service.deployed:
 
67
                service.validate(self, cluster)
 
68
            elif service.is_mandatory():
 
69
                raise ex.RequiredServiceMissingException(service.name)
 
70
 
 
71
    def get_deployed_configurations(self):
 
72
        configs = set()
 
73
        for service in self.services:
 
74
            if service.deployed:
 
75
                configs |= service.configurations
 
76
 
 
77
        return configs
 
78
 
 
79
    def determine_component_hosts(self, component):
 
80
        hosts = set()
 
81
        for ng in self.node_groups.values():
 
82
            if component in ng.components:
 
83
                hosts |= ng.instances
 
84
 
 
85
        return hosts
 
86
 
 
87
    def normalize(self):
 
88
        return NormalizedClusterConfig(self)
 
89
 
 
90
    def get_deployed_node_group_count(self, name):
 
91
        count = 0
 
92
        for ng in self.get_node_groups_containing_component(name):
 
93
            count += ng.count
 
94
 
 
95
        return count
 
96
 
 
97
    def get_node_groups_containing_component(self, component):
 
98
        found_node_groups = []
 
99
        for ng in self.node_groups.values():
 
100
            if component in ng.components:
 
101
                found_node_groups.append(ng)
 
102
 
 
103
        return found_node_groups
 
104
 
 
105
    def get_components_for_type(self, type):
 
106
        components = set()
 
107
        for service in self.services:
 
108
            for component in service.components:
 
109
                if component.type == type:
 
110
                    components.add(component.name)
 
111
 
 
112
        return components
 
113
 
 
114
    def _parse_services(self, template_json):
 
115
        handler = (vhf.VersionHandlerFactory.get_instance().
 
116
                   get_version_handler(self.version))
 
117
        sp = handler.get_services_processor()
 
118
        for s in template_json['services']:
 
119
            name = s['name']
 
120
            service = sp.create_service(name)
 
121
 
 
122
            self.services.append(service)
 
123
            for c in s['components']:
 
124
                component = Component(c['name'], c['type'], c['cardinality'])
 
125
                service.add_component(component)
 
126
 
 
127
            if 'users' in s:
 
128
                for u in s['users']:
 
129
                    user = User(u['name'], u['password'], u['groups'])
 
130
                    service.add_user(user)
 
131
 
 
132
            configs = self._parse_configurations(s)
 
133
            for config in configs:
 
134
                service.add_configuration(config)
 
135
 
 
136
    def _parse_configurations(self, template_json):
 
137
        config_names = []
 
138
        for config in template_json['configurations']:
 
139
            config_props = {}
 
140
            name = config['name']
 
141
            config_names.append(name)
 
142
            if name in self.configurations:
 
143
                config_props = self.configurations[name]
 
144
            else:
 
145
                self.configurations[name] = config_props
 
146
 
 
147
            if 'properties' in config:
 
148
                for prop in config['properties']:
 
149
                    config_props[prop['name']] = prop['value']
 
150
 
 
151
        return config_names
 
152
 
 
153
    def _process_node_groups(self, template_json=None, cluster=None):
 
154
        # get node_groups from config
 
155
        if template_json and not cluster:
 
156
            for group in template_json['host_role_mappings']:
 
157
                node_group = NodeGroup(group['name'].lower())
 
158
                for component in group['components']:
 
159
                    node_group.add_component(component['name'])
 
160
                for host in group['hosts']:
 
161
                    if 'predicate' in host:
 
162
                        node_group.predicate = host['predicate']
 
163
                    if 'cardinality' in host:
 
164
                        node_group.cardinality = host['cardinality']
 
165
                    if 'default_count' in host:
 
166
                        node_group.count = host['default_count']
 
167
                self.node_groups[node_group.name] = node_group
 
168
 
 
169
        if cluster:
 
170
            self.node_groups = {}
 
171
            node_groups = cluster.node_groups
 
172
            for ng in node_groups:
 
173
                node_group = NodeGroup(ng.name.lower())
 
174
                node_group.count = ng.count
 
175
                node_group.id = ng.id
 
176
                node_group.components = ng.node_processes[:]
 
177
                node_group.ng_storage_paths = ng.storage_paths()
 
178
                for instance in ng.instances:
 
179
                    node_group.instances.add(Instance(instance))
 
180
                self.node_groups[node_group.name] = node_group
 
181
 
 
182
    def _determine_deployed_services(self, cluster):
 
183
        for ng in cluster.node_groups:
 
184
            for service in self.services:
 
185
                if service.deployed:
 
186
                    continue
 
187
                for sc in service.components:
 
188
                    if sc.name in ng.node_processes:
 
189
                        service.deployed = True
 
190
                        service.register_user_input_handlers(
 
191
                            self.user_input_handlers)
 
192
                        break
 
193
 
 
194
    def _process_user_inputs(self, user_inputs):
 
195
        for ui in user_inputs:
 
196
            user_input_handler = self.user_input_handlers.get(
 
197
                '{0}/{1}'.format(ui.config.tag, ui.config.name),
 
198
                self._default_user_input_handler)
 
199
 
 
200
            user_input_handler(ui, self.configurations)
 
201
 
 
202
    def _replace_config_tokens(self):
 
203
        for service in self.services:
 
204
            if service.deployed:
 
205
                service.finalize_configuration(self)
 
206
 
 
207
    def _finalize_ng_components(self):
 
208
        for service in self.services:
 
209
            if service.deployed:
 
210
                service.finalize_ng_components(self)
 
211
 
 
212
    def _default_user_input_handler(self, user_input, configurations):
 
213
        config_map = configurations[user_input.config.tag]
 
214
        config_map[user_input.config.name] = user_input.value
 
215
 
 
216
 
 
217
class Component():
 
218
    def __init__(self, name, component_type, cardinality):
 
219
        self.name = name
 
220
        self.type = component_type
 
221
        self.cardinality = cardinality
 
222
 
 
223
 
 
224
class NodeGroup():
 
225
    def __init__(self, name):
 
226
        self.id = None
 
227
        self.name = name
 
228
        self.components = []
 
229
        self.predicate = None
 
230
        self.cardinality = None
 
231
        self.count = None
 
232
        self.instances = set()
 
233
        self.ng_storage_paths = []
 
234
 
 
235
    def add_component(self, component):
 
236
        self.components.append(component)
 
237
 
 
238
    def storage_paths(self):
 
239
        return self.ng_storage_paths
 
240
 
 
241
 
 
242
class User():
 
243
    def __init__(self, name, password, groups):
 
244
        self.name = name
 
245
        self.password = password
 
246
        self.groups = groups
 
247
 
 
248
 
 
249
class Instance():
 
250
    def __init__(self, sahara_instance):
 
251
        self.inst_fqdn = sahara_instance.fqdn()
 
252
        self.management_ip = sahara_instance.management_ip
 
253
        self.internal_ip = sahara_instance.internal_ip
 
254
        self.sahara_instance = sahara_instance
 
255
 
 
256
    def fqdn(self):
 
257
        return self.inst_fqdn
 
258
 
 
259
    def remote(self):
 
260
        return self.sahara_instance.remote()
 
261
 
 
262
    def __hash__(self):
 
263
        return hash(self.fqdn())
 
264
 
 
265
    def __eq__(self, other):
 
266
        return self.fqdn() == other.fqdn()
 
267
 
 
268
 
 
269
class NormalizedClusterConfig():
 
270
    def __init__(self, cluster_spec):
 
271
        self.hadoop_version = cluster_spec.version
 
272
        self.cluster_configs = []
 
273
        self.node_groups = []
 
274
        self.handler = (vhf.VersionHandlerFactory.get_instance().
 
275
                        get_version_handler(self.hadoop_version))
 
276
 
 
277
        self._parse_configurations(cluster_spec.configurations)
 
278
        self._parse_node_groups(cluster_spec.node_groups)
 
279
 
 
280
    def _parse_configurations(self, configurations):
 
281
        for config_name, properties in configurations.items():
 
282
            for prop, value in properties.items():
 
283
                target = self._get_property_target(prop)
 
284
                if target:
 
285
                    prop_type = self._get_property_type(prop, value)
 
286
                    # TODO(sdpeidel): should we supply a scope?
 
287
                    self.cluster_configs.append(
 
288
                        NormalizedConfigEntry(NormalizedConfig(
 
289
                            prop, prop_type, value, target, 'cluster'),
 
290
                            value))
 
291
 
 
292
    def _parse_node_groups(self, node_groups):
 
293
        for node_group in node_groups.values():
 
294
            self.node_groups.append(NormalizedNodeGroup(node_group))
 
295
 
 
296
    def _get_property_target(self, prop):
 
297
        return self.handler.get_applicable_target(prop)
 
298
 
 
299
    def _get_property_type(self, prop, value):
 
300
        # TODO(jspeidel): seems that all numeric prop values in default config
 
301
        # are encoded as strings.  This may be incorrect.
 
302
        # TODO(jspeidel): should probably analyze string value to determine if
 
303
        # it is numeric
 
304
        # TODO(jspeidel): would then need to know whether Ambari expects a
 
305
        # string or a numeric value
 
306
        prop_type = type(value).__name__
 
307
        # print 'Type: {0}'.format(prop_type)
 
308
        if prop_type == 'str' or prop_type == 'unicode' or value == '':
 
309
            return 'string'
 
310
        elif prop_type == 'int':
 
311
            return 'integer'
 
312
        elif prop_type == 'bool':
 
313
            return 'boolean'
 
314
        else:
 
315
            raise ValueError(
 
316
                _("Could not determine property type for property "
 
317
                  "'%(property)s' with value: %(value)s") %
 
318
                {"property": prop, "value": value})
 
319
 
 
320
 
 
321
class NormalizedConfig():
 
322
    def __init__(self, name, config_type, default_value, target, scope):
 
323
        self.name = name
 
324
        self.description = None
 
325
        self.type = config_type
 
326
        self.default_value = default_value
 
327
        self.is_optional = False
 
328
        self.applicable_target = target
 
329
        self.scope = scope
 
330
 
 
331
 
 
332
class NormalizedConfigEntry():
 
333
    def __init__(self, config, value):
 
334
        self.config = config
 
335
        self.value = value
 
336
 
 
337
 
 
338
class NormalizedNodeGroup():
 
339
    def __init__(self, node_group):
 
340
        self.name = node_group.name
 
341
        self.node_processes = node_group.components
 
342
        self.node_configs = None
 
343
        # TODO(jpseidel): should not have to specify img/flavor
 
344
        self.img = None
 
345
        # TODO(jmaron) the flavor will be set via an ambari blueprint setting,
 
346
        # but that setting doesn't exist yet.  It will be addressed by a bug
 
347
        # fix shortly
 
348
        self.flavor = 3
 
349
        self.count = node_group.count
 
350
        self.id = node_group.id