~ewanmellor/nova/xenapi

« back to all changes in this revision

Viewing changes to nova/tests/cloud_unittest.py

  • Committer: Ewan Mellor
  • Date: 2010-09-28 18:12:13 UTC
  • mfrom: (145.13.135 trunk)
  • Revision ID: ewan.mellor@citrix.com-20100928181213-rnit65ejj20hjvjo
Merged with trunk.

Show diffs side-by-side

added

removed

Lines of Context:
17
17
#    under the License.
18
18
 
19
19
import logging
 
20
from M2Crypto import BIO
 
21
from M2Crypto import RSA
20
22
import StringIO
21
23
import time
22
 
from tornado import ioloop
 
24
 
23
25
from twisted.internet import defer
24
26
import unittest
25
27
from xml.etree import ElementTree
26
28
 
 
29
from nova import crypto
 
30
from nova import db
27
31
from nova import flags
28
32
from nova import rpc
29
33
from nova import test
 
34
from nova import utils
30
35
from nova.auth import manager
31
 
from nova.compute import service
32
 
from nova.endpoint import api
33
 
from nova.endpoint import cloud
 
36
from nova.compute import power_state
 
37
from nova.api.ec2 import context
 
38
from nova.api.ec2 import cloud
34
39
 
35
40
 
36
41
FLAGS = flags.FLAGS
39
44
class CloudTestCase(test.BaseTestCase):
40
45
    def setUp(self):
41
46
        super(CloudTestCase, self).setUp()
42
 
        self.flags(connection_type='fake',
43
 
                   fake_storage=True)
 
47
        self.flags(connection_type='fake')
44
48
 
45
49
        self.conn = rpc.Connection.instance()
46
50
        logging.getLogger().setLevel(logging.DEBUG)
49
53
        self.cloud = cloud.CloudController()
50
54
 
51
55
        # set up a service
52
 
        self.compute = service.ComputeService()
 
56
        self.compute = utils.import_class(FLAGS.compute_manager)
53
57
        self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
54
58
                                                     topic=FLAGS.compute_topic,
55
59
                                                     proxy=self.compute)
56
60
        self.injected.append(self.compute_consumer.attach_to_tornado(self.ioloop))
57
61
 
58
 
        try:
59
 
            manager.AuthManager().create_user('admin', 'admin', 'admin')
60
 
        except: pass
61
 
        admin = manager.AuthManager().get_user('admin')
62
 
        project = manager.AuthManager().create_project('proj', 'admin', 'proj')
63
 
        self.context = api.APIRequestContext(handler=None,project=project,user=admin)
 
62
        self.manager = manager.AuthManager()
 
63
        self.user = self.manager.create_user('admin', 'admin', 'admin', True)
 
64
        self.project = self.manager.create_project('proj', 'admin', 'proj')
 
65
        self.context = context.APIRequestContext(user=self.user,
 
66
                                                 project=self.project)
64
67
 
65
68
    def tearDown(self):
66
 
        manager.AuthManager().delete_project('proj')
67
 
        manager.AuthManager().delete_user('admin')
 
69
        self.manager.delete_project(self.project)
 
70
        self.manager.delete_user(self.user)
 
71
        super(CloudTestCase, self).setUp()
 
72
 
 
73
    def _create_key(self, name):
 
74
        # NOTE(vish): create depends on pool, so just call helper directly
 
75
        return cloud._gen_key(self.context, self.context.user.id, name)
68
76
 
69
77
    def test_console_output(self):
70
78
        if FLAGS.connection_type == 'fake':
77
85
        self.assert_(output)
78
86
        rv = yield self.compute.terminate_instance(instance_id)
79
87
 
 
88
 
 
89
    def test_key_generation(self):
 
90
        result = self._create_key('test')
 
91
        private_key = result['private_key']
 
92
        key = RSA.load_key_string(private_key, callback=lambda: None)
 
93
        bio = BIO.MemoryBuffer()
 
94
        public_key = db.key_pair_get(self.context,
 
95
                                    self.context.user.id,
 
96
                                    'test')['public_key']
 
97
        key.save_pub_key_bio(bio)
 
98
        converted = crypto.ssl_pub_to_ssh_pub(bio.read())
 
99
        # assert key fields are equal
 
100
        self.assertEqual(public_key.split(" ")[1].strip(),
 
101
                         converted.split(" ")[1].strip())
 
102
 
 
103
    def test_describe_key_pairs(self):
 
104
        self._create_key('test1')
 
105
        self._create_key('test2')
 
106
        result = self.cloud.describe_key_pairs(self.context)
 
107
        keys = result["keypairsSet"]
 
108
        self.assertTrue(filter(lambda k: k['keyName'] == 'test1', keys))
 
109
        self.assertTrue(filter(lambda k: k['keyName'] == 'test2', keys))
 
110
 
 
111
    def test_delete_key_pair(self):
 
112
        self._create_key('test')
 
113
        self.cloud.delete_key_pair(self.context, 'test')
 
114
 
80
115
    def test_run_instances(self):
81
116
        if FLAGS.connection_type == 'fake':
82
117
            logging.debug("Can't test instances without a real virtual env.")
95
130
            rv = yield defer.succeed(time.sleep(1))
96
131
            info = self.cloud._get_instance(instance['instance_id'])
97
132
            logging.debug(info['state'])
98
 
            if info['state'] == node.Instance.RUNNING:
 
133
            if info['state'] == power_state.RUNNING:
99
134
                break
100
135
        self.assert_(rv)
101
136