~ttx/nova/d4-merge

« back to all changes in this revision

Viewing changes to nova/tests/test_api.py

  • Committer: Thierry Carrez
  • Date: 2011-08-23 12:23:07 UTC
  • mfrom: (1130.75.258 nova)
  • Revision ID: thierry@openstack.org-20110823122307-f0vtuyg1ikc14n87
Merge diablo-4 development from trunk (rev1479)

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
import StringIO
28
28
import webob
29
29
 
 
30
from nova import block_device
30
31
from nova import context
31
32
from nova import exception
32
33
from nova import test
 
34
from nova import wsgi
 
35
from nova.api import auth
33
36
from nova.api import ec2
34
37
from nova.api.ec2 import apirequest
35
38
from nova.api.ec2 import cloud
36
39
from nova.api.ec2 import ec2utils
37
 
from nova.auth import manager
38
40
 
39
41
 
40
42
class FakeHttplibSocket(object):
147
149
        properties0 = {'mappings': mappings}
148
150
        properties1 = {'root_device_name': '/dev/sdb', 'mappings': mappings}
149
151
 
150
 
        root_device_name = ec2utils.properties_root_device_name(properties0)
 
152
        root_device_name = block_device.properties_root_device_name(
 
153
            properties0)
151
154
        self.assertEqual(root_device_name, '/dev/sda1')
152
155
 
153
 
        root_device_name = ec2utils.properties_root_device_name(properties1)
 
156
        root_device_name = block_device.properties_root_device_name(
 
157
            properties1)
154
158
        self.assertEqual(root_device_name, '/dev/sdb')
155
159
 
156
160
    def test_mapping_prepend_dev(self):
184
188
             'device': '/dev/sdc1'},
185
189
            {'virtual': 'ephemeral1',
186
190
             'device': '/dev/sdc1'}]
187
 
        self.assertDictListMatch(ec2utils.mappings_prepend_dev(mappings),
 
191
        self.assertDictListMatch(block_device.mappings_prepend_dev(mappings),
188
192
                                 expected_result)
189
193
 
190
194
 
192
196
    """Unit test for the cloud controller on an EC2 API"""
193
197
    def setUp(self):
194
198
        super(ApiEc2TestCase, self).setUp()
195
 
        self.manager = manager.AuthManager()
196
199
        self.host = '127.0.0.1'
197
 
        self.app = ec2.Authenticate(ec2.Requestify(ec2.Executor(),
198
 
                       'nova.api.ec2.cloud.CloudController'))
 
200
        # NOTE(vish): skipping the Authorizer
 
201
        roles = ['sysadmin', 'netadmin']
 
202
        ctxt = context.RequestContext('fake', 'fake', roles=roles)
 
203
        self.app = auth.InjectContext(ctxt,
 
204
                ec2.Requestify(ec2.Authorizer(ec2.Executor()),
 
205
                               'nova.api.ec2.cloud.CloudController'))
199
206
 
200
207
    def expect_http(self, host=None, is_secure=False, api_version=None):
201
208
        """Returns a new EC2 connection"""
246
253
        self.expect_http(api_version='2010-10-30')
247
254
        self.mox.ReplayAll()
248
255
 
249
 
        user = self.manager.create_user('fake', 'fake', 'fake')
250
 
        project = self.manager.create_project('fake', 'fake', 'fake')
251
 
 
252
256
        # Any request should be fine
253
257
        self.ec2.get_all_instances()
254
258
        self.assertTrue(self.ec2.APIVersion in self.http.getresponsebody(),
255
259
                       'The version in the xmlns of the response does '
256
260
                       'not match the API version given in the request.')
257
261
 
258
 
        self.manager.delete_project(project)
259
 
        self.manager.delete_user(user)
260
 
 
261
262
    def test_describe_instances(self):
262
263
        """Test that, after creating a user and a project, the describe
263
264
        instances call to the API works properly"""
264
265
        self.expect_http()
265
266
        self.mox.ReplayAll()
266
 
        user = self.manager.create_user('fake', 'fake', 'fake')
267
 
        project = self.manager.create_project('fake', 'fake', 'fake')
268
267
        self.assertEqual(self.ec2.get_all_instances(), [])
269
 
        self.manager.delete_project(project)
270
 
        self.manager.delete_user(user)
271
268
 
272
269
    def test_terminate_invalid_instance(self):
273
270
        """Attempt to terminate an invalid instance"""
274
271
        self.expect_http()
275
272
        self.mox.ReplayAll()
276
 
        user = self.manager.create_user('fake', 'fake', 'fake')
277
 
        project = self.manager.create_project('fake', 'fake', 'fake')
278
273
        self.assertRaises(EC2ResponseError, self.ec2.terminate_instances,
279
274
                            "i-00000005")
280
 
        self.manager.delete_project(project)
281
 
        self.manager.delete_user(user)
282
275
 
283
276
    def test_get_all_key_pairs(self):
284
277
        """Test that, after creating a user and project and generating
287
280
        self.mox.ReplayAll()
288
281
        keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
289
282
                          for x in range(random.randint(4, 8)))
290
 
        user = self.manager.create_user('fake', 'fake', 'fake')
291
 
        project = self.manager.create_project('fake', 'fake', 'fake')
292
283
        # NOTE(vish): create depends on pool, so call helper directly
293
 
        cloud._gen_key(context.get_admin_context(), user.id, keyname)
 
284
        cloud._gen_key(context.get_admin_context(), 'fake', keyname)
294
285
 
295
286
        rv = self.ec2.get_all_key_pairs()
296
287
        results = [k for k in rv if k.name == keyname]
297
288
        self.assertEquals(len(results), 1)
298
 
        self.manager.delete_project(project)
299
 
        self.manager.delete_user(user)
300
289
 
301
290
    def test_create_duplicate_key_pair(self):
302
291
        """Test that, after successfully generating a keypair,
305
294
        self.mox.ReplayAll()
306
295
        keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
307
296
                          for x in range(random.randint(4, 8)))
