4
import logging, logging.handlers
13
from twisted.internet import reactor, task
14
from twisted.python import usage
15
from zipfile import ZipFile, is_zipfile
16
from shutil import rmtree
17
from collections import defaultdict, namedtuple
21
class Options(usage.Options):
23
["cleanup", None, "Clean up" ],
24
["write_log", 'w', "Write log to file" ],
25
["max_instances", 'm', "Run the maximum number of instances" ],
29
"log level wanted (debug, info, warning, error, critical"],
30
["start_stop", "s", None,
31
"start and stop instances over a certain period of time (x minutes)"],
34
# Names of the Eucalyptus command-line tools this script shells out to.
# Every command is looked up by these module-level names, so a tool can be
# swapped by editing a single line here.
euca_describe_availability_zones = 'euca-describe-availability-zones'
euca_describe_instances = 'euca-describe-instances'
euca_describe_images = 'euca-describe-images'
euca_run_instances = 'euca-run-instances'
euca_terminate_instances = 'euca-terminate-instances'
euca_get_console_output = 'euca-get-console-output'
euca_add_keypair = 'euca-add-keypair'
euca_delete_keypair = 'euca-delete-keypair'
euca_add_group = 'euca-add-group'
euca_delete_group = 'euca-delete-group'
euca_authorize = 'euca-authorize'
euca_conf = 'euca_conf'

# Record built from one matching line of the describe-availability-zones
# output and stored in Instance.ZONE_CPU_DICT, keyed by the first field of
# that line.
zone = namedtuple('zone', 'max cpu ram disk')
52
class Instance(object):
53
TEST_STATES = [ 'not-tested', 'being-tested', 'success', 'failed',
54
'rescheduled', 'lost' ]
56
EMI_USED = defaultdict(int)
57
VMTYPE_USED = defaultdict(int)
58
GROUP_USED = defaultdict(int)
59
KEYPAIR_USED = defaultdict(int)
62
ZONE_CPU_DICT = dict()
63
CLUSTERS = defaultdict(int)
65
def __init__(self, emi, vmtype, user, ssh_key, group):
69
self.ssh_key = ssh_key
73
self.test_state = "not-tested"
74
self._test_retries = 0
76
self.req_shutdown = False
81
return "<Instance: %s - %s - %s - %s - %s - %s - %s>" % (self.id, \
82
self.emi, self.vmtype, self.user, self.ssh_key, self.group, self.state)
87
return logging.getLogger("INSTANCE %s" % (self.id))
89
return logging.getLogger()
92
def private_key(self):
    """Return the owning user's private-key entry for this instance.

    The value is looked up in ``self.user.private_keys`` under the keypair
    name stored in ``self.ssh_key``; a KeyError propagates if the user has
    no entry for that keypair.
    """
    keys_by_name = self.user.private_keys
    return keys_by_name[self.ssh_key]
