1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
7
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8
# not use this file except in compliance with the License. You may obtain
9
# a copy of the License at
11
# http://www.apache.org/licenses/LICENSE-2.0
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
# License for the specific language governing permissions and limitations
27
import time

from smoketests import flags
from smoketests import base
31
# Suite names listed in the help text of the 'suite' flag.
SUITE_NAMES = '[image, instance, volume]'

flags.DEFINE_string('suite',
                    None,
                    'Specific test suite to run ' + SUITE_NAMES)
flags.DEFINE_string('bundle_kernel',
                    'openwrt-x86-vmlinuz',
                    'Local kernel file to use for bundling tests')
flags.DEFINE_string('bundle_image',
                    'openwrt-x86-ext2.image',
                    'Local image file to use for bundling tests')

# Random number baked into the bucket and key-pair names used by this run
# (presumably so that separate runs do not reuse each other's names).
TEST_PREFIX = 'test%s' % int(random.random() * 1000000)
TEST_BUCKET = '%s_bucket' % TEST_PREFIX
TEST_KEY = '%s_key' % TEST_PREFIX
46
class UserSmokeTestCase(base.SmokeTestCase):
    """Common base class of the image, instance and volume smoke suites."""

    # unittest creates a fresh TestCase instance for every test method, so
    # values handed from one numbered test to the next (image ids, instance
    # ids, volume objects, addresses) are kept in this class-level dict,
    # which all instances and subclasses share on purpose.
    # NOTE(review): if base.SmokeTestCase already provides ``data``, this
    # attribute shadows it -- confirm against smoketests.base.
    data = {}

    def setUp(self):
        """Store the result of ``connection_for_env()`` as ``self.conn``."""
        self.conn = self.connection_for_env()
53
class ImageTests(UserSmokeTestCase):
    """Bundle, upload, register, inspect and deregister an image and a kernel.

    The test methods are numbered because they depend on one another: the
    ids returned when registering are stored in ``self.data`` and read back
    by the later tests.
    """

    def _wait_until_available(self, image_id, attempts=10):
        """Poll ``image_id`` once per second until it is 'available'.

        Returns the image object from the lookup that reported the
        'available' state.  Fails the test if the state was never seen in
        ``attempts`` polls.
        """
        for _ in xrange(attempts):
            image = self.conn.get_image(image_id)
            if image and image.state == 'available':
                return image
            time.sleep(1)
        self.fail('image %s was not available within %d seconds'
                  % (image_id, attempts))

    def test_001_can_bundle_image(self):
        self.assertTrue(self.bundle_image(FLAGS.bundle_image))

    def test_002_can_upload_image(self):
        self.assertTrue(self.upload_image(TEST_BUCKET, FLAGS.bundle_image))

    def test_003_can_register_image(self):
        manifest = '%s/%s.manifest.xml' % (TEST_BUCKET, FLAGS.bundle_image)
        image_id = self.conn.register_image(manifest)
        self.assertTrue(image_id is not None)
        self.data['image_id'] = image_id

    def test_004_can_bundle_kernel(self):
        self.assertTrue(self.bundle_image(FLAGS.bundle_kernel, kernel=True))

    def test_005_can_upload_kernel(self):
        self.assertTrue(self.upload_image(TEST_BUCKET, FLAGS.bundle_kernel))

    def test_006_can_register_kernel(self):
        manifest = '%s/%s.manifest.xml' % (TEST_BUCKET, FLAGS.bundle_kernel)
        kernel_id = self.conn.register_image(manifest)
        self.assertTrue(kernel_id is not None)
        self.data['kernel_id'] = kernel_id

    def test_007_images_are_available_within_10_seconds(self):
        image = self._wait_until_available(self.data['image_id'])
        self.assertEqual(image.type, 'machine')

        kernel = self._wait_until_available(self.data['kernel_id'])
        self.assertEqual(kernel.type, 'kernel')

    def test_008_can_describe_image_attribute(self):
        attrs = self.conn.get_image_attribute(self.data['image_id'],
                                              'launchPermission')
        # assertEqual, not assert_(value, expected): with assert_ the second
        # argument is only the failure message, so nothing was compared.
        self.assertEqual(attrs.name, 'launch_permission')

    def test_009_can_modify_image_launch_permission(self):
        self.conn.modify_image_attribute(image_id=self.data['image_id'],
                                         operation='add',
                                         attribute='launchPermission',
                                         groups='all')
        image = self.conn.get_image(self.data['image_id'])
        self.assertEqual(image.id, self.data['image_id'])

    def test_010_can_see_launch_permission(self):
        attrs = self.conn.get_image_attribute(self.data['image_id'],
                                              'launchPermission')
        self.assertEqual(attrs.name, 'launch_permission')
        self.assertEqual(attrs.attrs['groups'][0], 'all')

    def test_011_user_can_deregister_kernel(self):
        self.assertTrue(self.conn.deregister_image(self.data['kernel_id']))

    def test_012_can_deregister_image(self):
        self.assertTrue(self.conn.deregister_image(self.data['image_id']))

    def test_013_can_delete_bundle(self):
        self.assertTrue(self.delete_bundle_bucket(TEST_BUCKET))