308
 
        user = self.manager.create_user('fake', 'fake', 'fake')
309
 
        project = self.manager.create_project('fake', 'fake', 'fake')
310
297
        # NOTE(vish): create depends on pool, so call helper directly
311
298
        self.ec2.create_key_pair('test')
312
299
 
325
312
        """Test that we can retrieve security groups"""
326
313
        self.expect_http()
327
314
        self.mox.ReplayAll()
328
 
        user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
329
 
        project = self.manager.create_project('fake', 'fake', 'fake')
330
315
 
331
316
        rv = self.ec2.get_all_security_groups()
332
317
 
333
318
        self.assertEquals(len(rv), 1)
334
319
        self.assertEquals(rv[0].name, 'default')
335
320
 
336
 
        self.manager.delete_project(project)
337
 
        self.manager.delete_user(user)
338
 
 
339
321
    def test_create_delete_security_group(self):
340
322
        """Test that we can create a security group"""
341
323
        self.expect_http()
342
324
        self.mox.ReplayAll()
343
 
        user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
344
 
        project = self.manager.create_project('fake', 'fake', 'fake')
345
 
 
346
 
        # At the moment, you need both of these to actually be netadmin
347
 
        self.manager.add_role('fake', 'netadmin')
348
 
        project.add_role('fake', 'netadmin')
349
325
 
350
326
        security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
351
327
                                      for x in range(random.randint(4, 8)))
364
340
 
365
341
        self.ec2.delete_security_group(security_group_name)
366
342
 
367
 
        self.manager.delete_project(project)
368
 
        self.manager.delete_user(user)
 
343
    def test_group_name_valid_chars_security_group(self):
 
344
        """ Test that we sanely handle invalid security group names.
 
345
         API Spec states we should only accept alphanumeric characters,
 
346
         spaces, dashes, and underscores. """
 
347
        self.expect_http()
 
348
        self.mox.ReplayAll()
 
349
 
 
350
        # Test block group_name of non alphanumeric characters, spaces,
 
351
        # dashes, and underscores.
 
352
        security_group_name = "aa #^% -=99"
 
353
 
 
354
        self.assertRaises(EC2ResponseError, self.ec2.create_security_group,
 
355
                          security_group_name, 'test group')
 
356
 
 
357
    def test_group_name_valid_length_security_group(self):
 
358
        """Test that we sanely handle invalid security group names.
 
359
         API Spec states that the length should not exceed 255 chars """
 
360
        self.expect_http()
 
361
        self.mox.ReplayAll()
 
362
 
 
363
        # Test block group_name > 255 chars
 
364
        security_group_name = "".join(random.choice("poiuytrewqasdfghjklmnbvc")
 
365
                                      for x in range(random.randint(256, 266)))
 
366
 
 
367
        self.assertRaises(EC2ResponseError, self.ec2.create_security_group,
 
368
                          security_group_name, 'test group')
369
369
 
370
370
    def test_authorize_revoke_security_group_cidr(self):
371
371
        """
374
374
        """
375
375
        self.expect_http()
376
376
        self.mox.ReplayAll()
377
 
        user = self.manager.create_user('fake', 'fake', 'fake')
378
 
        project = self.manager.create_project('fake', 'fake', 'fake')
379
 
 
380
 
        # At the moment, you need both of these to actually be netadmin
381
 
        self.manager.add_role('fake', 'netadmin')
382
 
        project.add_role('fake', 'netadmin')
383
377
 
384
378
        security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
385
379
                                      for x in range(random.randint(4, 8)))
426
420
        self.assertEqual(len(rv), 1)
427
421
        self.assertEqual(rv[0].name, 'default')
428
422
 
429
 
        self.manager.delete_project(project)
430
 
        self.manager.delete_user(user)
431
 
 
432
423
        return
433
424
 
434
425
    def test_authorize_revoke_security_group_cidr_v6(self):
438
429
        """
439
430
        self.expect_http()
440
431
        self.mox.ReplayAll()
441
 
        user = self.manager.create_user('fake', 'fake', 'fake')
442
 
        project = self.manager.create_project('fake', 'fake', 'fake')
443
 
 
444
 
        # At the moment, you need both of these to actually be netadmin
445
 
        self.manager.add_role('fake', 'netadmin')
446
 
        project.add_role('fake', 'netadmin')
447
432
 
448
433
        security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
449
434
                                      for x in range(random.randint(4, 8)))
489
474
        self.assertEqual(len(rv), 1)
490
475
        self.assertEqual(rv[0].name, 'default')
491
476
 
492
 
        self.manager.delete_project(project)
493
 
        self.manager.delete_user(user)
494
 
 
495
477
        return
496
478
 
497
479
    def test_authorize_revoke_security_group_foreign_group(self):
501
483
        """
502
484
        self.expect_http()
503
485
        self.mox.ReplayAll()
504
 
        user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
505
 
        project = self.manager.create_project('fake', 'fake', 'fake')
506
 
 
507
 
        # At the moment, you need both of these to actually be netadmin
508
 
        self.manager.add_role('fake', 'netadmin')
509
 
        project.add_role('fake', 'netadmin')
510
486
 
511
487
        rand_string = 'sdiuisudfsdcnpaqwertasd'
512
488
        security_group_name = "".join(random.choice(rand_string)
560
536
        self.mox.ReplayAll()
561
537
 
562
538
        self.ec2.delete_security_group(security_group_name)
563
 
 
564
 
        self.manager.delete_project(project)
565
 
        self.manager.delete_user(user)
566
 
 
567
 
        return