1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2011 OpenStack LLC
4
# Copyright 2012 Justin Santa Barbara
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
7
# not use this file except in compliance with the License. You may obtain
8
# a copy of the License at
10
# http://www.apache.org/licenses/LICENSE-2.0
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
# License for the specific language governing permissions and limitations
20
from lxml import etree
24
from nova.api.openstack.compute.contrib import security_groups
25
from nova.api.openstack import wsgi
27
from nova import exception
29
from nova.tests.api.openstack import fakes
32
FAKE_UUID = 'a47ae74e-ab08-447f-8eee-ffd43fc46c16'
36
def __getattr__(self, k):
40
def security_group_template(**kwargs):
42
sg.setdefault('tenant_id', '123')
43
sg.setdefault('name', 'test')
44
sg.setdefault('description', 'test-description')
48
def security_group_db(security_group, id=None):
49
attrs = security_group.copy()
50
if 'tenant_id' in attrs:
51
attrs['project_id'] = attrs.pop('tenant_id')
54
attrs.setdefault('rules', [])
55
attrs.setdefault('instances', [])
56
return AttrDict(attrs)
59
def security_group_rule_template(**kwargs):
61
rule.setdefault('ip_protocol', 'tcp')
62
rule.setdefault('from_port', 22)
63
rule.setdefault('to_port', 22)
64
rule.setdefault('parent_group_id', 2)
68
def security_group_rule_db(rule, id=None):
70
if 'ip_protocol' in attrs:
71
attrs['protocol'] = attrs.pop('ip_protocol')
72
return AttrDict(attrs)
75
def return_server(context, server_id):
76
return {'id': int(server_id),
83
def return_server_by_uuid(context, server_uuid):
91
def return_non_running_server(context, server_id):
    """Build a fake server record whose ``power_state`` is 0x02.

    ``context`` is accepted but not used. ``server_id`` is stored unchanged
    under the ``id`` key; the ``uuid`` is always FAKE_UUID.
    """
    server = dict(id=server_id, uuid=FAKE_UUID, name='asdf')
    server['power_state'] = 0x02
    server['host'] = "localhost"
    return server
96
def return_security_group_by_name(context, project_id, group_name):
    """Return a fake security group record that lists one instance.

    ``context`` and ``project_id`` are accepted but not used. The record
    carries id 1, the requested ``group_name``, and a single instance entry
    whose uuid is FAKE_UUID.
    """
    attached_instance = {'id': 1, 'uuid': FAKE_UUID}
    group = {'id': 1, 'name': group_name}
    group["instances"] = [attached_instance]
    return group
101
def return_security_group_without_instances(context, project_id, group_name):
    """Return a fake security group record that has no ``instances`` key.

    ``context`` and ``project_id`` are accepted but not used; they are
    present so the function can stand in for a lookup taking those
    arguments. The record always has id 1 and the given ``group_name``.
    """
    return {'id': 1, 'name': group_name}
105
def return_server_nonexistent(context, server_id):
    """Always raise InstanceNotFound, tagged with ``server_id``.

    ``context`` is accepted but not used.
    """
    not_found = exception.InstanceNotFound(instance_id=server_id)
    raise not_found
109
class TestSecurityGroups(test.TestCase):
111
super(TestSecurityGroups, self).setUp()
113
self.controller = security_groups.SecurityGroupController()
114
self.server_controller = (
115
security_groups.ServerSecurityGroupController())
116
self.manager = security_groups.SecurityGroupActionController()
118
def test_create_security_group(self):
119
sg = security_group_template()
121
req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
122
res_dict = self.controller.create(req, {'security_group': sg})
123
self.assertEqual(res_dict['security_group']['name'], 'test')
124
self.assertEqual(res_dict['security_group']['description'],
127
def test_create_security_group_with_no_name(self):
128
sg = security_group_template()
131
req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
132
self.assertRaises(webob.exc.HTTPUnprocessableEntity,
133
self.controller.create, req, sg)
135
def test_create_security_group_with_no_description(self):
    # Remove 'description' from the templated group; create() must then
    # raise HTTPBadRequest.
    group = security_group_template()
    del group['description']

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
    payload = {'security_group': group}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
143
def test_create_security_group_with_blank_name(self):
    # An empty-string name must make create() raise HTTPBadRequest.
    group = security_group_template(name='')

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
    payload = {'security_group': group}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
150
def test_create_security_group_with_whitespace_name(self):
    # A whitespace-only name must make create() raise HTTPBadRequest.
    group = security_group_template(name=' ')

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
    payload = {'security_group': group}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
157
def test_create_security_group_with_blank_description(self):
    # An empty-string description must make create() raise
    # HTTPBadRequest.
    group = security_group_template(description='')

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
    payload = {'security_group': group}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
164
def test_create_security_group_with_whitespace_description(self):
    # A whitespace-only description must make create() raise
    # HTTPBadRequest.
    group = security_group_template(description=' ')

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
    payload = {'security_group': group}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
171
def test_create_security_group_with_duplicate_name(self):
    # Submit the same templated group twice: the first create() call is
    # made unguarded, the second must raise HTTPBadRequest.
    group = security_group_template()
    url = '/v2/fake/os-security-groups'

    # FIXME: Stub out _get instead of creating twice
    first_request = fakes.HTTPRequest.blank(url)
    self.controller.create(first_request, {'security_group': group})

    second_request = fakes.HTTPRequest.blank(url)
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create,
                      second_request, {'security_group': group})
182
def test_create_security_group_with_no_body(self):
    # Passing None as the body must make create() raise
    # HTTPUnprocessableEntity.
    request = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
    empty_body = None
    self.assertRaises(webob.exc.HTTPUnprocessableEntity,
                      self.controller.create, request, empty_body)
187
def test_create_security_group_with_no_security_group(self):
    # A body lacking the 'security_group' key must make create() raise
    # HTTPUnprocessableEntity.
    payload = dict([('no-securityGroup', None)])

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
    self.assertRaises(webob.exc.HTTPUnprocessableEntity,
                      self.controller.create, request, payload)
194
def test_create_security_group_above_255_characters_name(self):
    # A 260-character name (ten characters repeated 26 times) must make
    # create() raise HTTPBadRequest.
    long_name = '1234567890' * 26
    group = security_group_template(name=long_name)

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
    payload = {'security_group': group}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
201
def test_create_security_group_above_255_characters_description(self):
    # A 260-character description (ten characters repeated 26 times)
    # must make create() raise HTTPBadRequest.
    long_description = '1234567890' * 26
    group = security_group_template(description=long_description)

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
    payload = {'security_group': group}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
208
def test_create_security_group_non_string_name(self):
    # An integer name must make create() raise HTTPBadRequest.
    group = security_group_template(name=12)

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
    payload = {'security_group': group}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
215
def test_create_security_group_non_string_description(self):
    # An integer description must make create() raise HTTPBadRequest.
    group = security_group_template(description=12)

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
    payload = {'security_group': group}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
222
def test_get_security_group_list(self):
224
for i, name in enumerate(['default', 'test']):
225
sg = security_group_template(id=i + 1,
227
description=name + '-desc',
230
expected = {'security_groups': groups}
232
def return_security_groups(context, project_id):
233
return [security_group_db(sg) for sg in groups]
235
self.stubs.Set(nova.db, 'security_group_get_by_project',
236
return_security_groups)
238
req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
239
res_dict = self.controller.index(req)
241
self.assertEquals(res_dict, expected)
243
def test_get_security_group_by_instance(self):
245
for i, name in enumerate(['default', 'test']):
246
sg = security_group_template(id=i + 1,
248
description=name + '-desc',
251
expected = {'security_groups': groups}
253
def return_instance(context, server_id):
254
self.assertEquals(server_id, FAKE_UUID)
255
return return_server_by_uuid(context, server_id)
257
self.stubs.Set(nova.db, 'instance_get_by_uuid',
260
def return_security_groups(context, instance_id):
261
self.assertEquals(instance_id, 1)
262
return [security_group_db(sg) for sg in groups]
264
self.stubs.Set(nova.db, 'security_group_get_by_instance',
265
return_security_groups)
267
req = fakes.HTTPRequest.blank('/v2/%s/servers/%s/os-security-groups' %
269
res_dict = self.server_controller.index(req, FAKE_UUID)
271
self.assertEquals(res_dict, expected)
273
def test_get_security_group_by_id(self):
    # show() for id '2' must return the templated group wrapped under
    # 'security_group', with the DB lookup replaced by a local stub.
    sg = security_group_template(id=2, rules=[])

    def return_security_group(context, group_id):
        # The stub also checks it is asked for the template's id.
        self.assertEqual(sg['id'], group_id)
        return security_group_db(sg)

    self.stubs.Set(nova.db, 'security_group_get',
                   return_security_group)

    req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/2')
    res_dict = self.controller.show(req, '2')

    expected = {'security_group': sg}
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(res_dict, expected)
289
def test_get_security_group_by_invalid_id(self):
290
req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/invalid')
291
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
294
def test_get_security_group_by_non_existing_id(self):
295
req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/111111111')
296
self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
299
def test_delete_security_group_by_id(self):
300
sg = security_group_template(id=1, rules=[])
304
def security_group_destroy(context, id):
307
def return_security_group(context, group_id):
308
self.assertEquals(sg['id'], group_id)
309
return security_group_db(sg)
311
self.stubs.Set(nova.db, 'security_group_destroy',
312
security_group_destroy)
313
self.stubs.Set(nova.db, 'security_group_get',
314
return_security_group)
316
req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/1')
317
self.controller.delete(req, '1')
319
self.assertTrue(self.called)
321
def test_delete_security_group_by_invalid_id(self):
322
req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/invalid')
323
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
326
def test_delete_security_group_by_non_existing_id(self):
327
req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/11111111')
328
self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
331
def test_delete_security_group_in_use(self):
332
sg = security_group_template(id=1, rules=[])
334
def security_group_in_use(context, id):
337
def return_security_group(context, group_id):
338
self.assertEquals(sg['id'], group_id)
339
return security_group_db(sg)
341
self.stubs.Set(nova.db, 'security_group_in_use',
342
security_group_in_use)
343
self.stubs.Set(nova.db, 'security_group_get',
344
return_security_group)
346
req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/1')
347
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
350
def test_associate_by_non_existing_security_group_name(self):
    # Adding a group named 'non-existing' to server '1' must raise
    # HTTPNotFound.
    payload = {'addSecurityGroup': {'name': 'non-existing'}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPNotFound,
                      self.manager._addSecurityGroup,
                      request, '1', payload)
357
def test_associate_by_invalid_server_id(self):
    # Adding group 'test' to the server id 'invalid' must raise
    # HTTPNotFound.
    payload = {'addSecurityGroup': {'name': 'test'}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/invalid/action')
    self.assertRaises(webob.exc.HTTPNotFound,
                      self.manager._addSecurityGroup,
                      request, 'invalid', payload)
364
def test_associate_without_body(self):
    # An addSecurityGroup action whose value is None must raise
    # HTTPBadRequest.
    self.stubs.Set(nova.db, 'instance_get', return_server)
    payload = {'addSecurityGroup': None}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.manager._addSecurityGroup,
                      request, '1', payload)
372
def test_associate_no_security_group_name(self):
    # An addSecurityGroup action with an empty dict (no 'name') must
    # raise HTTPBadRequest.
    self.stubs.Set(nova.db, 'instance_get', return_server)
    payload = {'addSecurityGroup': {}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.manager._addSecurityGroup,
                      request, '1', payload)
380
def test_associate_security_group_name_with_whitespaces(self):
    # A whitespace-only group name must make the add action raise
    # HTTPBadRequest.
    self.stubs.Set(nova.db, 'instance_get', return_server)
    payload = {'addSecurityGroup': {'name': " "}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.manager._addSecurityGroup,
                      request, '1', payload)
388
def test_associate_non_existing_instance(self):
    # Both instance lookups are stubbed with a function that raises
    # InstanceNotFound; the add action must then raise HTTPNotFound.
    for db_call in ('instance_get', 'instance_get_by_uuid'):
        self.stubs.Set(nova.db, db_call, return_server_nonexistent)
    payload = {'addSecurityGroup': {'name': "test"}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPNotFound,
                      self.manager._addSecurityGroup,
                      request, '1', payload)
398
def test_associate_non_running_instance(self):
    # Instance lookups return the non-running fake server and the group
    # lookup returns a group without instances; the add action must
    # raise HTTPBadRequest.
    for db_call in ('instance_get', 'instance_get_by_uuid'):
        self.stubs.Set(nova.db, db_call, return_non_running_server)
    self.stubs.Set(nova.db, 'security_group_get_by_name',
                   return_security_group_without_instances)
    payload = {'addSecurityGroup': {'name': "test"}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.manager._addSecurityGroup,
                      request, '1', payload)
410
def test_associate_already_associated_security_group_to_instance(self):
    # The group lookup returns a group that already lists an instance
    # with FAKE_UUID; the add action must raise HTTPBadRequest.
    db_stubs = (
        ('instance_get', return_server),
        ('instance_get_by_uuid', return_server_by_uuid),
        ('security_group_get_by_name', return_security_group_by_name),
    )
    for db_call, fake in db_stubs:
        self.stubs.Set(nova.db, db_call, fake)
    payload = {'addSecurityGroup': {'name': "test"}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.manager._addSecurityGroup,
                      request, '1', payload)
422
def test_associate(self):
423
self.stubs.Set(nova.db, 'instance_get', return_server)
424
self.stubs.Set(nova.db, 'instance_get_by_uuid',
425
return_server_by_uuid)
426
self.mox.StubOutWithMock(nova.db, 'instance_add_security_group')
427
nova.db.instance_add_security_group(mox.IgnoreArg(),
430
self.stubs.Set(nova.db, 'security_group_get_by_name',
431
return_security_group_without_instances)
434
body = dict(addSecurityGroup=dict(name="test"))
436
req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
437
self.manager._addSecurityGroup(req, '1', body)
439
def test_disassociate_by_non_existing_security_group_name(self):
    # Removing a group named 'non-existing' from server '1' must raise
    # HTTPNotFound.
    payload = {'removeSecurityGroup': {'name': 'non-existing'}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPNotFound,
                      self.manager._removeSecurityGroup,
                      request, '1', payload)
446
def test_disassociate_by_invalid_server_id(self):
447
self.stubs.Set(nova.db, 'security_group_get_by_name',
448
return_security_group_by_name)
449
body = dict(removeSecurityGroup=dict(name='test'))
451
req = fakes.HTTPRequest.blank('/v2/fake/servers/invalid/action')
452
self.assertRaises(webob.exc.HTTPNotFound,
453
self.manager._removeSecurityGroup, req, 'invalid',
456
def test_disassociate_without_body(self):
    # A removeSecurityGroup action whose value is None must raise
    # HTTPBadRequest.
    self.stubs.Set(nova.db, 'instance_get', return_server)
    payload = {'removeSecurityGroup': None}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.manager._removeSecurityGroup,
                      request, '1', payload)
464
def test_disassociate_no_security_group_name(self):
    # A removeSecurityGroup action with an empty dict (no 'name') must
    # raise HTTPBadRequest.
    self.stubs.Set(nova.db, 'instance_get', return_server)
    payload = {'removeSecurityGroup': {}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.manager._removeSecurityGroup,
                      request, '1', payload)
472
def test_disassociate_security_group_name_with_whitespaces(self):
    # A whitespace-only group name must make the remove action raise
    # HTTPBadRequest.
    self.stubs.Set(nova.db, 'instance_get', return_server)
    payload = {'removeSecurityGroup': {'name': " "}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.manager._removeSecurityGroup,
                      request, '1', payload)
480
def test_disassociate_non_existing_instance(self):
    # instance_get is stubbed with a function that raises
    # InstanceNotFound; the remove action must then raise HTTPNotFound.
    db_stubs = (
        ('instance_get', return_server_nonexistent),
        ('security_group_get_by_name', return_security_group_by_name),
    )
    for db_call, fake in db_stubs:
        self.stubs.Set(nova.db, db_call, fake)
    payload = {'removeSecurityGroup': {'name': "test"}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPNotFound,
                      self.manager._removeSecurityGroup,
                      request, '1', payload)
490
def test_disassociate_non_running_instance(self):
    # Instance lookups return the non-running fake server; the remove
    # action must raise HTTPBadRequest.
    for db_call in ('instance_get', 'instance_get_by_uuid'):
        self.stubs.Set(nova.db, db_call, return_non_running_server)
    self.stubs.Set(nova.db, 'security_group_get_by_name',
                   return_security_group_by_name)
    payload = {'removeSecurityGroup': {'name': "test"}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.manager._removeSecurityGroup,
                      request, '1', payload)
502
def test_disassociate_already_associated_security_group_to_instance(self):
    # The group lookup returns a group with no instances listed; the
    # remove action must raise HTTPBadRequest.
    db_stubs = (
        ('instance_get', return_server),
        ('instance_get_by_uuid', return_server_by_uuid),
        ('security_group_get_by_name',
         return_security_group_without_instances),
    )
    for db_call, fake in db_stubs:
        self.stubs.Set(nova.db, db_call, fake)
    payload = {'removeSecurityGroup': {'name': "test"}}

    request = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.manager._removeSecurityGroup,
                      request, '1', payload)
514
def test_disassociate(self):
515
self.stubs.Set(nova.db, 'instance_get', return_server)
516
self.stubs.Set(nova.db, 'instance_get_by_uuid',
517
return_server_by_uuid)
518
self.mox.StubOutWithMock(nova.db, 'instance_remove_security_group')
519
nova.db.instance_remove_security_group(mox.IgnoreArg(),
522
self.stubs.Set(nova.db, 'security_group_get_by_name',
523
return_security_group_by_name)
526
body = dict(removeSecurityGroup=dict(name="test"))
528
req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
529
self.manager._removeSecurityGroup(req, '1', body)
532
class TestSecurityGroupRules(test.TestCase):
534
super(TestSecurityGroupRules, self).setUp()
536
sg1 = security_group_template(id=1)
537
sg2 = security_group_template(id=2,
538
name='authorize_revoke',
539
description='authorize-revoke testing')
540
db1 = security_group_db(sg1)
541
db2 = security_group_db(sg2)
543
def return_security_group(context, group_id):
544
if group_id == db1['id']:
546
if group_id == db2['id']:
548
raise exception.NotFound()
550
self.stubs.Set(nova.db, 'security_group_get',
551
return_security_group)
553
self.parent_security_group = db2
555
self.controller = security_groups.SecurityGroupRulesController()
557
def test_create_by_cidr(self):
558
rule = security_group_rule_template(cidr='10.2.3.124/24')
560
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
561
res_dict = self.controller.create(req, {'security_group_rule': rule})
563
security_group_rule = res_dict['security_group_rule']
564
self.assertNotEquals(security_group_rule['id'], 0)
565
self.assertEquals(security_group_rule['parent_group_id'], 2)
566
self.assertEquals(security_group_rule['ip_range']['cidr'],
569
def test_create_by_group_id(self):
    # Create a rule that references source group 1; the response must
    # carry a non-zero rule id and parent_group_id 2.
    rule = security_group_rule_template(group_id=1)

    req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    res_dict = self.controller.create(req, {'security_group_rule': rule})

    security_group_rule = res_dict['security_group_rule']
    # assertNotEqual/assertEqual replace the deprecated *Equals aliases.
    self.assertNotEqual(security_group_rule['id'], 0)
    self.assertEqual(security_group_rule['parent_group_id'], 2)
579
def test_create_by_same_group_id(self):
580
rule1 = security_group_rule_template(group_id=1, from_port=80,
582
self.parent_security_group['rules'] = [security_group_rule_db(rule1)]
584
rule2 = security_group_rule_template(group_id=1, from_port=81,
587
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
588
res_dict = self.controller.create(req, {'security_group_rule': rule2})
590
security_group_rule = res_dict['security_group_rule']
591
self.assertNotEquals(security_group_rule['id'], 0)
592
self.assertEquals(security_group_rule['parent_group_id'], 2)
593
self.assertEquals(security_group_rule['from_port'], 81)
594
self.assertEquals(security_group_rule['to_port'], 81)
596
def test_create_by_invalid_cidr_json(self):
598
"security_group_rule": {
599
"ip_protocol": "tcp",
602
"parent_group_id": 2,
603
"cidr": "10.2.3.124/2433"}}
604
rule = security_group_rule_template(
609
cidr="10.2.3.124/2433")
610
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
611
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
612
req, {'security_group_rule': rule})
614
def test_create_by_invalid_tcp_port_json(self):
615
rule = security_group_rule_template(
620
cidr="10.2.3.124/24")
622
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
623
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
624
req, {'security_group_rule': rule})
626
def test_create_by_invalid_icmp_port_json(self):
627
rule = security_group_rule_template(
632
cidr="10.2.3.124/24")
633
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
634
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
635
req, {'security_group_rule': rule})
637
def test_create_add_existing_rules_by_cidr(self):
    # Seed the parent group with a cidr rule, then submit that same rule
    # again; create() must raise HTTPBadRequest.
    rule = security_group_rule_template(cidr='10.0.0.0/24')
    stored = security_group_rule_db(rule)
    self.parent_security_group['rules'] = [stored]

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    payload = {'security_group_rule': rule}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
646
def test_create_add_existing_rules_by_group_id(self):
    # Seed the parent group with a group_id rule, then submit that same
    # rule again; create() must raise HTTPBadRequest.
    rule = security_group_rule_template(group_id=1)
    stored = security_group_rule_db(rule)
    self.parent_security_group['rules'] = [stored]

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    payload = {'security_group_rule': rule}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
655
def test_create_with_no_body(self):
    # Passing None as the body must make create() raise
    # HTTPUnprocessableEntity.
    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    empty_body = None
    self.assertRaises(webob.exc.HTTPUnprocessableEntity,
                      self.controller.create, request, empty_body)
660
def test_create_with_no_security_group_rule_in_body(self):
    # A body lacking the 'security_group_rule' key must make create()
    # raise HTTPUnprocessableEntity.
    payload = dict(test='test')
    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    self.assertRaises(webob.exc.HTTPUnprocessableEntity,
                      self.controller.create, request, payload)
666
def test_create_with_invalid_parent_group_id(self):
    # The parent_group_id 'invalid' must make create() raise
    # HTTPBadRequest.
    rule = security_group_rule_template(parent_group_id='invalid')

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    payload = {'security_group_rule': rule}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
673
def test_create_with_non_existing_parent_group_id(self):
    # The parent_group_id '1111111111111' must make create() raise
    # HTTPNotFound.
    template_args = {'group_id': 'invalid',
                     'parent_group_id': '1111111111111'}
    rule = security_group_rule_template(**template_args)

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    payload = {'security_group_rule': rule}
    self.assertRaises(webob.exc.HTTPNotFound,
                      self.controller.create, request, payload)
681
def test_create_with_invalid_protocol(self):
682
rule = security_group_rule_template(ip_protocol='invalid-protocol',
685
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
686
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
687
req, {'security_group_rule': rule})
689
def test_create_with_no_protocol(self):
    # Remove 'ip_protocol' from the templated rule; create() must then
    # raise HTTPBadRequest.
    rule = security_group_rule_template(cidr='10.2.2.0/24')
    del rule['ip_protocol']

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    payload = {'security_group_rule': rule}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
697
def test_create_with_invalid_from_port(self):
698
rule = security_group_rule_template(from_port='666666',
701
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
702
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
703
req, {'security_group_rule': rule})
705
def test_create_with_invalid_to_port(self):
706
rule = security_group_rule_template(to_port='666666',
709
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
710
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
711
req, {'security_group_rule': rule})
713
def test_create_with_non_numerical_from_port(self):
714
rule = security_group_rule_template(from_port='invalid',
717
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
718
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
719
req, {'security_group_rule': rule})
721
def test_create_with_non_numerical_to_port(self):
722
rule = security_group_rule_template(to_port='invalid',
725
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
726
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
727
req, {'security_group_rule': rule})
729
def test_create_with_no_from_port(self):
    # Remove 'from_port' from the templated rule; create() must then
    # raise HTTPBadRequest.
    rule = security_group_rule_template(cidr='10.2.2.0/24')
    del rule['from_port']

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    payload = {'security_group_rule': rule}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
737
def test_create_with_no_to_port(self):
738
rule = security_group_rule_template(cidr='10.2.2.0/24')
741
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
742
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
743
req, {'security_group_rule': rule})
745
def test_create_with_invalid_cidr(self):
    # The cidr '10.2.2222.0/24' must make create() raise HTTPBadRequest.
    rule = security_group_rule_template(cidr='10.2.2222.0/24')

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    payload = {'security_group_rule': rule}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
752
def test_create_with_no_cidr_group(self):
753
rule = security_group_rule_template()
755
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
756
res_dict = self.controller.create(req, {'security_group_rule': rule})
758
security_group_rule = res_dict['security_group_rule']
759
self.assertNotEquals(security_group_rule['id'], 0)
760
self.assertEquals(security_group_rule['parent_group_id'],
761
self.parent_security_group['id'])
762
self.assertEquals(security_group_rule['ip_range']['cidr'],
765
def test_create_with_invalid_group_id(self):
    # The group_id 'invalid' must make create() raise HTTPBadRequest.
    rule = security_group_rule_template(group_id='invalid')

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    payload = {'security_group_rule': rule}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
772
def test_create_with_empty_group_id(self):
    # An empty-string group_id must make create() raise HTTPBadRequest.
    rule = security_group_rule_template(group_id='')

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    payload = {'security_group_rule': rule}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
779
def test_create_with_nonexist_group_id(self):
    # The group_id '222222' must make create() raise HTTPBadRequest.
    rule = security_group_rule_template(group_id='222222')

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    payload = {'security_group_rule': rule}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
786
def test_create_with_same_group_parent_id_and_group_id(self):
    # A rule whose source group and parent group are both 1 must be
    # created, coming back with parent_group_id 1 and rule id 1.
    rule = security_group_rule_template(group_id=1, parent_group_id=1)

    req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    res_dict = self.controller.create(req, {'security_group_rule': rule})
    security_group_rule = res_dict['security_group_rule']
    # assertNotEqual/assertEqual replace the deprecated *Equals aliases.
    self.assertNotEqual(security_group_rule['id'], 0)
    self.assertEqual(security_group_rule['parent_group_id'], 1)
    self.assertEqual(security_group_rule['id'], 1)
796
def _test_create_with_no_ports_and_no_group(self, proto):
    # Helper: a rule carrying only a protocol and parent group '2' (no
    # ports, no source group) must make create() raise HTTPBadRequest.
    rule = dict(ip_protocol=proto, parent_group_id='2')

    request = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
    payload = {'security_group_rule': rule}
    self.assertRaises(webob.exc.HTTPBadRequest,
                      self.controller.create, request, payload)
803
def _test_create_with_no_ports(self, proto):
804
rule = {'ip_protocol': proto, 'parent_group_id': '2', 'group_id': '1'}
806
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
807
res_dict = self.controller.create(req, {'security_group_rule': rule})
809
security_group_rule = res_dict['security_group_rule']
811
'from_port': 1, 'group': {'tenant_id': '123', 'name': 'test'},
812
'ip_protocol': proto, 'to_port': 65535, 'parent_group_id': 2,
813
'ip_range': {}, 'id': 1
816
expected_rule['to_port'] = -1
817
expected_rule['from_port'] = -1
818
self.assertTrue(security_group_rule == expected_rule)
820
def test_create_with_no_ports_icmp(self):
    # Run both no-ports helpers for 'icmp': first without a source
    # group, then with one.
    proto = 'icmp'
    self._test_create_with_no_ports_and_no_group(proto)
    self._test_create_with_no_ports(proto)
824
def test_create_with_no_ports_tcp(self):
    # Run both no-ports helpers for 'tcp': first without a source
    # group, then with one.
    proto = 'tcp'
    self._test_create_with_no_ports_and_no_group(proto)
    self._test_create_with_no_ports(proto)
828
def test_create_with_no_ports_udp(self):
    # Run both no-ports helpers for 'udp': first without a source
    # group, then with one.
    proto = 'udp'
    self._test_create_with_no_ports_and_no_group(proto)
    self._test_create_with_no_ports(proto)
832
def _test_create_with_ports(self, id_val, proto, from_port, to_port):
834
'ip_protocol': proto, 'from_port': from_port, 'to_port': to_port,
835
'parent_group_id': '2', 'group_id': '1'
837
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
838
res_dict = self.controller.create(req, {'security_group_rule': rule})
840
security_group_rule = res_dict['security_group_rule']
842
'from_port': from_port,
843
'group': {'tenant_id': '123', 'name': 'test'},
844
'ip_protocol': proto, 'to_port': to_port, 'parent_group_id': 2,
845
'ip_range': {}, 'id': id_val
847
self.assertTrue(security_group_rule['ip_protocol'] == proto)
848
self.assertTrue(security_group_rule['id'] == id_val)
849
self.assertTrue(security_group_rule['from_port'] == from_port)
850
self.assertTrue(security_group_rule['to_port'] == to_port)
851
self.assertTrue(security_group_rule == expected_rule)
853
def test_create_with_ports_icmp(self):
    # Run the ports helper for three 'icmp' (from_port, to_port) pairs,
    # passing id values 1, 2, 3 in call order.
    port_pairs = [(0, 1), (0, 0), (1, 0)]
    for id_val, (from_port, to_port) in enumerate(port_pairs, 1):
        self._test_create_with_ports(id_val, 'icmp', from_port, to_port)
858
def test_create_with_ports_tcp(self):
    # Run the ports helper for three 'tcp' (from_port, to_port) pairs,
    # passing id values 1, 2, 3 in call order.
    port_pairs = [(1, 1), (1, 65535), (65535, 65535)]
    for id_val, (from_port, to_port) in enumerate(port_pairs, 1):
        self._test_create_with_ports(id_val, 'tcp', from_port, to_port)
863
def test_create_with_ports_udp(self):
    # Run the ports helper for three 'udp' (from_port, to_port) pairs,
    # passing id values 1, 2, 3 in call order.
    port_pairs = [(1, 1), (1, 65535), (65535, 65535)]
    for id_val, (from_port, to_port) in enumerate(port_pairs, 1):
        self._test_create_with_ports(id_val, 'udp', from_port, to_port)
868
def test_delete(self):
869
rule = security_group_rule_template(id=10)
871
def security_group_rule_get(context, id):
872
return security_group_rule_db(rule)
874
def security_group_rule_destroy(context, id):
877
self.stubs.Set(nova.db, 'security_group_rule_get',
878
security_group_rule_get)
879
self.stubs.Set(nova.db, 'security_group_rule_destroy',
880
security_group_rule_destroy)
882
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/10')
883
self.controller.delete(req, '10')
885
def test_delete_invalid_rule_id(self):
886
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules' +
888
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
891
def test_delete_non_existing_rule_id(self):
892
req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules' +
894
self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
895
req, '22222222222222')
898
class TestSecurityGroupRulesXMLDeserializer(unittest.TestCase):
# Exercises security_groups.SecurityGroupRulesXMLDeserializer. Each test
# deserializes a <security_group_rule> XML request and compares
# request['body'] with an expected dict keyed by "security_group_rule".
# test_create_request sends an <ip_protocol> element;
# test_create_no_protocol_request sends the same request without it.
# NOTE(review): assertEquals is a deprecated unittest alias of
# assertEqual.
901
self.deserializer = security_groups.SecurityGroupRulesXMLDeserializer()
903
def test_create_request(self):
905
<security_group_rule>
906
<parent_group_id>12</parent_group_id>
907
<from_port>22</from_port>
908
<to_port>22</to_port>
909
<group_id></group_id>
910
<ip_protocol>tcp</ip_protocol>
911
<cidr>10.0.0.0/24</cidr>
912
</security_group_rule>"""
913
request = self.deserializer.deserialize(serial_request)
915
"security_group_rule": {
916
"parent_group_id": "12",
919
"ip_protocol": "tcp",
921
"cidr": "10.0.0.0/24",
924
self.assertEquals(request['body'], expected)
926
def test_create_no_protocol_request(self):
928
<security_group_rule>
929
<parent_group_id>12</parent_group_id>
930
<from_port>22</from_port>
931
<to_port>22</to_port>
932
<group_id></group_id>
933
<cidr>10.0.0.0/24</cidr>
934
</security_group_rule>"""
935
request = self.deserializer.deserialize(serial_request)
937
"security_group_rule": {
938
"parent_group_id": "12",
942
"cidr": "10.0.0.0/24",
945
self.assertEquals(request['body'], expected)
948
class TestSecurityGroupXMLDeserializer(unittest.TestCase):
# Exercises security_groups.SecurityGroupXMLDeserializer. Each test
# deserializes a <security_group> XML request and compares
# request['body'] with an expected value.
# NOTE(review): assertEquals is a deprecated unittest alias of
# assertEqual.
951
self.deserializer = security_groups.SecurityGroupXMLDeserializer()
953
# Request carrying both a name attribute and a <description> element.
def test_create_request(self):
955
<security_group name="test">
956
<description>test</description>
958
request = self.deserializer.deserialize(serial_request)
962
"description": "test",
965
self.assertEquals(request['body'], expected)
967
# Request carrying a name attribute; presumably no <description>.
def test_create_no_description_request(self):
969
<security_group name="test">
971
request = self.deserializer.deserialize(serial_request)
977
self.assertEquals(request['body'], expected)
979
# Request carrying a <description>; presumably no name attribute.
def test_create_no_name_request(self):
982
<description>test</description>
984
request = self.deserializer.deserialize(serial_request)
987
"description": "test",
990
self.assertEquals(request['body'], expected)
993
class TestSecurityGroupXMLSerializer(unittest.TestCase):
# Fixture state used by the serializer tests: the XML namespace that
# serialized tags are checked against (wsgi.XMLNS_V11) and one template
# instance each for a single rule, a list of groups, and a single group.
995
self.namespace = wsgi.XMLNS_V11
996
self.rule_serializer = security_groups.SecurityGroupRuleTemplate()
997
self.index_serializer = security_groups.SecurityGroupsTemplate()
998
self.default_serializer = security_groups.SecurityGroupTemplate()
1000
def _tag(self, elem):
# Asserts that the tag name starts with '{' and that the text between
# '{' and the first '}' equals self.namespace. Callers use the result
# as a plain tag name, so this presumably returns the part after '}'.
1002
self.assertEqual(tagname[0], '{')
1003
tmp = tagname.partition('}')
1004
namespace = tmp[0][1:]
1005
self.assertEqual(namespace, self.namespace)
1008
def _verify_security_group_rule(self, raw_rule, tree):
# Checks a serialized rule element against the raw_rule dict:
#  * the id and parent_group_id attributes must match raw_rule;
#  * every child tag must be a key of raw_rule;
#  * children of 'group' and 'ip_range' are compared by text with the
#    nested dict and recorded as 'parent/child' in the seen set;
#  * child text is compared with raw_rule[child_tag];
#  * finally the set of tags seen must equal the expected set.
1009
self.assertEqual(raw_rule['id'], tree.get('id'))
1010
self.assertEqual(raw_rule['parent_group_id'],
1011
tree.get('parent_group_id'))
1014
expected = set(['ip_protocol', 'from_port', 'to_port',
1015
'group', 'group/name', 'group/tenant_id',
1016
'ip_range', 'ip_range/cidr'])
1019
child_tag = self._tag(child)
1020
self.assertTrue(child_tag in raw_rule)
1022
if child_tag in ('group', 'ip_range'):
1023
for gr_child in child:
1024
gr_child_tag = self._tag(gr_child)
1025
self.assertTrue(gr_child_tag in raw_rule[child_tag])
1026
seen.add('%s/%s' % (child_tag, gr_child_tag))
1027
self.assertEqual(gr_child.text,
1028
raw_rule[child_tag][gr_child_tag])
1030
self.assertEqual(child.text, raw_rule[child_tag])
1031
self.assertEqual(seen, expected)
1033
def _verify_security_group(self, raw_group, tree):
# Checks a serialized group element against the raw_group dict:
#  * the root tag must be 'security_group';
#  * the id, tenant_id and name attributes must match raw_group;
#  * the element must have exactly 2 children;
#  * a 'rules' child must hold exactly 2 'rule' elements, each verified
#    in order against raw_group['rules'];
#  * the description check compares the child's tag with 'description'
#    and its text with raw_group['description'].
1034
rules = raw_group['rules']
1035
self.assertEqual('security_group', self._tag(tree))
1036
self.assertEqual(raw_group['id'], tree.get('id'))
1037
self.assertEqual(raw_group['tenant_id'], tree.get('tenant_id'))
1038
self.assertEqual(raw_group['name'], tree.get('name'))
1039
self.assertEqual(2, len(tree))
1041
child_tag = self._tag(child)
1042
if child_tag == 'rules':
1043
self.assertEqual(2, len(child))
1044
for idx, gr_child in enumerate(child):
1045
self.assertEqual(self._tag(gr_child), 'rule')
1046
self._verify_security_group_rule(rules[idx], gr_child)
1048
self.assertEqual('description', child_tag)
1049
self.assertEqual(raw_group['description'], child.text)
1051
def test_rule_serializer(self):
# Serializes {'security_group_rule': raw_rule} with
# self.rule_serializer, parses the output with etree.fromstring, then
# checks the root tag is 'security_group_rule' and verifies the
# element against raw_rule.
1054
parent_group_id='456',
1058
group=dict(name='group', tenant_id='tenant'),
1059
ip_range=dict(cidr='10.0.0.0/8'))
1060
rule = dict(security_group_rule=raw_rule)
1061
text = self.rule_serializer.serialize(rule)
1064
tree = etree.fromstring(text)
1066
self.assertEqual('security_group_rule', self._tag(tree))
1067
self._verify_security_group_rule(raw_rule, tree)
1069
def test_group_serializer(self):
# Serializes {'security_group': raw_group} with
# self.default_serializer, parses the output with etree.fromstring and
# verifies the resulting element against raw_group.
1072
parent_group_id='456',
1076
group=dict(name='group1', tenant_id='tenant1'),
1077
ip_range=dict(cidr='10.55.44.0/24')),
1080
parent_group_id='321',
1084
group=dict(name='group2', tenant_id='tenant2'),
1085
ip_range=dict(cidr='10.44.55.0/24'))]
1088
description='description',
1092
sg_group = dict(security_group=raw_group)
1093
text = self.default_serializer.serialize(sg_group)
1096
tree = etree.fromstring(text)
1098
self._verify_security_group(raw_group, tree)
1100
def test_groups_serializer(self):
# Serializes {'security_groups': groups} with self.index_serializer,
# parses the output with etree.fromstring, checks the root tag is
# 'security_groups' and the child count equals len(groups), then
# verifies each child element against the group at the same index.
# NOTE(review): '10.345.0.0/24' and '10.456.0.0/24' are not valid IPv4
# CIDRs; the verification helpers only compare these values as text.
1103
parent_group_id='1234',
1107
group=dict(name='group1', tenant_id='tenant1'),
1108
ip_range=dict(cidr='10.123.0.0/24')),
1111
parent_group_id='2345',
1115
group=dict(name='group2', tenant_id='tenant2'),
1116
ip_range=dict(cidr='10.234.0.0/24')),
1119
parent_group_id='3456',
1123
group=dict(name='group3', tenant_id='tenant3'),
1124
ip_range=dict(cidr='10.345.0.0/24')),
1127
parent_group_id='4567',
1131
group=dict(name='group4', tenant_id='tenant4'),
1132
ip_range=dict(cidr='10.456.0.0/24'))]
1135
description='description1',
1137
tenant_id='tenant1',
1141
description='description2',
1143
tenant_id='tenant2',
1145
sg_groups = dict(security_groups=groups)
1146
text = self.index_serializer.serialize(sg_groups)
1149
tree = etree.fromstring(text)
1151
self.assertEqual('security_groups', self._tag(tree))
1152
self.assertEqual(len(groups), len(tree))
1153
for idx, child in enumerate(tree):
1154
self._verify_security_group(groups[idx], child)