3
import os, unittest, sys
4
from commands import getstatusoutput
5
from paramiko import SSHClient, RSAKey, AutoAddPolicy
8
BUCKET_NAME = 'smoketest'
11
# pulling from environment means euca-bundle and other shell commands
12
# are runable without futzing with the environment and zip files
13
access_key = os.environ['EC2_ACCESS_KEY']
14
secret_key = os.environ['EC2_SECRET_KEY']
15
endpoint = os.environ['EC2_URL']
16
host = endpoint.split('/')[2].split(':')[0] # http://HOST:8773/services/Cloud
18
print 'you need to source admin rc before running smoketests'
21
from nova.adminclient import NovaAdminClient
22
admin = NovaAdminClient(access_key=access_key, secret_key=secret_key, clc_ip=host)
24
class NovaTestCase(unittest.TestCase):
31
def connect_ssh(self, ip, key_name):
32
# TODO: set a more reasonable connection timeout time
33
key = RSAKey.from_private_key_file('/tmp/%s.pem' % key_name)
35
client.load_system_host_keys()
36
client.set_missing_host_key_policy(AutoAddPolicy())
37
client.connect(ip, username='ubuntu', pkey=key)
38
stdin, stdout, stderr = client.exec_command('uptime')
39
print 'uptime: ', stdout.read()
42
def can_ping(self, ip):
43
return getstatusoutput('ping -c 1 %s' % ip)[0] == 0
47
return admin.connection_for('admin')
49
def connection_for(self, username):
50
return admin.connection_for(username)
52
def create_user(self, username):
53
return admin.create_user(username)
55
def get_user(self, username):
56
return admin.get_user(username)
58
def delete_user(self, username):
59
return admin.delete_user(username)
61
def get_signed_zip(self, username):
62
return admin.get_zip(username)
64
def create_key_pair(self, conn, key_name):
66
os.remove('/tmp/%s.pem' % key_name)
69
key = conn.create_key_pair(key_name)
73
def delete_key_pair(self, conn, key_name):
74
conn.delete_key_pair(key_name)
76
os.remove('/tmp/%s.pem' % key_name)
80
def bundle_image(self, image, kernel=False):
81
cmd = 'euca-bundle-image -i %s' % image
83
cmd += ' --kernel true'
84
status, output = getstatusoutput(cmd)
86
print '%s -> \n %s' % (cmd, output)
87
raise Exception(output)
90
def upload_image(self, bucket_name, image):
91
cmd = 'euca-upload-bundle -b %s -m /tmp/%s.manifest.xml' % (bucket_name, image)
92
status, output = getstatusoutput(cmd)
94
print '%s -> \n %s' % (cmd, output)
95
raise Exception(output)
98
def delete_bundle_bucket(self, bucket_name):
99
cmd = 'euca-delete-bundle --clear -b %s' % (bucket_name)
100
status, output = getstatusoutput(cmd)
102
print '%s -> \n%s' % (cmd, output)
103
raise Exception(output)
106
def register_image(self, bucket_name, manifest):
107
conn = admin.connection_for('admin')
108
return conn.register_image("%s/%s.manifest.xml" % (bucket_name, manifest))
110
def setUp_test_image(self, image, kernel=False):
111
self.bundle_image(image, kernel=kernel)
112
bucket = "auto_test_%s" % int(random.random() * 1000000)
113
self.upload_image(bucket, image)
114
return self.register_image(bucket, image)
116
def tearDown_test_image(self, conn, image_id):
117
conn.deregister_image(image_id)