# Copyright (c) 2014 Hoang Do, Phuc Vo, P. Michiardi, D. Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from oslo.utils import timeutils
import six

from sahara import context
from sahara.i18n import _
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils
from sahara.plugins.spark import config_helper as c_helper
from sahara.plugins.spark import run_scripts as run
from sahara.utils import remote


def decommission_sl(master, inst_to_be_deleted, survived_inst):
    if survived_inst is not None:
        slavenames = []
        for slave in survived_inst:
            slavenames.append(slave.hostname())
        slaves_content = c_helper.generate_spark_slaves_configs(slavenames)
    else:
        slaves_content = "\n"

    cluster = master.node_group.cluster
    sp_home = c_helper.get_config_value("Spark", "Spark home", cluster)
    r_master = remote.get_remote(master)
    run.stop_spark(r_master, sp_home)

    # write new slaves file to master
    files = {os.path.join(sp_home, 'conf/slaves'): slaves_content}
    r_master.write_files_to(files)

    # write new slaves file to each survived slave as well
    for i in survived_inst or []:
        with remote.get_remote(i) as r:
            r.write_files_to(files)

    run.start_spark_master(r_master, sp_home)


def decommission_dn(nn, inst_to_be_deleted, survived_inst):
    with remote.get_remote(nn) as r:
        r.write_file_to('/etc/hadoop/dn.excl',
                        utils.generate_fqdn_host_names(
                            inst_to_be_deleted))
        run.refresh_nodes(remote.get_remote(nn), "dfsadmin")
        context.sleep(3)

        timeout = c_helper.get_decommissioning_timeout(
            nn.node_group.cluster)
        s_time = timeutils.utcnow()
        all_found = False

        while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
            cmd = r.execute_command(
                "sudo -u hdfs hadoop dfsadmin -report")
            all_found = True
            datanodes_info = parse_dfs_report(cmd[1])
            for i in inst_to_be_deleted:
                for dn in datanodes_info:
                    if (dn["Name"].startswith(i.internal_ip)) and (
                            dn["Decommission Status"] != "Decommissioned"):
                        all_found = False
                        break

            if all_found:
                # all instances are decommissioned, update the include and
                # exclude files and stop waiting
                r.write_files_to({'/etc/hadoop/dn.incl':
                                  utils.generate_fqdn_host_names(
                                      survived_inst),
                                  '/etc/hadoop/dn.excl': ""})
                break
            context.sleep(3)

        if not all_found:
            raise ex.DecommissionError(
                _("Cannot finish decommission of cluster %(cluster)s in "
                  "%(seconds)d seconds") %
                {"cluster": nn.node_group.cluster,
                 "seconds": timeout})


def parse_dfs_report(cmd_output):
    # parse the output of "hadoop dfsadmin -report" into a list of dicts,
    # one per datanode, keyed by the report field names
    report = cmd_output.rstrip().split(os.linesep)
    array = []
    started = False
    for line in report:
        if started:
            array.append(line)
        elif line.startswith("Datanodes available"):
            started = True

    res = []
    datanode_info = {}
    for i in six.moves.xrange(0, len(array)):
        if array[i]:
            idx = array[i].find(':')
            name = array[i][0:idx]
            value = array[i][idx + 2:]
            datanode_info[name.strip()] = value.strip()
        if not array[i] and datanode_info:
            res.append(datanode_info)
            datanode_info = {}
    if datanode_info:
        res.append(datanode_info)
    return res