~justin-fathomdb/nova/bug744150

« back to all changes in this revision

Viewing changes to smoketests/novatestcase.py

  • Committer: andy
  • Author(s): Jesse Andrews
  • Date: 2010-06-24 03:11:53 UTC
  • Revision ID: git-v1:5f52e0e923d4b317748c114b1fed1aa4ccd33c3c
devin's smoketests

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# COPYRIGHT NASA
 
2
 
 
3
import os, unittest, sys
 
4
from commands import getstatusoutput
 
5
from paramiko import SSHClient, RSAKey, AutoAddPolicy
 
6
import random
 
7
 
 
8
# Bucket name constant. Nothing in this module reads it; presumably it is
# imported by the individual smoketest modules -- TODO confirm.
BUCKET_NAME = 'smoketest'
 
9
 
 
10
try:
    # Pulling the credentials from the environment means euca-bundle and
    # other shell commands are runnable without futzing with the environment
    # and zip files.
    access_key = os.environ['EC2_ACCESS_KEY']
    secret_key = os.environ['EC2_SECRET_KEY']
    endpoint = os.environ['EC2_URL']
    # EC2_URL looks like http://HOST:8773/services/Cloud; keep only HOST.
    host = endpoint.split('/')[2].split(':')[0]
except (KeyError, IndexError):
    # KeyError: one of the EC2_* variables is not set.
    # IndexError: EC2_URL is set but is not of the form scheme://host/...
    # Anything else is a genuine bug and is allowed to propagate instead of
    # being reported as a missing rc file.
    print('you need to source admin rc before running smoketests')
    sys.exit(2)
 
20
 
 
21
from nova.adminclient import NovaAdminClient
 
22
# Module-level admin client shared by every NovaTestCase helper below; it is
# built from the EC2_* credentials and host parsed from the environment above.
admin = NovaAdminClient(access_key=access_key, secret_key=secret_key, clc_ip=host)
 