99
def state(self, value):
100
self.logger.debug("State: %s" % (value))
101
# The state has changed
102
if self._state != value:
103
self.logger.debug("New state: %s => %s" % (self._state, value))
105
# Only trigger a test when the state has been changed to running
106
if self._state == "running":
107
self.logger.debug("Scheduling test")
108
reactor.callLater(random.randint(5, 10) + random.random(),
110
if self._state in ["terminated", "shutting-down"] \
111
and not self.req_shutdown:
112
self.test_state = 'lost'
115
def test_state(self):
    """Return the test state last stored in ``self._test_state``."""
    current = self._test_state
    return current
118
def test_state(self, value):
    """Validate and store a new test state.

    The requested value is logged at debug level first, so rejected values
    still show up in the log.

    Args:
        value: the new state; must be one of ``self.TEST_STATES``.

    Raises:
        ValueError: if ``value`` is not listed in ``self.TEST_STATES``; the
            previously stored state is left untouched in that case.
    """
    self.logger.debug("Test state: %s" % (value))
    if value not in self.TEST_STATES:
        # Call form of raise: the "raise Exc, msg" spelling is rejected by
        # Python 3, while this one is accepted by both Python 2 and 3.
        raise ValueError("Unknown test state: %s" % (value))

    self._test_state = value
125
def getProcessOutput(self, executable, args=(), timeout=60):
    """Run a command on behalf of this instance through its user.

    The command and its arguments are logged at debug level, then the call
    is handed to ``self.user.getProcessOutput`` with the same executable,
    arguments and timeout.

    Returns:
        Whatever the user's ``getProcessOutput`` returns; callers in this
        class attach callbacks to it.
    """
    message = "Executing: %s - %s" % (executable, args)
    self.logger.debug(message)
    owner = self.user
    return owner.getProcessOutput(executable, args, timeout)
130
self.logger.info("Starting instance")
131
output = self.getProcessOutput(euca_run_instances,
136
output.addCallbacks(self.started, self.errStarted)
137
# Terminate the instance no matter what
138
reactor.callLater(45*60, self.terminate)
140
def started(self, output):
141
self.logger.debug("Instance start output: %s" % (output))
143
for l in output.split("\n"):
144
if l.startswith('INSTANCE'):
145
self.id = l.split()[1]
146
self.cluster = l.split()[10]
147
self.logger.info("Started (%s/%s)" %
148
(self.emi, self.vmtype))
151
self.errStarted(output)
153
def errStarted(self, output):
    """Log, at warning level, that the instance could not be started.

    Registered as the errback of the run-instances command and also called
    directly by ``started``; ``output`` is whatever either of those paths
    passes in and is only interpolated into the log message.
    """
    message = "Instance failed to start: %s" % output
    self.logger.warning(message)
157
if self.state != "running":
158
self.logger.debug("Not running - aborting scheduled test.")
160
self.logger.info("Testing instance")
161
self.test_state = "being-tested"
162
self._test_retries += 1
163
args = ['-o', 'UserKnownHostsFile=/dev/null',
164
'-o', 'StrictHostKeyChecking=no',
165
'-o', 'ConnectTimeout=5',
166
'-o', 'Batchmode=yes',
167
'-i', self.private_key,
168
"ubuntu@%s" % (self.pub_ip),
170
self.logger.debug("Ssh args: %s" % (args))
171
output = uec_utils.getProcessOutput('ssh', args, errortoo=True)
172
output.addCallback(self.tested)
174
def tested(self, output):
175
if 'TEST_SUCCESS' in output:
176
self.logger.info("Test successful [%s]" % self._test_retries)
177
self.test_state = "success"
178
self.logger.debug("Test output: %s" % (output))
181
elif self._test_retries <= self.TEST_MAX_RETRIES:
182
self.logger.info("Rescheduling test (%s/%s)" %
183
(self._test_retries, self.TEST_MAX_RETRIES))
184
self.test_state = "rescheduled"
185
self.logger.debug("Test output: %s" % (output))
186
reactor.callLater(30, self.test)
188
self.logger.warning("Test output: %s" % (output))
189
self.logger.info("Test failed")
190
self.test_state = "failed"
191
output = self.getProcessOutput(euca_get_console_output,
192
args = [self.id], timeout=300)
193
output.addCallback(self.consoleOutput)
196
def consoleOutput(self, output):
    """Dump an instance's console output to the log at warning level.

    Attached as the callback of the get-console-output command. The text is
    split on newlines and each piece is logged as its own record, framed by
    a START and an END marker record.
    """
    log = self.logger
    log.warning("Console output: START")
    for console_line in output.split("\n"):
        log.warning(console_line)
    log.warning("Console output: END")
205
if self.state != "terminated":
206
self.logger.info("Terminating")
208
self.state = "terminated"
210
"instance has already terminated, last test state=%s" \
213
if self.test_state not in ( "success", "failed" ):
214
output = self.getProcessOutput(euca_get_console_output,
215
args = [self.id], timeout=300)
216
output.addCallback(self.consoleOutput)
218
self.req_shutdown = True
219
output = self.getProcessOutput(euca_terminate_instances,
221
output.addCallback(self.terminated)
223
def terminated(self, output):
224
self.logger.info("Terminated")
225
if self.test_state not in ( "success", "failed" ):
226
self.logger.info("terminated state found while test_state = %s" %
228
self.test_state = "failed"
229
self.state = "terminated"
230
self.logger.debug("terminate output: %s" % (output))
234
self.user_id = 'admin'
235
self.logger = logging.getLogger("USER %s" % (self.user_id))
238
self.cred_dir = '%s/cred/%s/' %(os.getcwd(), self.user_id)
239
self.cred_zip = 'cred.zip'
244
return "<User: %s>" % (self.user_id)
247
rmtree(self.cred_dir, ignore_errors=1)
248
os.makedirs(self.cred_dir)
249
output = self.getProcessOutput(euca_conf,
250
args = ['--get-credentials',
251
self.cred_dir+self.cred_zip,
254
output.addCallback(self.credentialsAdded)
256
def credentialsAdded(self, output):
257
self.logger.debug("Credentials added output: %s" % (output))
258
if not is_zipfile(self.cred_dir+self.cred_zip):
259
self.logger.error("Credential file is not a valid zip")
261
ZipFile(self.cred_dir+self.cred_zip).extractall(self.cred_dir)
262
self.logger.info("Credentials downloaded")
263
for k in self.keypair_names:
264
self.logger.debug("Adding keypair %s" % (k))
265
output = self.getProcessOutput(euca_add_keypair, args = [k])
266
output.addCallback(self.keypairAdded)
267
for g in self.group_names:
268
self.logger.debug("Adding group %s" % (g))
269
output = self.getProcessOutput(euca_add_group,
270
args = ['-d', 'UEC-test', g])
271
output.addCallback(self.groupAdded)
273
def keypairAdded(self, output):
274
self.logger.debug("Keypair added output: %s" % (output))
277
for l in output.split("\n"):
278
if not key and l.startswith("KEYPAIR"):
280
self.logger.info("Keypair (%s) added" % (key))
281
Instance.KEYPAIR_USED[key]
282
self.keypairs.append(key)
283
privkey_fh = open(self.private_keys[key], 'w')
284
os.chmod(self.private_keys[key], 0600)
287
privkey_fh.write("%s\n" % (l))
291
def groupAdded(self, output):
292
self.logger.debug("Group added output: %s" % (output))
293
for l in output.split("\n"):
294
if l.startswith("GROUP"):
296
self.logger.info("Group (%s) added" % (group))
297
Instance.GROUP_USED[group]
298
self.logger.debug("Authorizing group %s" % (group))
299
for port in ['22','80']:
301
time.sleep(3) # Dummy delay see LP #592641
302
output = self.getProcessOutput(euca_authorize,
307
output.addCallback(self.groupAuthorized)
309
def groupAuthorized(self, output):
310
self.logger.debug("Group authorize output: %s" % (output))
311
for l in output.split("\n"):
312
if l.startswith("GROUP"):
314
self.logger.info("Group %s authorized" % (group))
315
if not self.groups.count(group): self.groups.append(group)
319
return len(self.keypairs) == self.nb_keypairs and \
320
len(self.groups) == self.nb_groups
323
def private_keys(self):
325
for k in self.keypairs:
326
ret[k] = os.path.join(self.cred_dir, "%s.priv" % (k))
330
def keypair_names(self):
    """Return the names of the keypairs this user works with.

    One name per index in ``range(self.nb_keypairs)``, in order:
    ``uectest-k0``, ``uectest-k1``, ...
    """
    names = []
    for index in range(self.nb_keypairs):
        names.append("uectest-k%s" % index)
    return names
334
def group_names(self):
    """Return the names of the security groups this user works with.

    One name per index in ``range(self.nb_groups)``, in order:
    ``uectest-g0``, ``uectest-g1``, ...
    """
    names = []
    for index in range(self.nb_groups):
        names.append("uectest-g%s" % index)
    return names
337
def getProcessOutput(self, executable, args=(), timeout=60, eucarc = True):
338
eucarc = "source %s/eucarc ; " %self.cred_dir if eucarc else ""
339
myargs = [ '-c', "%s%s %s" % (eucarc,
342
self.logger.debug("Running bash command: %s " % (myargs))
343
return uec_utils.getProcessOutput('bash', myargs, timeout=timeout,
347
for k in self.keypair_names:
348
self.logger.debug("Deleting keypair %s" % (k))
349
output = self.getProcessOutput(euca_delete_keypair, args = [k])
350
output.addCallback(self.keypairDeleted, k)
351
for g in self.group_names:
352
self.logger.debug("Deleting group %s" % (g))
353
output = self.getProcessOutput(euca_delete_group, args = [g])
354
output.addCallback(self.groupDeleted, g)
356
def keypairDeleted(self, output, key):
    """Log the result of a keypair deletion.

    Callback of the delete-keypair command; ``key`` is the keypair name
    passed along when the callback was registered. The name is logged at
    info level and the raw command output at debug level.
    """
    log = self.logger
    log.info("Keypair (%s) deleted" % key)
    log.debug("Keypair deleted output: %s" % output)
360
def groupDeleted(self, output, group):
    """Log the result of a security-group deletion.

    Callback of the delete-group command; ``group`` is the group name
    passed along when the callback was registered. The name is logged at
    info level and the raw command output at debug level.
    """
    log = self.logger
    log.info("Group (%s) deleted" % group)
    log.debug("Group deleted output: %s" % output)
364
def checkRessources(instances, admin_user, first_call = False):
365
if not admin_user.ready:
366
reactor.callLater(10, checkRessources, instances, admin_user,
369
logging.debug("Checking ressource availability with %s" % (admin_user))
370
logging.debug("Known instances: %s" % (instances))
371
output = admin_user.getProcessOutput(euca_describe_availability_zones,
373
output.addCallback(availableEMI, instances, admin_user, first_call)
375
def availableEMI(output, instances, admin_user, first_call):
376
logging.debug("describe-availability-zones: %s" % (output))
377
# Get the list of VM types that still have resources available
382
for l in output.split("\n"):
383
m = re.search(r"(\S+)\s+(\d+)\s/\s(\d+)\s+(\d+)\s+(\d+)\s+(\d+)", l)
384
if m and int(m.group(2)) != 0:
385
available_types.append(m.group(1))
387
zone_cpu.append(m.group(1))
388
Instance.ZONE_CPU_DICT[m.group(1)] = zone._make([int(m.group(3)),
393
Instance.ZONE_CPU = zone_cpu
394
max_i = Instance.ZONE_CPU_DICT[zone_cpu[0]].max
395
if opts["max_instances"]:
396
Instance.MAX_INSTANCES = max_i
398
for zone in zone_cpu:
399
if max_i >= Instance.ZONE_CPU_DICT[zone].cpu:
400
max_i -= Instance.ZONE_CPU_DICT[zone].cpu
401
Instance.MAX_INSTANCES += 1
403
reactor.callLater(int(overtime)*60, TerminaTestedInstances, instances)
404
output = admin_user.getProcessOutput(euca_describe_images, args= [])
405
output.addCallback(availableRessource, instances,
406
admin_user, available_types)
408
def availableRessource(output, instances, admin_user, available_types):
409
# Get a list of available EMI
411
#logging.debug("describe-images: %s" % (output))
412
for l in output.split("\n"):
413
m = re.search(r"IMAGE\s+(emi.*?)\s+", l)
414
if m : available_emi.append(m.group(1))
415
logging.debug("Available VM types: %s" % (available_types))
416
logging.debug("Available EMI: %s" % (available_emi))
417
[ Instance.VMTYPE_USED[t] for t in available_types ]
418
[ Instance.EMI_USED[i] for i in available_emi ]
419
# Start one instance of an EMI for which resources are available
424
emi = [ i for i in available_emi if Instance.EMI_USED[i] \
425
== min(Instance.EMI_USED.values())][0]
426
vmtype = [ t for t in available_types if Instance.VMTYPE_USED[t] \
427
== min(Instance.VMTYPE_USED.values())][0]
428
key = [ k for k in admin_user.keypairs if Instance.KEYPAIR_USED[k] \
429
== min(Instance.KEYPAIR_USED.values())][0]
430
group = [ g for g in admin_user.groups if Instance.GROUP_USED[g] \
431
== min(Instance.GROUP_USED.values())][0]
433
logging.info("Not enough ressource available to start a new EMI")
434
ressources_ok = False
436
if opts["max_instances"] and Instance.ZONE_CPU[0] in available_types:
437
vmtype = Instance.ZONE_CPU[0]
440
logging.info("Creating new instance")
441
i = Instance(emi = emi, vmtype = vmtype,
442
user = admin_user, ssh_key = key, group = group)
444
Instance.EMI_USED[emi] += 1
445
Instance.VMTYPE_USED[vmtype] += 1
446
Instance.KEYPAIR_USED[key] += 1
447
Instance.GROUP_USED[group] += 1
450
if not opts['start_stop']:
451
if (ressources_ok and len(instances) < Instance.MAX_INSTANCES):
452
reactor.callLater(random.randint(20, 30) + random.random(),
453
checkRessources, instances, admin_user)
455
logging.info("Max instances reached. Stop checking for %s" %
456
"available ressource.")
457
reactor.callLater(30, TerminaTestedInstances, instances)
459
reactor.callLater(random.randint(20, 30) + random.random(),
460
checkRessources, instances, admin_user)
462
def TerminaTestedInstances(instances):
465
logging.debug("Check if all the instances have been tested")
466
if len(instances) == len(filter(lambda i: i.test_state in \
467
( "success", "failed", "lost" ),
469
logging.info("Terminate all tested instances")
471
if i.state not in 'terminated': i.terminate()
472
if len(instances) == len(filter(lambda i: i.state == 'terminated',
474
logging.info("Test run completed")
477
reactor.callLater(random.randint(10, 15) + random.random(),
478
TerminaTestedInstances, instances)
480
def checkInstancesState(instances, admin_user):
481
if not admin_user.ready:
482
reactor.callLater(15, checkInstancesState, instances, admin_user)
484
logging.debug("Checking instances state")
485
output = admin_user.getProcessOutput(euca_describe_instances)
486
output.addCallback(instancesState, instances, admin_user)
488
def instancesState(output, instances, admin_user):
489
logging.debug("describe instance: %s", output)
490
for l in output.split("\n"):
491
if l.startswith('INSTANCE'):
494
i = filter(lambda i: i.id == i_id, instances)[0]
496
#logging.debug("Unknown instance %s - skipping" % (i_id))
500
pub_ip = l.split()[3]
501
if pub_ip != '0.0.0.0':
502
i.logger.debug("Setting pub ip (%s)" % (pub_ip))
504
m = re.search(r"\s+(\w+)\s+eki-", l)
505
if m: i.zone = m.group(1)
506
# Stop when the number of instances have been run and tested
507
#if config['max_instances_to_start'] != 0 \
508
# and len(instances) >= config['max_instances_to_start'] \
509
# and len(filter(lambda i: i.state != 'terminated', instances)) == 0:
510
# logging.info("Test run completed")
513
reactor.callLater(random.randint(10, 20) + random.random(),
514
checkInstancesState, instances, admin_user)
516
def cleanUp(instances):
517
logging.info("Cleaning up")
518
logging.debug("Cleaning instances: %s" % (instances))
519
[ i.terminate() for i in instances ]
520
logging.debug("Cleaning user: %s" % (admin_user.user_id))
523
def printStats(instances, once=False):
524
stats = { 'started' : len(instances) }
525
for s in Instance.TEST_STATES:
526
stats[s] = len(filter(lambda i: i.test_state == s, instances))
528
stats["%s_rate" % (s)] = float("%.2f" \
529
% (float(stats[s])/stats['started']))
530
except ZeroDivisionError:
531
stats["%s_rate" % (s)] = 0.00
532
stats["%s_instances" % (s)] = \
533
[ i.id for i in instances if i.test_state == s ]
534
logging.getLogger('STATS').info("Stats: %s" % (yaml.dump(stats)))
536
reactor.callLater(60, printStats, instances)
538
def printFinalStats(instances):
    """Print the final per-test-state statistics to stdout.

    For every state in ``Instance.TEST_STATES`` this records how many
    instances ended in that state and, under ``<state>_rate``, that count
    as a fraction of all instances rounded to two decimals (0.00 when there
    are no instances at all). The entries, plus the ``started`` total, are
    printed one per line as ``key : value`` in sorted key order.

    Args:
        instances: the instances created during the run; only their
            ``test_state`` attribute is read.
    """
    started = len(instances)
    stats = {'started': started}
    for s in Instance.TEST_STATES:
        # Count with a generator: len(filter(...)) only works on Python 2,
        # where filter() returns a list.
        count = sum(1 for i in instances if i.test_state == s)
        stats[s] = count
        if started:
            rate = float("%.2f" % (float(count) / started))
        else:
            # No instance was started; avoid dividing by zero.
            rate = 0.00
        stats["%s_rate" % (s)] = rate
    for key in sorted(stats):
        # Single parenthesised argument: prints the same text whether print
        # is the Python 2 statement or the Python 3 function.
        print("%s : %s" % (key, stats[key]))
551
# Maps the level names accepted on the command line to the logging module's
# numeric levels; the lookup falls back to logging.NOTSET for other names.
LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}
562
opts.parseOptions() # When given no argument, parses sys.argv[1:]
563
except usage.UsageError, errortext:
564
print '%s: %s' % (sys.argv[0], errortext)
565
print '%s: Try --help for usage details.' % (sys.argv[0])
568
# Verify that script is run as root
571
"This script needs superuser permissions to run correctly\n")
574
level = LEVELS.get(opts['log'], logging.NOTSET)
576
if opts['write_log']:
577
suffix = "all_vmtypes"
578
if opts["max_instances"]:
579
suffix = "max_instances"
580
elif opts["start_stop"]:
581
suffix = "start_stop"
582
LOG_FILENAME = "/var/log/uec_instances.%s.log" %suffix
583
handler = logging.handlers.RotatingFileHandler(LOG_FILENAME,
585
logging.getLogger().addHandler(handler)
586
handler.setFormatter(
587
logging.Formatter("[%(asctime)s] %(levelname)s %(message)s"))
588
logging.getLogger().setLevel(level)
591
logging.basicConfig(level=level)
592
logging.info("log level set to " + opts['log'])
594
admin_user = User() # Create admin user
595
if opts["start_stop"]:
596
overtime = opts["start_stop"]
600
reactor.callWhenRunning(admin_user.setup) # setup admin user
601
reactor.callWhenRunning(checkRessources, instances, admin_user,
603
reactor.callWhenRunning(checkInstancesState, instances, admin_user)
604
printStats(instances)
607
printStats(instances, once=True)
608
printFinalStats(instances)
609
if len(filter(lambda i: i.test_state == 'success', instances)) != \
615
if __name__ == "__main__":