127
class InstanceTests(UserSmokeTestCase):
    """Start an instance with a key pair and reach it by ping and ssh.

    The numbered tests build on each other through ``self.data`` (instance
    id, private address, allocated public address); ``test_999_tearDown``
    removes the key pair and terminates the instance.
    """

    def _assert_can_ssh(self, address, attempts=30):
        """Retry an ssh connection to ``address`` once per second.

        Returns as soon as one connection attempt succeeds and fails the
        test when all ``attempts`` raised.
        """
        for _ in xrange(attempts):
            try:
                conn = self.connect_ssh(address, TEST_KEY)
            except Exception:
                # The errors connect_ssh can raise are not visible from this
                # file, so any failure counts as "not reachable yet".
                time.sleep(1)
            else:
                conn.close()
                return
        self.fail('could not ssh to instance')

    def test_001_can_create_keypair(self):
        key = self.create_key_pair(self.conn, TEST_KEY)
        self.assertEqual(key.name, TEST_KEY)

    def test_002_can_create_instance_with_keypair(self):
        reservation = self.conn.run_instances(FLAGS.test_image,
                                              key_name=TEST_KEY,
                                              instance_type='m1.tiny')
        self.assertEqual(len(reservation.instances), 1)
        self.data['instance_id'] = reservation.instances[0].id

    def test_003_instance_runs_within_60_seconds(self):
        # self.data, not a bare ``data`` (which raised NameError).
        reservations = self.conn.get_all_instances([self.data['instance_id']])
        instance = reservations[0].instances[0]
        # allow 60 seconds to exit pending with IP
        for _ in xrange(60):
            instance.update()
            if instance.state == u'running':
                break
            time.sleep(1)
        else:
            self.fail('instance failed to start')
        ip = instance.private_dns_name
        self.assertNotEqual(ip, '0.0.0.0')
        self.data['private_ip'] = ip
        print(self.data['private_ip'])

    def test_004_can_ping_private_ip(self):
        for _ in xrange(120):
            # ping waits for 1 second
            status, output = commands.getstatusoutput(
                'ping -c1 %s' % self.data['private_ip'])
            if status == 0:
                break
        else:
            self.fail('could not ping instance')

    def test_005_can_ssh_to_private_ip(self):
        self._assert_can_ssh(self.data['private_ip'])

    def test_006_can_allocate_elastic_ip(self):
        result = self.conn.allocate_address()
        self.assertTrue(hasattr(result, 'public_ip'))
        self.data['public_ip'] = result.public_ip

    def test_007_can_associate_ip_with_instance(self):
        result = self.conn.associate_address(self.data['instance_id'],
                                             self.data['public_ip'])
        self.assertTrue(result)

    def test_008_can_ssh_with_public_ip(self):
        self._assert_can_ssh(self.data['public_ip'])

    def test_009_can_disassociate_ip_from_instance(self):
        result = self.conn.disassociate_address(self.data['public_ip'])
        self.assertTrue(result)

    def test_010_can_deallocate_elastic_ip(self):
        result = self.conn.release_address(self.data['public_ip'])
        self.assertTrue(result)

    def test_999_tearDown(self):
        self.delete_key_pair(self.conn, TEST_KEY)
        # Only terminate when test_002 actually recorded an instance.
        if 'instance_id' in self.data:
            self.conn.terminate_instances([self.data['instance_id']])
