26
26
import types, string, select, sys, time, re, os, struct, signal
27
27
import time, syslog, random, traceback, base64, pickle, binascii, fcntl
29
30
from socket import gethostbyname_ex
30
31
from UserDict import UserDict
31
32
from subprocess import Popen,PIPE
32
33
from cts.CTSvars import *
39
log_stats_bin = CTSvars.CRM_DAEMON_DIR + "/cts_log_stats.sh"
42
# Tool for generating system load reports while CTS runs
49
echo "Time, Load 1, Load 5, Load 15, Test Marker" > $f
53
if [ -e $f.pid ]; then
60
if [ -e $f.pid ]; then
69
# Is it already running?
80
uptime | sed s/up.*:/,/ | tr '\\n' ',' >> $f
81
#top -b -c -n1 | grep -e usr/libexec/pacemaker | grep -v -e grep -e python | head -n 1 | sed s@/usr/libexec/pacemaker/@@ | awk '{print " 0, "$9", "$10", "$12}' | tr '\\n' ',' >> $f
99
uptime | sed s/up.*:/,/ | tr '\\n' ',' >> $f
104
echo "Unknown action: $action."
34
109
class CtsLab(UserDict):
35
110
'''This class defines the Lab Environment for the Cluster Test System.
36
111
It defines those things which are expected to change from test
141
217
self.log("Exception by %s" % sys.exc_info()[0])
142
218
for logmethod in self["logger"]:
143
219
traceback.print_exc(50, logmethod)
145
221
Scenario.summarize()
146
222
Scenario.TearDown()
149
#ClusterManager.oprofileSave(Iterations)
226
#ClusterManager.oprofileSave(Iterations)
150
227
Scenario.TearDown()
152
230
Scenario.summarize()
153
231
if Scenario.Stats["failure"] > 0:
154
232
return Scenario.Stats["failure"]
223
301
'''Choose a random node from the cluster'''
224
302
return self.RandomGen.choice(self["nodes"])
304
def StatsExtract(self):
305
if not self["stats"]:
308
for host in self["nodes"]:
309
log_stats_file = "%s/cts-stats.csv" % CTSvars.CRM_DAEMON_DIR
310
if has_log_stats.has_key(host):
311
self.rsh(host, '''bash %s %s stop''' % (log_stats_bin, log_stats_file))
312
(rc, lines) = self.rsh(host, '''cat %s''' % log_stats_file, stdout=2)
313
self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file))
315
fname = "cts-stats-%d-nodes-%s.csv" % (len(self["nodes"]), host)
316
print "Extracted stats: %s" % fname
317
fd = open(fname, "a")
321
def StatsMark(self, testnum):
322
'''Mark the test number in the stats log'''
325
if not self["stats"]:
328
for host in self["nodes"]:
329
log_stats_file = "%s/cts-stats.csv" % CTSvars.CRM_DAEMON_DIR
330
if not has_log_stats.has_key(host):
335
#script = re.sub("\\\\", "\\\\", script)
336
script = re.sub('\"', '\\\"', script)
337
script = re.sub("'", "\'", script)
338
script = re.sub("`", "\`", script)
339
script = re.sub("\$", "\\\$", script)
341
self.debug("Installing %s on %s" % (log_stats_bin, host))
342
self.rsh(host, '''echo "%s" > %s''' % (script, log_stats_bin), silent=True)
343
self.rsh(host, '''bash %s %s delete''' % (log_stats_bin, log_stats_file))
344
has_log_stats[host] = 1
347
self.rsh(host, '''bash %s %s mark %s''' % (log_stats_bin, log_stats_file, testnum), synchronous=0)
227
350
TimeFormat = "%b %d %H:%M:%S\t"
285
408
self.facility=SysLog.map[self.facility]
286
409
syslog.closelog()
287
410
syslog.openlog(self.source, 0, self.facility)
290
413
def __call__(self, lines):
291
414
if isinstance(lines, types.StringType):
305
428
def __call__(self, lines):
306
t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))
429
t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))
307
430
if isinstance(lines, types.StringType):
308
431
sys.__stderr__.writelines([t, lines, "\n"])
327
450
def __call__(self, lines):
329
452
fd = open(self.logfile, "a")
330
t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))
453
t = time.strftime(Logger.TimeFormat, time.localtime(time.time()))
332
455
if isinstance(lines, types.StringType):
333
456
fd.writelines([t, self.hostname, self.source, lines, "\n"])
351
474
self.silent = silent
353
479
# -n: no stdin, -x: no X11,
354
# -o ServerAliveInterval=5 disconnect after 3*5s if the server stops responding
480
# -o ServerAliveInterval=5 disconnect after 3*5s if the server stops responding
355
481
self.Command = "ssh -l root -n -x -o ServerAliveInterval=5 -o ConnectTimeout=10 -o TCPKeepAlive=yes -o ServerAliveCountMax=3 "
356
482
# -B: batch mode, -q: no stats (quiet)
357
483
self.CpCommand = "scp -B -q"
361
487
def enable_qarsh(self):
362
488
# http://nstraz.wordpress.com/2008/12/03/introducing-qarsh/
363
489
self.log("Using QARSH for connections to cluster nodes")
365
491
self.Command = "qarsh -t 300 -l root"
366
492
self.CpCommand = "qacp -q"
368
494
def _fixcmd(self, cmd):
369
495
return re.sub("\'", "'\\''", cmd)
459
588
'''Perform a remote copy'''
460
589
cpstring = self.CpCommand + " \'" + source + "\'" + " \'" + target + "\'"
461
590
rc = os.system(cpstring)
462
593
if not silent: self.debug("cmd: rc=%d: %s" % (rc, cpstring))
466
598
has_log_watcher = {}
467
log_watcher_bin = "/tmp/cts_log_watcher.py"
599
log_watcher_bin = CTSvars.CRM_DAEMON_DIR + "/cts_log_watcher.py"
468
600
log_watcher = """
469
601
import sys, os, fcntl
499
631
elif args[i] == '-o' or args[i] == '--offset':
501
633
offset = args[i+1]
503
635
elif args[i] == '-p' or args[i] == '--prefix':
505
637
prefix = args[i+1]
639
elif args[i] == '-t' or args[i] == '--tag':
642
if not os.access(filename, os.R_OK):
643
print prefix + 'Last read: %d, limit=%d, count=%d - unreadable' % (0, limit, 0)
507
646
logfile=open(filename, 'r')
508
647
logfile.seek(0, os.SEEK_END)
509
648
newsize=logfile.tell()
587
727
global log_watcher_bin
588
728
(rc, lines) = self.Env.rsh(
590
"python %s -p CTSwatcher: -f %s -o %s" % (log_watcher_bin, self.filename, self.offset),
730
"python %s -t %s -p CTSwatcher: -f %s -o %s" % (log_watcher_bin, self.name, self.filename, self.offset),
591
731
stdout=None, silent=True, blocking=False)
593
733
for line in lines:
594
734
match = re.search("^CTSwatcher:Last read: (\d+)", line)
620
760
Call look() to scan the log looking for the patterns
623
def __init__(self, Env, log, regexes, name="Anon", timeout=10, debug_level=None, silent=False):
763
def __init__(self, Env, log, regexes, name="Anon", timeout=10, debug_level=None, silent=False, hosts=None):
624
764
'''This is the constructor for the LogWatcher class. It takes a
625
765
log name to watch, and a list of regular expressions to watch for."
661
810
if self.Env["LogWatcher"] == "remote":
662
for node in self.Env["nodes"]:
663
self.file_list.append(SearchObj(self.Env, self.filename, node))
811
for node in self.hosts:
812
self.file_list.append(SearchObj(self.Env, self.filename, node, self.name))
666
815
self.file_list.append(SearchObj(self.Env, self.filename))
714
870
if self.debug_level > 2: self.debug("Processing: "+ line)
715
871
for regex in self.regexes:
717
if self.debug_level > 2: self.debug("Comparing line to: "+ regex)
873
if self.debug_level > 3: self.debug("Comparing line to: "+ regex)
718
874
#matchobj = re.search(string.lower(regex), string.lower(line))
719
875
matchobj = re.search(regex, line)
726
882
if self.debug_level > 1: self.debug("With: "+ regex)
729
elif timeout > 0 and end > time.time():
885
elif timeout > 0 and end > time.time():
886
if self.debug_level > 1: self.debug("lines during timeout")
731
888
self.__get_lines()
734
891
# Grab any relevant messages that might have arrived since
735
892
# the last time the buffer was populated
893
if self.debug_level > 1: self.debug("lines without timeout")
736
894
self.__get_lines()
738
896
# Don't come back here again
742
900
self.debug("Single search terminated: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), lines))
897
1061
self.ShouldBeStatus={}
898
1062
self.ns = NodeStatus(self.Env)
899
1063
self.OurNode=string.lower(os.uname()[1])
1064
self.__instance_errorstoignore = []
901
1066
def key_for_node(self, node):
1069
def instance_errorstoignore_clear(self):
1070
'''Allows the test scenario to reset instance errors to ignore on each iteration.'''
1071
self.__instance_errorstoignore = []
1073
def instance_errorstoignore(self):
1074
'''Return list of errors which are 'normal' for a specific test instance'''
1075
return self.__instance_errorstoignore
904
1077
def errorstoignore(self):
905
1078
'''Return list of errors which are 'normal' and should be ignored'''
933
def install_helper(self, filename, nodes=None):
934
file_with_path="%s/%s" % (CTSvars.CTS_home, filename)
1106
def install_helper(self, filename, destdir=None, nodes=None, sourcedir=None):
1107
if sourcedir == None:
1108
sourcedir = CTSvars.CTS_home
1109
file_with_path="%s/%s" % (sourcedir, filename)
936
1111
nodes = self.Env["nodes"]
938
self.debug("Installing %s to %s on %s" % (filename, CTSvars.CTS_home, repr(self.Env["nodes"])))
1114
destdir=CTSvars.CTS_home
1116
self.debug("Installing %s to %s on %s" % (filename, destdir, repr(self.Env["nodes"])))
939
1117
for node in nodes:
940
self.rsh(node, "mkdir -p %s" % CTSvars.CTS_home)
941
self.rsh.cp(file_with_path, "root@%s:%s" % (node, file_with_path))
1118
self.rsh(node, "mkdir -p %s" % destdir)
1119
self.rsh.cp(file_with_path, "root@%s:%s/%s" % (node, destdir, filename))
942
1120
return file_with_path
944
1122
def install_config(self, node):
956
1134
def prepare_fencing_watcher(self, node):
957
1135
# If we don't have quorum now but get it as a result of starting this node,
958
1136
# then a bunch of nodes might get fenced
959
1138
if self.HasQuorum(None):
962
if not self.has_key("Pat:They_fenced"):
1141
if not self.has_key("Pat:Fencing_start"):
965
if not self.has_key("Pat:They_fenced_offset"):
1144
if not self.has_key("Pat:Fencing_ok"):
969
1148
stonithPats = []
970
1149
for peer in self.Env["nodes"]:
971
1150
if peer != node and self.ShouldBeStatus[peer] != "up":
972
stonithPats.append(self["Pat:They_fenced"] % peer)
1151
stonithPats.append(self["Pat:Fencing_ok"] % peer)
1152
stonithPats.append(self["Pat:Fencing_start"] % peer)
1153
elif self.Env["Stack"] == "corosync (cman)":
1154
# There is a delay between gaining quorum and CMAN starting fencing
1155
# This can mean that even nodes that are fully up get fenced
1156
# There is no use fighting it, just look for everyone so that CTS doesn't get confused
1157
stonithPats.append(self["Pat:Fencing_ok"] % peer)
1158
stonithPats.append(self["Pat:Fencing_start"] % peer)
1160
if peer != node and not upnode and self.ShouldBeStatus[peer] == "up":
975
1163
# Look for STONITH ops, depending on Env["at-boot"] we might need to change the nodes status
976
stonith = LogWatcher(self.Env, self["LogFileName"], stonithPats, "StartaCM", 0)
1167
stonith = LogWatcher(self.Env, self["LogFileName"], stonithPats, "StartupFencing", 0, hosts=[upnode])
977
1168
stonith.setwatch()
980
1171
def fencing_cleanup(self, node, stonith):
1175
self.debug("Looking for nodes that were fenced as a result of %s starting" % node)
983
1177
# If we just started a node, we may now have quorum (and permission to fence)
984
# Make sure everyone is online before continuing
985
self.ns.WaitForAllNodesToComeUp(self.Env["nodes"])
1179
self.debug("Nothing to do")
988
1180
return peer_list
990
if not self.HasQuorum(None) and len(self.Env["nodes"]) > 2:
1182
q = self.HasQuorum(None)
1183
if not q and len(self.Env["nodes"]) > 2:
991
1184
# We didn't gain quorum - we shouldn't have shot anyone
1185
self.debug("Quorum: %d Len: %d" % (q, len(self.Env["nodes"])))
992
1186
return peer_list
994
1188
# Now see if any states need to be updated
998
1192
line = repr(shot)
999
1193
self.debug("Found: "+ line)
1194
del stonith.regexes[stonith.whichmatch]
1001
1196
# Extract node name
1002
start = line.find(self["Pat:They_fenced_offset"]) + len(self["Pat:They_fenced_offset"])
1003
peer = line[start:].split("' ")[0]
1005
self.debug("Found peer: "+ peer)
1007
peer_list.append(peer)
1008
self.ShouldBeStatus[peer]="down"
1009
self.log(" Peer %s was fenced as a result of %s starting" % (peer, node))
1197
for n in self.Env["nodes"]:
1198
if re.search(self["Pat:Fencing_ok"] % n, shot):
1200
peer_state[peer] = "complete"
1201
self.__instance_errorstoignore.append(self["Pat:Fencing_ok"] % peer)
1203
elif re.search(self["Pat:Fencing_start"] % n, shot):
1205
peer_state[peer] = "in-progress"
1206
self.__instance_errorstoignore.append(self["Pat:Fencing_start"] % peer)
1209
self.log("ERROR: Unknown stonith match: %s" % line)
1211
elif not peer in peer_list:
1212
self.debug("Found peer: "+ peer)
1213
peer_list.append(peer)
1011
1215
# Get the next one
1012
1216
shot = stonith.look(60)
1218
for peer in peer_list:
1220
self.ShouldBeStatus[peer] = "down"
1221
self.debug(" Peer %s was fenced as a result of %s starting: %s" % (peer, node, peer_state[peer]))
1223
self.ns.WaitForNodeToComeUp(peer, self["DeadTime"])
1014
1225
# Poll until it comes up
1015
1226
if self.Env["at-boot"]:
1016
1227
if not self.StataCM(peer):
1017
1228
time.sleep(self["StartTime"])
1019
1230
if not self.StataCM(peer):
1020
self.log("ERROR: Peer %s failed to restart after being fenced" % peer)
1231
self.log("ERROR: Peer %s failed to restart after being fenced" % peer)
1023
self.ShouldBeStatus[peer]="up"
1234
self.ShouldBeStatus[peer] = "up"
1025
1236
return peer_list
1077
1286
stonith = self.prepare_fencing_watcher(node)
1079
1290
if self.rsh(node, startCmd) != 0:
1080
1291
self.log ("Warn: Start command failed on node %s" %(node))
1292
self.fencing_cleanup(node, stonith)
1083
1295
self.ShouldBeStatus[node]="up"
1084
1296
watch_result = watch.lookforall()
1086
self.fencing_cleanup(node, stonith)
1088
1298
if watch.unmatched:
1089
1299
for regex in watch.unmatched:
1090
1300
self.log ("Warn: Startup pattern not found: %s" %(regex))
1093
1302
if watch_result and self.cluster_stable(self["DeadTime"]):
1094
1303
#self.debug("Found match: "+ repr(watch_result))
1304
self.fencing_cleanup(node, stonith)
1097
1307
elif self.StataCM(node) and self.cluster_stable(self["DeadTime"]):
1308
self.fencing_cleanup(node, stonith)
1100
1311
self.log ("Warn: Start failed for node %s" %(node))
1140
1351
if self.rsh(node, self["StopCmd"]) == 0:
1141
1352
# Make sure we can continue even if corosync leaks
1142
1353
# fdata-* is the old name
1143
self.rsh(node, "rm -f /dev/shm/qb-* /dev/shm/fdata-*")
1354
#self.rsh(node, "rm -f /dev/shm/qb-* /dev/shm/fdata-*")
1144
1355
self.ShouldBeStatus[node]="down"
1145
1356
self.cluster_stable(self["DeadTime"])
1148
self.log ("Could not stop %s on node %s" %(self["Name"], node))
1359
self.log ("ERROR: Could not stop %s on node %s" %(self["Name"], node))
1185
1396
'''Report the status of the cluster manager on a given node'''
1187
out=self.rsh(node, self["StatusCmd"], 1)
1398
out=self.rsh(node, self["StatusCmd"] % node, 1)
1188
1399
ret= (string.find(out, 'stopped') == -1)
1295
1507
self.rsh(target, self["FixCommCmd"] % self.key_for_node(node), synchronous=0)
1296
1508
self.rsh(node, self["FixCommCmd"] % self.key_for_node(target), synchronous=0)
1297
1509
self.debug("Communication restored between %s and %s" % (target, node))
1299
1511
def reducecomm_node(self,node):
1300
1512
'''reduce the communication between the nodes'''
1301
1513
rc = self.rsh(node, self["ReduceCommCmd"]%(self.Env["XmitLoss"],self.Env["RecvLoss"]))
1321
1533
# If we are auditing a partition, then one side will
1322
1534
# have quorum and the other not.
1323
1535
# So the caller needs to tell us which we are checking
1324
# If no value for node_list is specified... assume all nodes
1536
# If no value for node_list is specified... assume all nodes
1325
1537
raise ValueError("Abstract Class member (HasQuorum)")
1327
1539
def Components(self):
1328
1540
raise ValueError("Abstract Class member (Components)")
1333
1545
self.oprofileStart(n)
1335
1547
elif node in self.Env["oprofile"]:
1336
self.debug("Enabling oprofile on %s" % node)
1548
self.debug("Enabling oprofile on %s" % node)
1337
1549
self.rsh(node, "opcontrol --init")
1338
1550
self.rsh(node, "opcontrol --setup --no-vmlinux --separate=lib --callgraph=20 --image=all")
1339
1551
self.rsh(node, "opcontrol --start")
1360
1572
self.oprofileStop(n)
1362
1574
elif node in self.Env["oprofile"]:
1363
self.debug("Stopping oprofile on %s" % node)
1575
self.debug("Stopping oprofile on %s" % node)
1364
1576
self.rsh(node, "opcontrol --reset")
1365
1577
self.rsh(node, "opcontrol --shutdown 2>&1 > /dev/null")
1368
1580
class Resource:
1370
1582
This is an HA resource (not a resource group).
1428
1640
class Component:
1429
1641
def kill(self, node):
1432
1644
class Process(Component):
1433
def __init__(self, cm, name, process=None, dc_only=0, pats=[], dc_pats=[], badnews_ignore=[], triggersreboot=0):
1645
def __init__(self, cm, name, process=None, dc_only=0, pats=[], dc_pats=[], badnews_ignore=[], common_ignore=[], triggersreboot=0):
1434
1646
self.name = str(name)
1435
1647
self.dc_only = dc_only
1436
1648
self.pats = pats
1437
1649
self.dc_pats = dc_pats
1439
1651
self.badnews_ignore = badnews_ignore
1652
self.badnews_ignore.extend(common_ignore)
1440
1653
self.triggersreboot = triggersreboot
1442
1656
self.proc = str(process)