115
126
'VMX_IMG1' : "%s/vmx/test1.vmx" % vcdir,
120
class PromptCheck(object):
121
def __init__(self, prompt, response=None):
123
self.response = response
125
self.response = self.response % test_files
127
def check(self, proc):
128
out = proc.stdout.readline()
130
if not out.count(self.prompt):
131
out += "\nContent didn't contain prompt '%s'" % (self.prompt)
135
proc.stdin.write(self.response + "\n")
139
class PromptTest(object):
140
def __init__(self, cmd):
141
cmd = cmd % test_files
142
app, opts = cmd.split(" ", 1)
143
self.cmd = [os.path.abspath(app)] + opts.split(" ")
147
self.prompt_list = []
149
def add(self, *args, **kwargs):
150
self.prompt_list.append(PromptCheck(*args, **kwargs))
153
proc = subprocess.Popen(self.cmd,
154
stdin=subprocess.PIPE,
155
stdout=subprocess.PIPE,
156
stderr=subprocess.STDOUT)
158
out = "Running %s\n" % self.cmdstr
160
for p in self.prompt_list:
161
ret, content = p.check(proc)
164
# Since we didn't match output, process might be hung
168
return proc.wait(), out
133
Any global args for every invocation should be added to default_args
134
function, so that individual tests can easily overwrite them.
176
"globalargs" : Arguments to be passed to every app invocation
178
139
"categoryfoo" : { Some descriptive test catagory name (e.g. storage)
180
141
"args" : Args to be applied to all invocations in category
159
def default_args(app, cli, testtype):
161
iscompare = testtype in ["compare"]
166
if app in ["virt-install", "virt-clone", "virt-image"] and not iscompare:
167
if "--connect " not in cli:
168
args += " --connect %(TESTURI)s"
170
if app in ["virt-install"]:
171
if "--name " not in cli:
172
args += " --name foobar"
173
if "--ram " not in cli:
176
if testtype in ["compare"]:
177
if app == "virt-install":
178
if (not cli.count("--print-xml") and
179
not cli.count("--print-step") and
180
not cli.count("--quiet")):
181
args += " --print-step all"
183
elif app == "virt-image":
184
if not cli.count("--print"):
187
elif app == "virt-clone":
188
if not cli.count("--print-xml"):
189
args += " --print-xml"
191
if app != "virt-convert" and not "--connect " in cli:
192
args += " --connect %s" % fakeuri
199
199
"virt-install" : {
200
"globalargs" : " --connect %(TESTURI)s -d --name foobar --ram 64",
203
201
"args": "--pxe --nographics --noautoconsole --hvm",
604
608
"--controller usb,model=ich9-uhci1,address=0:0:4.0,index=0,master=0 "
605
609
"--controller usb,model=ich9-uhci2,address=0:0:4.1,index=0,master=2 "
606
610
"--controller usb,model=ich9-uhci3,address=0:0:4.2,index=0,master=4 "
607
"--disk %(EXISTIMG1)s,cache=writeback,io=threads,perms=sh,serial=WD-WMAP9A966149 "
611
"--disk %(MANAGEDEXISTUPPER)s,cache=writeback,io=threads,perms=sh,serial=WD-WMAP9A966149 "
608
612
"--disk %(NEWIMG1)s,sparse=false,size=.001,perms=ro,error_policy=enospace "
609
613
"--disk device=cdrom,bus=sata "
610
614
"--serial tcp,host=:2222,mode=bind,protocol=telnet "
1094
1093
}, # app 'virt-convert'
1098
#if uri not in _conns:
1099
# _conns[uri] = virtinst.cli.getConnection(uri)
1101
return virtinst.cli.getConnection(uri)
1103
######################
1104
# Test class helpers #
1105
######################
1107
class Command(object):
1109
Instance of a single cli command to test
1111
def __init__(self, cmd):
1112
self.cmdstr = cmd % test_files
1113
self.check_success = True
1114
self.compare_file = None
1116
app, opts = self.cmdstr.split(" ", 1)
1117
self.argv = [os.path.abspath(app)] + shlex.split(opts)
1119
def _launch_command(self):
1120
logging.debug(self.cmdstr)
1126
for idx in reversed(range(len(self.argv))):
1127
if self.argv[idx] == "--connect":
1128
uri = self.argv[idx + 1]
1132
conn = open_conn(uri)
1134
oldstdout = sys.stdout
1135
oldstderr = sys.stderr
1138
out = StringIO.StringIO()
1141
sys.argv = self.argv
1144
if app.count("virt-install"):
1145
ret = virtinstall.main(conn=conn)
1146
elif app.count("virt-clone"):
1147
ret = virtclone.main(conn=conn)
1148
elif app.count("virt-image"):
1149
ret = virtimage.main(conn=conn)
1150
elif app.count("virt-convert"):
1151
ret = virtconvert.main()
1152
except SystemExit, sys_e:
1157
outt = out.getvalue()
1158
if outt.endswith("\n"):
1162
sys.stdout = oldstdout
1163
sys.stderr = oldstderr
1167
def _get_output(self):
1170
os.system("rm %s > /dev/null 2>&1" % i)
1172
code, output = self._launch_command()
1174
logging.debug(output + "\n")
1176
except Exception, e:
1177
return (-1, "".join(traceback.format_exc()) + str(e))
1180
filename = self.compare_file
1184
code, output = self._get_output()
1186
if bool(code) == self.check_success:
1187
raise AssertionError(
1188
("Expected command to %s, but failed.\n" %
1189
(self.check_success and "pass" or "fail")) +
1190
("Command was: %s\n" % self.cmdstr) +
1191
("Error code : %d\n" % code) +
1192
("Output was:\n%s" % output))
1195
# Generate test files that don't exist yet
1196
if not os.path.exists(filename):
1197
file(filename, "w").write(output)
1199
utils.diff_compare(output, filename)
1201
except AssertionError, e:
1202
err = self.cmdstr + "\n" + str(e)
1207
class PromptCheck(object):
1209
Individual question/response pair for automated --prompt tests
1211
def __init__(self, prompt, response=None):
1212
self.prompt = prompt
1213
self.response = response
1215
self.response = self.response % test_files
1217
def check(self, proc):
1218
out = proc.stdout.readline()
1220
if not out.count(self.prompt):
1221
out += "\nContent didn't contain prompt '%s'" % (self.prompt)
1225
proc.stdin.write(self.response + "\n")
1230
class PromptTest(Command):
1232
Fully automated --prompt test
1234
def __init__(self, cmdstr):
1235
Command.__init__(self, cmdstr)
1237
self.prompt_list = []
1239
def add(self, *args, **kwargs):
1240
self.prompt_list.append(PromptCheck(*args, **kwargs))
1242
def _launch_command(self):
1243
proc = subprocess.Popen(self.argv,
1244
stdin=subprocess.PIPE,
1245
stdout=subprocess.PIPE,
1246
stderr=subprocess.STDOUT)
1248
out = "Running %s\n" % self.cmdstr
1250
for p in self.prompt_list:
1251
ret, content = p.check(proc)
1254
# Since we didn't match output, process might be hung
1259
for ignore in range(30):
1260
if proc.poll() is not None:
1267
out += "\nProcess was killed by test harness"
1269
return proc.wait(), out
1272
##########################
1273
# Automated prompt tests #
1274
##########################
1099
1276
# Basic virt-install prompting
1100
1277
p1 = PromptTest("virt-install --connect %(TESTURI)s --prompt --quiet "
1145
1322
p6.add("use as the cloned disk", "%(NEWIMG2)s")
1146
1323
promptlist.append(p6)
1151
os.system("rm %s > /dev/null 2>&1" % i)
1153
if type(comm) is str:
1155
print comm % test_files
1157
code, output = commands.getstatusoutput(comm % test_files)
1162
code, output = comm.run()
1169
except Exception, e:
1174
sys.stdout.write(".")
1179
sys.stdout.write("F")
1182
class Command(object):
1183
def __init__(self, cmd):
1186
self.check_success = True
1187
self.compare_file = None
1189
if type(cmd) is not str:
1191
self.cmdstr = " ".join(self.cmd.cmd)
1194
filename = self.compare_file
1198
code, output = runcomm(self.cmd or self.cmdstr)
1200
if bool(code) == self.check_success:
1201
raise AssertionError(
1202
("Expected command to %s, but failed.\n" %
1203
(self.check_success and "pass" or "fail")) +
1204
("Command was: %s\n" % self.cmdstr) +
1205
("Error code : %d\n" % code) +
1206
("Output was:\n%s" % output))
1209
# Uncomment to generate new test files
1210
if not os.path.exists(filename):
1211
file(filename, "w").write(output)
1213
utils.diff_compare(output, filename)
1216
except AssertionError, e:
1218
err = self.cmdstr + "\n" + str(e)
1222
# Setup: build cliarg dict, which uses
1223
def run_tests(do_app, do_category, error_ret):
1224
if do_app and do_app not in args_dict.keys():
1225
raise ValueError("Unknown app '%s'" % do_app)
1229
# Prompt tests upfront
1230
for cmd in promptlist:
1231
cmdlist.append(Command(cmd))
1325
# Basic virt-clone prompting with disk failure handling
1326
p7 = PromptTest("virt-clone --connect %(TESTURI)s --prompt --quiet "
1327
"--clone-running -o test-clone-simple -n test-clone-new")
1328
p7.add("use as the cloned disk", "/root")
1329
p7.add("'/root' must be a file or a device")
1330
p7.add("use as the cloned disk", "%(MANAGEDNEW1)s")
1331
promptlist.append(p7)
1334
#########################
1335
# Test runner functions #
1336
#########################
1338
def build_cmd_list():
1339
cmdlist = promptlist
1233
1341
for app in args_dict:
1234
if do_app and app != do_app:
1240
1344
# Build default command line dict
1241
1345
for option in args_dict.get(app):
1242
if option == "globalargs":
1243
globalargs = args_dict[app][option]
1246
1346
# Default is a unique cmd string
1247
1347
unique[option] = args_dict[app][option]
1250
if do_category and do_category not in unique.keys():
1251
raise ValueError("Unknown category %s" % do_category)
1253
1349
# Build up unique command line cases
1254
1350
for category in unique.keys():
1255
if do_category and category != do_category:
1257
1351
catdict = unique[category]
1258
1352
category_args = catdict["args"]
1260
for optstr in catdict["valid"]:
1261
cmdstr = "./%s %s %s %s" % (app, globalargs,
1262
category_args, optstr)
1263
cmd = Command(cmdstr)
1264
cmd.check_success = True
1267
for optstr in catdict["invalid"]:
1268
cmdstr = "./%s %s %s %s" % (app, globalargs,
1269
category_args, optstr)
1270
cmd = Command(cmdstr)
1271
cmd.check_success = False
1274
for optstr, filename in catdict.get("compare") or []:
1275
filename = "%s/%s.xml" % (compare_xmldir, filename)
1276
cmdstr = "./%s %s %s %s" % (app, globalargs,
1277
category_args, optstr)
1278
cmdstr = cmdstr % test_files
1280
# Strip --debug to get reasonable output
1281
cmdstr = cmdstr.replace("--debug ", "").replace("-d ", "")
1282
if app == "virt-install":
1283
if (not cmdstr.count("--print-xml") and
1284
not cmdstr.count("--print-step") and
1285
not cmdstr.count("--quiet")):
1286
cmdstr += " --print-step all"
1288
elif app == "virt-image":
1289
if not cmdstr.count("--print"):
1290
cmdstr += " --print"
1292
elif app == "virt-clone":
1293
if not cmdstr.count("--print-xml"):
1294
cmdstr += " --print-xml"
1296
if app != "virt-convert" and not cmdstr.count(fakeuri):
1297
cmdstr += " --connect %s" % fakeuri
1299
cmd = Command(cmdstr)
1300
cmd.check_success = not filename.endswith("fail.xml")
1301
cmd.compare_file = filename
1308
error_ret.append(err)
1317
if len(sys.argv) > 1:
1318
for i in range(1, len(sys.argv)):
1319
if sys.argv[i].count("debug"):
1321
elif sys.argv[i].count("--app"):
1322
do_app = sys.argv[i + 1]
1323
elif sys.argv[i].count("--category"):
1324
do_category = sys.argv[i + 1]
1326
# Setup needed files
1327
for i in exist_files:
1328
if os.path.exists(i):
1329
raise ValueError("'%s' will be used by testsuite, can not already"
1354
for testtype in ["valid", "invalid", "compare"]:
1355
for optstr in catdict.get(testtype) or []:
1356
if testtype == "compare":
1357
optstr, filename = optstr
1358
filename = "%s/%s.xml" % (compare_xmldir, filename)
1360
args = category_args + " " + optstr
1361
args = default_args(app, args, testtype) + " " + args
1362
cmdstr = "./%s %s" % (app, args)
1364
cmd = Command(cmdstr)
1365
if testtype == "compare":
1366
cmd.check_success = not filename.endswith("fail.xml")
1367
cmd.compare_file = filename
1369
cmd.check_success = bool(testtype == "valid")
1377
old_bridge = virtinst._util.default_bridge2
1381
Create initial test files/dirs
1332
1383
os.system("mkdir %s" % ro_dir)
1334
1385
for i in exist_files:
1338
1389
os.system("chmod 444 %s" % ro_img)
1339
1390
os.system("chmod 555 %s" % ro_dir)
1343
run_tests(do_app, do_category, error_ret)
1346
for err in error_ret:
1392
virtinst._util.default_bridge2 = lambda ignore: None
1350
print "\nAll tests completed successfully."
1397
Cleanup temporary files used for testing
1354
1399
for i in clean_files:
1355
1400
os.system("chmod 777 %s > /dev/null 2>&1" % i)
1356
1401
os.system("rm -rf %s > /dev/null 2>&1" % i)
1358
if __name__ == "__main__":
1361
except KeyboardInterrupt:
1362
print "Tests interrupted"
1403
virtinst._util.default_bridge2 = old_bridge
1405
class CLITests(unittest.TestCase):
1406
def __init__(self, *args, **kwargs):
1407
unittest.TestCase.__init__(self, *args, **kwargs)
1412
# Only run this for first test
1417
# Only run this on the last test
1418
if curtest == newidx:
1422
def cmdtemplate(self, c):
1426
return lambda s: cmdtemplate(s, cmd)
1428
_cmdlist = build_cmd_list()
1429
for _cmd in _cmdlist:
1431
setattr(CLITests, "testCLI%d" % newidx, maketest(_cmd))