1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
7
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8
# not use this file except in compliance with the License. You may obtain
9
# a copy of the License at
11
# http://www.apache.org/licenses/LICENSE-2.0
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
# License for the specific language governing permissions and limitations
25
# If ../nova/__init__.py exists, add ../ to Python search path, so that
26
# it will override what happens to be installed in /usr/(local/)lib/python...
27
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
30
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
31
sys.path.insert(0, possible_topdir)
33
from smoketests import flags
34
from smoketests import base
39
TEST_PREFIX = 'test%s' % int(random.random() * 1000000)
40
TEST_BUCKET = '%s_bucket' % TEST_PREFIX
41
TEST_KEY = '%s_key' % TEST_PREFIX
42
TEST_GROUP = '%s_group' % TEST_PREFIX
45
class AddressTests(base.UserSmokeTestCase):
46
def test_000_setUp(self):
47
self.create_key_pair(self.conn, TEST_KEY)
48
reservation = self.conn.run_instances(FLAGS.test_image,
49
instance_type='m1.tiny',
51
self.data['instance'] = reservation.instances[0]
52
if not self.wait_for_running(self.data['instance']):
53
self.fail('instance failed to start')
54
self.data['instance'].update()
55
if not self.wait_for_ping(self.data['instance'].private_dns_name):
56
self.fail('could not ping instance')
57
if not self.wait_for_ssh(self.data['instance'].private_dns_name,
59
self.fail('could not ssh to instance')
61
def test_001_can_allocate_floating_ip(self):
62
result = self.conn.allocate_address()
63
self.assertTrue(hasattr(result, 'public_ip'))
64
self.data['public_ip'] = result.public_ip
66
def test_002_can_associate_ip_with_instance(self):
67
result = self.conn.associate_address(self.data['instance'].id,
68
self.data['public_ip'])
69
self.assertTrue(result)
71
def test_003_can_ssh_with_public_ip(self):
72
ssh_authorized = False
73
groups = self.conn.get_all_security_groups(['default'])
74
for rule in groups[0].rules:
75
if (rule.ip_protocol == 'tcp' and
76
int(rule.from_port) <= 22 and
77
int(rule.to_port) >= 22):
80
if not ssh_authorized:
81
self.conn.authorize_security_group('default',
86
if not self.wait_for_ssh(self.data['public_ip'], TEST_KEY):
87
self.fail('could not ssh to public ip')
89
if not ssh_authorized:
90
self.conn.revoke_security_group('default',
95
def test_004_can_disassociate_ip_from_instance(self):
96
result = self.conn.disassociate_address(self.data['public_ip'])
97
self.assertTrue(result)
99
def test_005_can_deallocate_floating_ip(self):
100
result = self.conn.release_address(self.data['public_ip'])
101
self.assertTrue(result)
103
def test_999_tearDown(self):
104
self.delete_key_pair(self.conn, TEST_KEY)
105
self.conn.terminate_instances([self.data['instance'].id])
108
class SecurityGroupTests(base.UserSmokeTestCase):
110
def __get_metadata_item(self, category):
111
id_url = "latest/meta-data/%s" % category
112
options = "-f -s --max-time 1"
113
command = "curl %s %s/%s" % (options, self.data['public_ip'], id_url)
114
status, output = commands.getstatusoutput(command)
115
value = output.strip()
120
def __public_instance_is_accessible(self):
121
instance_id = self.__get_metadata_item('instance-id')
124
if instance_id != self.data['instance'].id:
125
raise Exception("Wrong instance id. Expected: %s, Got: %s" %
126
(self.data['instance'].id, instance_id))
129
def test_001_can_create_security_group(self):
130
self.conn.create_security_group(TEST_GROUP, description='test')
132
groups = self.conn.get_all_security_groups()
133
self.assertTrue(TEST_GROUP in [group.name for group in groups])
135
def test_002_can_launch_instance_in_security_group(self):
136
with open("proxy.sh") as f:
138
self.create_key_pair(self.conn, TEST_KEY)
139
reservation = self.conn.run_instances(FLAGS.test_image,
141
security_groups=[TEST_GROUP],
143
instance_type='m1.tiny')
145
self.data['instance'] = reservation.instances[0]
146
if not self.wait_for_running(self.data['instance']):
147
self.fail('instance failed to start')
148
self.data['instance'].update()
150
def test_003_can_authorize_security_group_ingress(self):
151
self.assertTrue(self.conn.authorize_security_group(TEST_GROUP,
156
def test_004_can_access_metadata_over_public_ip(self):
157
result = self.conn.allocate_address()
158
self.assertTrue(hasattr(result, 'public_ip'))
159
self.data['public_ip'] = result.public_ip
161
result = self.conn.associate_address(self.data['instance'].id,
162
self.data['public_ip'])
163
start_time = time.time()
165
while not self.__public_instance_is_accessible():
167
if time.time() - start_time > 60:
168
raise Exception("Timeout")
171
result = self.conn.disassociate_address(self.data['public_ip'])
173
def test_005_validate_metadata(self):
175
instance = self.data['instance']
176
self.assertTrue(instance.instance_type,
177
self.__get_metadata_item("instance-type"))
178
#FIXME(dprince): validate more metadata here
180
def test_006_can_revoke_security_group_ingress(self):
181
self.assertTrue(self.conn.revoke_security_group(TEST_GROUP,
185
start_time = time.time()
186
while self.__public_instance_is_accessible():
187
# 1 minute to teardown
188
if time.time() - start_time > 60:
189
raise Exception("Timeout")
192
def test_999_tearDown(self):
193
self.conn.delete_key_pair(TEST_KEY)
194
self.conn.delete_security_group(TEST_GROUP)
195
groups = self.conn.get_all_security_groups()
196
self.assertFalse(TEST_GROUP in [group.name for group in groups])
197
self.conn.terminate_instances([self.data['instance'].id])
198
self.assertTrue(self.conn.release_address(self.data['public_ip']))