213
class VolumeTests(UserSmokeTestCase):
    """Create a volume, attach it to an instance and use it over ssh.

    ``test_000_setUp`` starts the instance the other tests work against and
    ``test_999_tearDown`` terminates it; the instance and the volume are
    passed between the numbered tests through ``self.data``.
    """

    def setUp(self):
        super(VolumeTests, self).setUp()
        # Device name passed to volume.attach() and used by the mkfs, mount
        # and df commands below.
        self.device = '/dev/vdb'

    def _run_on_instance(self, command):
        """Run ``command`` on the test instance over ssh.

        Returns an ``(out, err)`` tuple holding what was read from the
        command's stdout and stderr.  Both streams are read before the
        connection is closed, and the connection is closed even when the
        command or a read raises.
        """
        ip = self.data['instance'].private_dns_name
        conn = self.connect_ssh(ip, TEST_KEY)
        try:
            stdin, stdout, stderr = conn.exec_command(command)
            return stdout.read(), stderr.read()
        finally:
            conn.close()

    def _wait_for_status(self, volume, status, attempts=10, delay=5):
        """Refresh ``volume`` until ``volume.status == status``.

        Checks up to ``attempts`` times, sleeping ``delay`` seconds and
        calling ``volume.update()`` between checks.  Returns True when the
        status was seen and False otherwise.
        """
        for _ in xrange(attempts):
            if volume.status == status:
                return True
            time.sleep(delay)
            volume.update()
        return False

    def test_000_setUp(self):
        self.create_key_pair(self.conn, TEST_KEY)
        reservation = self.conn.run_instances(FLAGS.test_image,
                                              instance_type='m1.tiny',
                                              key_name=TEST_KEY)
        instance = reservation.instances[0]
        self.data['instance'] = instance
        for _ in xrange(120):
            if self.can_ping(instance.private_dns_name):
                break
        else:
            self.fail('unable to start instance')

    def test_001_can_create_volume(self):
        volume = self.conn.create_volume(1, 'nova')
        self.assertEqual(volume.size, 1)
        self.data['volume'] = volume
        # Give network time to find volume.
        time.sleep(5)

    def test_002_can_attach_volume(self):
        volume = self.data['volume']

        if not self._wait_for_status(volume, u'available'):
            self.fail('cannot attach volume with state %s' % volume.status)

        volume.attach(self.data['instance'].id, self.device)

        # Volumes seems to report "available" too soon.
        self._wait_for_status(volume, u'in-use')
        self.assertEqual(volume.status, u'in-use')

        # Give instance time to recognize volume.
        time.sleep(5)

    def test_003_can_mount_volume(self):
        steps = ['mkdir -p /mnt/vol',
                 'mkfs.ext2 %s' % self.device,
                 'mount %s /mnt/vol' % self.device,
                 'echo success']
        # Joined with '&&' so 'success' is echoed only if every step worked.
        out, err = self._run_on_instance(' && '.join(steps))
        if not out.strip().endswith('success'):
            self.fail('Unable to mount: %s %s' % (out, err))

    def test_004_can_write_to_volume(self):
        # FIXME(devcamcar): This doesn't fail if the volume hasn't been mounted
        out, err = self._run_on_instance('echo hello > /mnt/vol/test.txt')
        if err:
            self.fail('Unable to write to mount: %s' % (err))

    def test_005_volume_is_correct_size(self):
        out, err = self._run_on_instance(
            "df -h | grep %s | awk {'print $2'}" % self.device)
        if out.strip() != '1008M':
            self.fail('Volume is not the right size: %s %s' % (out, err))

    def test_006_me_can_umount_volume(self):
        out, err = self._run_on_instance('umount /mnt/vol')
        if err:
            self.fail('Unable to unmount: %s' % (err))

    def test_007_me_can_detach_volume(self):
        result = self.conn.detach_volume(volume_id=self.data['volume'].id)
        self.assertTrue(result)

    def test_008_me_can_delete_volume(self):
        result = self.conn.delete_volume(self.data['volume'].id)
        self.assertTrue(result)

    def test_999_tearDown(self):
        self.conn.terminate_instances([self.data['instance'].id])
        # Same helper InstanceTests uses, pairing with create_key_pair above.
        self.delete_key_pair(self.conn, TEST_KEY)
322
if __name__ == "__main__":
    # Map each suite name to a unittest suite built from its test class and
    # use the result of base.run_tests() as the process exit status.
    suite_classes = (('image', ImageTests),
                     ('instance', InstanceTests),
                     ('volume', VolumeTests))
    suites = dict((suite_name, unittest.makeSuite(test_class))
                  for suite_name, test_class in suite_classes)
    sys.exit(base.run_tests(suites))