~ubuntu-branches/ubuntu/vivid/sahara/vivid-proposed

Viewing changes to sahara/plugins/spark/scaling.py

  • Committer: Package Import Robot
  • Author(s): Thomas Goirand
  • Date: 2014-09-24 16:34:46 UTC
  • Revision ID: package-import@ubuntu.com-20140924163446-8gu3zscu5e3n9lr2
Tags: upstream-2014.2~b3
Import upstream version 2014.2~b3

 
# Copyright (c) 2014 Hoang Do, Phuc Vo, P. Michiardi, D. Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from oslo.utils import timeutils
import six

from sahara import context
from sahara.i18n import _
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils
from sahara.plugins.spark import config_helper as c_helper
from sahara.plugins.spark import run_scripts as run
from sahara.utils import remote


def decommission_sl(master, inst_to_be_deleted, survived_inst):
    """Remove Spark slaves from the cluster and restart the Spark master."""
    if survived_inst is not None:
        slavenames = [slave.hostname() for slave in survived_inst]
        slaves_content = c_helper.generate_spark_slaves_configs(slavenames)
    else:
        slaves_content = "\n"

    cluster = master.node_group.cluster
    sp_home = c_helper.get_config_value("Spark", "Spark home", cluster)
    r_master = remote.get_remote(master)
    run.stop_spark(r_master, sp_home)

    # write the new slaves file to the master
    files = {os.path.join(sp_home, 'conf/slaves'): slaves_content}
    r_master.write_files_to(files)

    # write the new slaves file to each surviving slave as well
    for i in survived_inst or []:
        with remote.get_remote(i) as r:
            r.write_files_to(files)

    run.start_spark_master(r_master, sp_home)


def decommission_dn(nn, inst_to_be_deleted, survived_inst):
    """Decommission HDFS datanodes and wait until they leave the cluster."""
    with remote.get_remote(nn) as r:
        # exclude the instances being removed, then ask the namenode to
        # re-read its include/exclude files
        r.write_file_to('/etc/hadoop/dn.excl',
                        utils.generate_fqdn_host_names(
                            inst_to_be_deleted))
        run.refresh_nodes(remote.get_remote(nn), "dfsadmin")
        context.sleep(3)

        timeout = c_helper.get_decommissioning_timeout(
            nn.node_group.cluster)
        s_time = timeutils.utcnow()
        all_found = False

        # poll the dfsadmin report until every instance to be deleted is
        # reported as "Decommissioned" or the timeout expires
        while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
            cmd = r.execute_command(
                "sudo -u hdfs hadoop dfsadmin -report")
            all_found = True
            datanodes_info = parse_dfs_report(cmd[1])
            for i in inst_to_be_deleted:
                for dn in datanodes_info:
                    if (dn["Name"].startswith(i.internal_ip)) and (
                            dn["Decommission Status"] != "Decommissioned"):
                        all_found = False
                        break

            if all_found:
                r.write_files_to({'/etc/hadoop/dn.incl':
                                  utils.generate_fqdn_host_names(
                                      survived_inst),
                                  '/etc/hadoop/dn.excl': ""})
                break
            context.sleep(3)

        if not all_found:
            raise ex.DecommissionError(
                _("Cannot finish decommission of cluster %(cluster)s in "
                  "%(seconds)d seconds") %
                {"cluster": nn.node_group.cluster,
                 "seconds": timeout})


def parse_dfs_report(cmd_output):
    """Parse 'hadoop dfsadmin -report' output into a list of dicts.

    Each dict describes one datanode, mapping the report field names
    (e.g. "Name", "Decommission Status") to their values.
    """
    report = cmd_output.rstrip().split(os.linesep)
    array = []
    started = False
    for line in report:
        if started:
            array.append(line)
        if line.startswith("Datanodes available"):
            started = True

    res = []
    datanode_info = {}
    for i in six.moves.xrange(0, len(array)):
        if array[i]:
            idx = array[i].find(':')
            name = array[i][0:idx]
            value = array[i][idx + 2:]
            datanode_info[name.strip()] = value.strip()
        if not array[i] and datanode_info:
            res.append(datanode_info)
            datanode_info = {}
    if datanode_info:
        res.append(datanode_info)
    return res
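
For reference, a minimal sketch of how parse_dfs_report can be exercised on its own. The sample report text below is made up for illustration (the exact dfsadmin output layout depends on the Hadoop version); the only features the parser relies on are the "Datanodes available" marker, the "field: value" lines, and the blank lines separating datanode entries.

import os

# Assumed sample input, not output captured from a real cluster.
sample_report = os.linesep.join([
    "Configured Capacity: 53687091200 (50 GB)",
    "Datanodes available: 2 (2 total, 0 dead)",
    "",
    "Name: 10.0.0.5:50010",
    "Decommission Status : Normal",
    "",
    "Name: 10.0.0.6:50010",
    "Decommission Status : Decommissioned",
])

# Two dicts are returned, one per datanode, keyed by the field names.
nodes = parse_dfs_report(sample_report)
assert nodes[0]["Name"] == "10.0.0.5:50010"
assert nodes[1]["Decommission Status"] == "Decommissioned"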