30
from nova import block_device
30
31
from nova import context
31
32
from nova import exception
32
33
from nova import test
35
from nova.api import auth
33
36
from nova.api import ec2
34
37
from nova.api.ec2 import apirequest
35
38
from nova.api.ec2 import cloud
36
39
from nova.api.ec2 import ec2utils
37
from nova.auth import manager
40
42
class FakeHttplibSocket(object):
147
149
properties0 = {'mappings': mappings}
148
150
properties1 = {'root_device_name': '/dev/sdb', 'mappings': mappings}
150
root_device_name = ec2utils.properties_root_device_name(properties0)
152
root_device_name = block_device.properties_root_device_name(
151
154
self.assertEqual(root_device_name, '/dev/sda1')
153
root_device_name = ec2utils.properties_root_device_name(properties1)
156
root_device_name = block_device.properties_root_device_name(
154
158
self.assertEqual(root_device_name, '/dev/sdb')
156
160
def test_mapping_prepend_dev(self):
184
188
'device': '/dev/sdc1'},
185
189
{'virtual': 'ephemeral1',
186
190
'device': '/dev/sdc1'}]
187
self.assertDictListMatch(ec2utils.mappings_prepend_dev(mappings),
191
self.assertDictListMatch(block_device.mappings_prepend_dev(mappings),
192
196
"""Unit test for the cloud controller on an EC2 API"""
194
198
super(ApiEc2TestCase, self).setUp()
195
self.manager = manager.AuthManager()
196
199
self.host = '127.0.0.1'
197
self.app = ec2.Authenticate(ec2.Requestify(ec2.Executor(),
198
'nova.api.ec2.cloud.CloudController'))
200
# NOTE(vish): skipping the Authorizer
201
roles = ['sysadmin', 'netadmin']
202
ctxt = context.RequestContext('fake', 'fake', roles=roles)
203
self.app = auth.InjectContext(ctxt,
204
ec2.Requestify(ec2.Authorizer(ec2.Executor()),
205
'nova.api.ec2.cloud.CloudController'))
200
207
def expect_http(self, host=None, is_secure=False, api_version=None):
201
208
"""Returns a new EC2 connection"""
246
253
self.expect_http(api_version='2010-10-30')
247
254
self.mox.ReplayAll()
249
user = self.manager.create_user('fake', 'fake', 'fake')
250
project = self.manager.create_project('fake', 'fake', 'fake')
252
256
# Any request should be fine
253
257
self.ec2.get_all_instances()
254
258
self.assertTrue(self.ec2.APIVersion in self.http.getresponsebody(),
255
259
'The version in the xmlns of the response does '
256
260
'not match the API version given in the request.')
258
self.manager.delete_project(project)
259
self.manager.delete_user(user)
261
262
def test_describe_instances(self):
262
263
"""Test that, after creating a user and a project, the describe
263
264
instances call to the API works properly"""
264
265
self.expect_http()
265
266
self.mox.ReplayAll()
266
user = self.manager.create_user('fake', 'fake', 'fake')
267
project = self.manager.create_project('fake', 'fake', 'fake')
268
267
self.assertEqual(self.ec2.get_all_instances(), [])
269
self.manager.delete_project(project)
270
self.manager.delete_user(user)
272
269
def test_terminate_invalid_instance(self):
273
270
"""Attempt to terminate an invalid instance"""
274
271
self.expect_http()
275
272
self.mox.ReplayAll()
276
user = self.manager.create_user('fake', 'fake', 'fake')
277
project = self.manager.create_project('fake', 'fake', 'fake')
278
273
self.assertRaises(EC2ResponseError, self.ec2.terminate_instances,
280
self.manager.delete_project(project)
281
self.manager.delete_user(user)
283
276
def test_get_all_key_pairs(self):
284
277
"""Test that, after creating a user and project and generating
287
280
self.mox.ReplayAll()
288
281
keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
289
282
for x in range(random.randint(4, 8)))
290
user = self.manager.create_user('fake', 'fake', 'fake')
291
project = self.manager.create_project('fake', 'fake', 'fake')
292
283
# NOTE(vish): create depends on pool, so call helper directly
293
cloud._gen_key(context.get_admin_context(), user.id, keyname)
284
cloud._gen_key(context.get_admin_context(), 'fake', keyname)
295
286
rv = self.ec2.get_all_key_pairs()
296
287
results = [k for k in rv if k.name == keyname]
297
288
self.assertEquals(len(results), 1)
298
self.manager.delete_project(project)
299
self.manager.delete_user(user)
301
290
def test_create_duplicate_key_pair(self):
302
291
"""Test that, after successfully generating a keypair,
305
294
self.mox.ReplayAll()
306
295
keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
307
296
for x in range(random.randint(4, 8)))
308
user = self.manager.create_user('fake', 'fake', 'fake')
309
project = self.manager.create_project('fake', 'fake', 'fake')
310
297
# NOTE(vish): create depends on pool, so call helper directly
311
298
self.ec2.create_key_pair('test')
325
312
"""Test that we can retrieve security groups"""
326
313
self.expect_http()
327
314
self.mox.ReplayAll()
328
user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
329
project = self.manager.create_project('fake', 'fake', 'fake')
331
316
rv = self.ec2.get_all_security_groups()
333
318
self.assertEquals(len(rv), 1)
334
319
self.assertEquals(rv[0].name, 'default')
336
self.manager.delete_project(project)
337
self.manager.delete_user(user)
339
321
def test_create_delete_security_group(self):
340
322
"""Test that we can create a security group"""
341
323
self.expect_http()
342
324
self.mox.ReplayAll()
343
user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
344
project = self.manager.create_project('fake', 'fake', 'fake')
346
# At the moment, you need both of these to actually be netadmin
347
self.manager.add_role('fake', 'netadmin')
348
project.add_role('fake', 'netadmin')
350
326
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
351
327
for x in range(random.randint(4, 8)))
365
341
self.ec2.delete_security_group(security_group_name)
367
self.manager.delete_project(project)
368
self.manager.delete_user(user)
343
def test_group_name_valid_chars_security_group(self):
344
""" Test that we sanely handle invalid security group names.
345
API Spec states we should only accept alphanumeric characters,
346
spaces, dashes, and underscores. """
350
# Test block group_name of non alphanumeric characters, spaces,
351
# dashes, and underscores.
352
security_group_name = "aa #^% -=99"
354
self.assertRaises(EC2ResponseError, self.ec2.create_security_group,
355
security_group_name, 'test group')
357
def test_group_name_valid_length_security_group(self):
358
"""Test that we sanely handle invalid security group names.
359
API Spec states that the length should not exceed 255 chars """
363
# Test block group_name > 255 chars
364
security_group_name = "".join(random.choice("poiuytrewqasdfghjklmnbvc")
365
for x in range(random.randint(256, 266)))
367
self.assertRaises(EC2ResponseError, self.ec2.create_security_group,
368
security_group_name, 'test group')
370
370
def test_authorize_revoke_security_group_cidr(self):
375
375
self.expect_http()
376
376
self.mox.ReplayAll()
377
user = self.manager.create_user('fake', 'fake', 'fake')
378
project = self.manager.create_project('fake', 'fake', 'fake')
380
# At the moment, you need both of these to actually be netadmin
381
self.manager.add_role('fake', 'netadmin')
382
project.add_role('fake', 'netadmin')
384
378
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
385
379
for x in range(random.randint(4, 8)))
426
420
self.assertEqual(len(rv), 1)
427
421
self.assertEqual(rv[0].name, 'default')
429
self.manager.delete_project(project)
430
self.manager.delete_user(user)
434
425
def test_authorize_revoke_security_group_cidr_v6(self):
439
430
self.expect_http()
440
431
self.mox.ReplayAll()
441
user = self.manager.create_user('fake', 'fake', 'fake')
442
project = self.manager.create_project('fake', 'fake', 'fake')
444
# At the moment, you need both of these to actually be netadmin
445
self.manager.add_role('fake', 'netadmin')
446
project.add_role('fake', 'netadmin')
448
433
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
449
434
for x in range(random.randint(4, 8)))
489
474
self.assertEqual(len(rv), 1)
490
475
self.assertEqual(rv[0].name, 'default')
492
self.manager.delete_project(project)
493
self.manager.delete_user(user)
497
479
def test_authorize_revoke_security_group_foreign_group(self):
502
484
self.expect_http()
503
485
self.mox.ReplayAll()
504
user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
505
project = self.manager.create_project('fake', 'fake', 'fake')
507
# At the moment, you need both of these to actually be netadmin
508
self.manager.add_role('fake', 'netadmin')
509
project.add_role('fake', 'netadmin')
511
487
rand_string = 'sdiuisudfsdcnpaqwertasd'
512
488
security_group_name = "".join(random.choice(rand_string)