23
 
 
24
class NovaTestCase(unittest.TestCase):
    """Base test case providing helpers shared by the smoketests.

    The helpers delegate to the module-level ``admin`` client for user and
    connection management, shell out to the ``euca-*`` and ``ping`` command
    line tools, and use paramiko for SSH sessions.  Private key files are
    kept under ``/tmp`` as ``<key_name>.pem``.
    """

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def _key_path(self, key_name):
        """Return the local path of the private key file for ``key_name``."""
        return '/tmp/%s.pem' % key_name

    def _remove_key_file(self, key_name):
        """Delete the local private key file for ``key_name`` if possible."""
        try:
            os.remove(self._key_path(key_name))
        except OSError:
            # Best effort: the file usually just does not exist.  Only
            # filesystem errors are ignored so that KeyboardInterrupt and
            # programming errors still surface.
            pass

    def _run_command(self, cmd, output_indent=' '):
        """Run ``cmd`` through the shell and fail loudly on a non-zero exit.

        On failure the command and its combined output are printed (the
        output is prefixed with ``output_indent`` on its first line) and an
        ``Exception`` carrying the output is raised.  Returns ``True`` on
        success.
        """
        status, output = getstatusoutput(cmd)
        if status != 0:
            print('%s -> \n%s%s' % (cmd, output_indent, output))
            raise Exception(output)
        return True

    def connect_ssh(self, ip, key_name):
        """Open an SSH session to ``ip`` as user ``ubuntu``.

        Authenticates with the private key stored for ``key_name``, runs
        ``uptime`` once and prints its output, then returns the connected
        ``SSHClient``.  The caller owns the returned client.  If connecting
        or running the command fails, the client is closed before the error
        is re-raised.
        """
        # TODO: set a more reasonable connection timeout time
        key = RSAKey.from_private_key_file(self._key_path(key_name))
        client = SSHClient()
        try:
            client.load_system_host_keys()
            client.set_missing_host_key_policy(AutoAddPolicy())
            client.connect(ip, username='ubuntu', pkey=key)
            stdout = client.exec_command('uptime')[1]
            # Two spaces reproduce the output of the former
            # "print 'uptime: ', value" statement exactly.
            print('uptime:  %s' % stdout.read())
        except Exception:
            # Do not leak a half-open session when the check fails.
            client.close()
            raise
        return client

    def can_ping(self, ip):
        """Return True if a single ``ping`` to ``ip`` exits with status 0."""
        return getstatusoutput('ping -c 1 %s' % ip)[0] == 0

    @property
    def admin(self):
        """A connection for the ``admin`` user, created on each access."""
        return admin.connection_for('admin')

    def connection_for(self, username):
        """Return the admin client's connection for ``username``."""
        return admin.connection_for(username)

    def create_user(self, username):
        """Create ``username`` through the admin client and return the result."""
        return admin.create_user(username)

    def get_user(self, username):
        """Look up ``username`` through the admin client."""
        return admin.get_user(username)

    def delete_user(self, username):
        """Delete ``username`` through the admin client."""
        return admin.delete_user(username)

    def get_signed_zip(self, username):
        """Return the admin client's ``get_zip`` result for ``username``."""
        return admin.get_zip(username)

    def create_key_pair(self, conn, key_name):
        """Create key pair ``key_name`` on ``conn`` and save it under /tmp.

        Any stale local key file with the same name is removed first.
        Returns the key pair object produced by ``conn.create_key_pair``.
        """
        self._remove_key_file(key_name)
        key = conn.create_key_pair(key_name)
        key.save('/tmp/')
        return key

    def delete_key_pair(self, conn, key_name):
        """Delete key pair ``key_name`` on ``conn`` and its local key file."""
        conn.delete_key_pair(key_name)
        self._remove_key_file(key_name)

    def bundle_image(self, image, kernel=False):
        """Bundle ``image`` with ``euca-bundle-image``.

        Passes ``--kernel true`` when ``kernel`` is truthy.  Returns True on
        success and raises ``Exception`` with the tool's output on failure.
        """
        cmd = 'euca-bundle-image -i %s' % image
        if kernel:
            cmd += ' --kernel true'
        return self._run_command(cmd)

    def upload_image(self, bucket_name, image):
        """Upload the bundle manifest ``/tmp/<image>.manifest.xml``.

        Uses ``euca-upload-bundle`` with ``bucket_name`` as the target
        bucket.  Returns True on success and raises ``Exception`` with the
        tool's output on failure.
        """
        cmd = 'euca-upload-bundle -b %s -m /tmp/%s.manifest.xml' % (
            bucket_name, image)
        return self._run_command(cmd)

    def delete_bundle_bucket(self, bucket_name):
        """Run ``euca-delete-bundle --clear`` against ``bucket_name``.

        Returns True on success and raises ``Exception`` with the tool's
        output on failure.
        """
        cmd = 'euca-delete-bundle --clear -b %s' % (bucket_name)
        # This command historically printed its output without the leading
        # space used by the other euca helpers; keep that output unchanged.
        return self._run_command(cmd, output_indent='')

    def register_image(self, bucket_name, manifest):
        """Register ``<bucket_name>/<manifest>.manifest.xml`` as admin.

        Returns whatever the admin connection's ``register_image`` returns.
        """
        conn = admin.connection_for('admin')
        return conn.register_image(
            "%s/%s.manifest.xml" % (bucket_name, manifest))

    def setUp_test_image(self, image, kernel=False):
        """Bundle, upload and register ``image`` in a fresh bucket.

        The bucket is named ``auto_test_<n>`` with a random integer ``n``
        below one million.  Returns the result of ``register_image``.
        """
        self.bundle_image(image, kernel=kernel)
        bucket = "auto_test_%s" % int(random.random() * 1000000)
        self.upload_image(bucket, image)
        return self.register_image(bucket, image)

    def tearDown_test_image(self, conn, image_id):
        """Deregister ``image_id`` on ``conn``."""
        conn.deregister_image(image_id)