~ubuntu-branches/ubuntu/saucy/nova/saucy-proposed

« back to all changes in this revision

Viewing changes to .pc/CVE-2012-2101.patch/nova/tests/api/openstack/compute/contrib/test_security_groups.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-05-24 13:12:53 UTC
  • mfrom: (1.1.55)
  • Revision ID: package-import@ubuntu.com-20120524131253-ommql08fg1en06ut
Tags: 2012.2~f1-0ubuntu1
* New upstream release.
* Prepare for quantal:
  - Dropped debian/patches/upstream/0006-Use-project_id-in-ec2.cloud._format_image.patch
  - Dropped debian/patches/upstream/0005-Populate-image-properties-with-project_id-again.patch
  - Dropped debian/patches/upstream/0004-Fixed-bug-962840-added-a-test-case.patch
  - Dropped debian/patches/upstream/0003-Allow-unprivileged-RADOS-users-to-access-rbd-volumes.patch
  - Dropped debian/patches/upstream/0002-Stop-libvirt-test-from-deleting-instances-dir.patch
  - Dropped debian/patches/upstream/0001-fix-bug-where-nova-ignores-glance-host-in-imageref.patch 
  - Dropped debian/patches/0001-fix-useexisting-deprecation-warnings.patch
* debian/control: Add python-keystone as a dependency. (LP: #907197)
* debian/patches/kombu_tests_timeout.patch: Refreshed.
* debian/nova.conf, debian/nova-common.postinst: Convert to new ini
  file configuration
* debian/patches/nova-manage_flagfile_location.patch: Refreshed

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2011 OpenStack LLC
4
 
# Copyright 2012 Justin Santa Barbara
5
 
#
6
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
7
 
#    not use this file except in compliance with the License. You may obtain
8
 
#    a copy of the License at
9
 
#
10
 
#         http://www.apache.org/licenses/LICENSE-2.0
11
 
#
12
 
#    Unless required by applicable law or agreed to in writing, software
13
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
 
#    License for the specific language governing permissions and limitations
16
 
#    under the License.
17
 
 
18
 
import unittest
19
 
 
20
 
from lxml import etree
21
 
import mox
22
 
import webob
23
 
 
24
 
from nova.api.openstack.compute.contrib import security_groups
25
 
from nova.api.openstack import wsgi
26
 
import nova.db
27
 
from nova import exception
28
 
from nova import test
29
 
from nova.tests.api.openstack import fakes
30
 
 
31
 
 
32
 
# Fixed instance UUID shared by the fake server / security-group fixtures
# and by the tests that address a server by UUID.
FAKE_UUID = 'a47ae74e-ab08-447f-8eee-ffd43fc46c16'
33
 
 
34
 
 
35
 
class AttrDict(dict):
    """Dictionary whose keys can also be read as attributes.

    The ``*_db`` fixture helpers in this module wrap their results in this
    class so that a value can be read as ``row['name']`` or ``row.name``.
    """

    def __getattr__(self, k):
        """Return ``self[k]``, raising AttributeError if the key is absent."""
        # __getattr__ is only consulted after normal attribute lookup has
        # failed, so real dict methods are never shadowed by keys.
        try:
            return self[k]
        except KeyError:
            # Attribute protocols (getattr with a default, hasattr, copy)
            # expect AttributeError, not KeyError, for a missing attribute.
            raise AttributeError(k)
38
 
 
39
 
 
40
 
def security_group_template(**kwargs):
    """Build a security-group dict for use as a request body or expectation.

    :param kwargs: fields to include; they override the defaults below.
    :returns: a new dict containing ``kwargs`` plus default ``tenant_id``,
              ``name`` and ``description`` values for any of those keys
              that were not supplied.
    """
    sg = {
        'tenant_id': '123',
        'name': 'test',
        'description': 'test-description',
    }
    # Caller-supplied values win over the defaults.
    sg.update(kwargs)
    return sg
46
 
 
47
 
 
48
 
def security_group_db(security_group, id=None):
    """Convert an API-style security-group dict into a fake DB record.

    :param security_group: dict such as security_group_template() returns;
                           it is copied, not modified.
    :param id: optional value stored under ``'id'``, replacing any ``'id'``
               already present in ``security_group``.
    :returns: an AttrDict in which ``tenant_id`` has been renamed to
              ``project_id`` and ``rules`` / ``instances`` default to
              empty lists.
    """
    attrs = security_group.copy()
    # The API body says 'tenant_id'; the fake DB record uses 'project_id'.
    if 'tenant_id' in attrs:
        attrs['project_id'] = attrs.pop('tenant_id')
    if id is not None:
        attrs['id'] = id
    attrs.setdefault('rules', [])
    attrs.setdefault('instances', [])
    return AttrDict(attrs)
57
 
 
58
 
 
59
 
def security_group_rule_template(**kwargs):
    """Build a security-group-rule dict for use as a request body.

    :param kwargs: fields to include; they override the defaults below.
    :returns: a new dict containing ``kwargs`` plus defaults describing a
              TCP rule for port 22 whose ``parent_group_id`` is 2.
    """
    rule = {
        'ip_protocol': 'tcp',
        'from_port': 22,
        'to_port': 22,
        'parent_group_id': 2,
    }
    # Caller-supplied values win over the defaults.
    rule.update(kwargs)
    return rule
66
 
 
67
 
 
68
 
def security_group_rule_db(rule, id=None):
    """Convert an API-style rule dict into a fake DB record.

    :param rule: dict such as security_group_rule_template() returns; it is
                 copied, not modified.
    :param id: optional value stored under ``'id'``. Previously this
               argument was accepted but ignored; it is now applied the
               same way security_group_db() applies its ``id``.
    :returns: an AttrDict in which ``ip_protocol`` has been renamed to
              ``protocol``.
    """
    attrs = rule.copy()
    # The API body says 'ip_protocol'; the fake DB record uses 'protocol'.
    if 'ip_protocol' in attrs:
        attrs['protocol'] = attrs.pop('ip_protocol')
    if id is not None:
        attrs['id'] = id
    return AttrDict(attrs)
73
 
 
74
 
 
75
 
def return_server(context, server_id):
    """Fake instance lookup by integer id (stubbed in for db.instance_get).

    :param context: ignored.
    :param server_id: value converted with ``int()`` and returned as 'id'.
    :returns: a dict describing an instance with ``power_state`` 0x01 and
              the module-wide FAKE_UUID.
    """
    return {
        'id': int(server_id),
        'power_state': 0x01,
        'host': "localhost",
        'uuid': FAKE_UUID,
        'name': 'asdf',
    }
81
 
 
82
 
 
83
 
def return_server_by_uuid(context, server_uuid):
    """Fake instance lookup by UUID (stubbed in for db.instance_get_by_uuid).

    :param context: ignored.
    :param server_uuid: echoed back as the record's 'uuid'.
    :returns: a dict describing an instance with id 1 and ``power_state``
              0x01.
    """
    return {
        'id': 1,
        'power_state': 0x01,
        'host': "localhost",
        'uuid': server_uuid,
        'name': 'asdf',
    }
89
 
 
90
 
 
91
 
def return_non_running_server(context, server_id):
    """Fake instance lookup returning a server with ``power_state`` 0x02.

    The tests use it to stand in for an instance that is not running.

    :param context: ignored.
    :param server_id: returned unchanged as the record's 'id'.
    :returns: a dict describing the instance, carrying FAKE_UUID.
    """
    return {
        'id': server_id,
        'power_state': 0x02,
        'uuid': FAKE_UUID,
        'host': "localhost",
        'name': 'asdf',
    }
94
 
 
95
 
 
96
 
def return_security_group_by_name(context, project_id, group_name):
    """Fake group lookup by name; the group already has one instance.

    :param context: ignored.
    :param project_id: ignored.
    :param group_name: echoed back as the group's 'name'.
    :returns: a dict for group id 1 whose 'instances' list holds a single
              instance with id 1 and FAKE_UUID.
    """
    return {
        'id': 1,
        'name': group_name,
        "instances": [{'id': 1, 'uuid': FAKE_UUID}],
    }
99
 
 
100
 
 
101
 
def return_security_group_without_instances(context, project_id, group_name):
    """Fake group lookup by name; the result has no 'instances' key.

    :param context: ignored.
    :param project_id: ignored.
    :param group_name: echoed back as the group's 'name'.
    :returns: a dict with only 'id' (always 1) and 'name'.
    """
    return {'id': 1, 'name': group_name}
103
 
 
104
 
 
105
 
def return_server_nonexistent(context, server_id):
    """Fake instance lookup that always fails.

    :param context: ignored.
    :param server_id: passed to the exception as ``instance_id``.
    :raises exception.InstanceNotFound: always.
    """
    raise exception.InstanceNotFound(instance_id=server_id)
107
 
 
108
 
 
109
 
class TestSecurityGroups(test.TestCase):
    """Tests for the os-security-groups API extension controllers.

    Covers three controllers created in setUp():

    * ``self.controller`` - SecurityGroupController (create/index/show/
      delete of security groups),
    * ``self.server_controller`` - ServerSecurityGroupController (groups
      of one server),
    * ``self.manager`` - SecurityGroupActionController (the
      addSecurityGroup / removeSecurityGroup server actions).
    """

    def setUp(self):
        super(TestSecurityGroups, self).setUp()

        self.controller = security_groups.SecurityGroupController()
        self.server_controller = (
            security_groups.ServerSecurityGroupController())
        self.manager = security_groups.SecurityGroupActionController()

    # -- create ------------------------------------------------------------

    def test_create_security_group(self):
        sg = security_group_template()

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        res_dict = self.controller.create(req, {'security_group': sg})
        self.assertEqual(res_dict['security_group']['name'], 'test')
        self.assertEqual(res_dict['security_group']['description'],
                         'test-description')

    def test_create_security_group_with_no_name(self):
        sg = security_group_template()
        del sg['name']

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        # NOTE(review): unlike the sibling tests, ``sg`` is passed without
        # the {'security_group': ...} wrapper, so this most likely checks
        # the missing wrapper rather than the missing name. Left as-is
        # because the controller's response to a wrapped, nameless body is
        # not visible from this file - confirm before changing.
        self.assertRaises(webob.exc.HTTPUnprocessableEntity,
                          self.controller.create, req, sg)

    def test_create_security_group_with_no_description(self):
        sg = security_group_template()
        del sg['description']

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group': sg})

    def test_create_security_group_with_blank_name(self):
        sg = security_group_template(name='')

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group': sg})

    def test_create_security_group_with_whitespace_name(self):
        sg = security_group_template(name=' ')

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group': sg})

    def test_create_security_group_with_blank_description(self):
        sg = security_group_template(description='')

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group': sg})

    def test_create_security_group_with_whitespace_description(self):
        sg = security_group_template(description=' ')

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group': sg})

    def test_create_security_group_with_duplicate_name(self):
        sg = security_group_template()

        # FIXME: Stub out _get instead of creating twice
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.controller.create(req, {'security_group': sg})

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group': sg})

    def test_create_security_group_with_no_body(self):
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPUnprocessableEntity,
                          self.controller.create, req, None)

    def test_create_security_group_with_no_security_group(self):
        body = {'no-securityGroup': None}

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPUnprocessableEntity,
                          self.controller.create, req, body)

    def test_create_security_group_above_255_characters_name(self):
        # 10 characters * 26 = 260 characters.
        sg = security_group_template(name='1234567890' * 26)

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group': sg})

    def test_create_security_group_above_255_characters_description(self):
        # 10 characters * 26 = 260 characters.
        sg = security_group_template(description='1234567890' * 26)

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group': sg})

    def test_create_security_group_non_string_name(self):
        sg = security_group_template(name=12)

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group': sg})

    def test_create_security_group_non_string_description(self):
        sg = security_group_template(description=12)

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group': sg})

    # -- index / show ------------------------------------------------------

    def test_get_security_group_list(self):
        groups = []
        for i, name in enumerate(['default', 'test']):
            sg = security_group_template(id=i + 1,
                                         name=name,
                                         description=name + '-desc',
                                         rules=[])
            groups.append(sg)
        expected = {'security_groups': groups}

        def return_security_groups(context, project_id):
            return [security_group_db(sg) for sg in groups]

        self.stubs.Set(nova.db, 'security_group_get_by_project',
                       return_security_groups)

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        res_dict = self.controller.index(req)

        self.assertEqual(res_dict, expected)

    def test_get_security_group_by_instance(self):
        groups = []
        for i, name in enumerate(['default', 'test']):
            sg = security_group_template(id=i + 1,
                                         name=name,
                                         description=name + '-desc',
                                         rules=[])
            groups.append(sg)
        expected = {'security_groups': groups}

        def return_instance(context, server_id):
            self.assertEqual(server_id, FAKE_UUID)
            return return_server_by_uuid(context, server_id)

        self.stubs.Set(nova.db, 'instance_get_by_uuid',
                       return_instance)

        def return_security_groups(context, instance_id):
            # return_server_by_uuid() always reports instance id 1.
            self.assertEqual(instance_id, 1)
            return [security_group_db(sg) for sg in groups]

        self.stubs.Set(nova.db, 'security_group_get_by_instance',
                       return_security_groups)

        req = fakes.HTTPRequest.blank('/v2/%s/servers/%s/os-security-groups' %
                                      ('fake', FAKE_UUID))
        res_dict = self.server_controller.index(req, FAKE_UUID)

        self.assertEqual(res_dict, expected)

    def test_get_security_group_by_id(self):
        sg = security_group_template(id=2, rules=[])

        def return_security_group(context, group_id):
            self.assertEqual(sg['id'], group_id)
            return security_group_db(sg)

        self.stubs.Set(nova.db, 'security_group_get',
                       return_security_group)

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/2')
        res_dict = self.controller.show(req, '2')

        expected = {'security_group': sg}
        self.assertEqual(res_dict, expected)

    def test_get_security_group_by_invalid_id(self):
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/invalid')
        # This is the "get" variant, so exercise show(); it previously
        # called delete(), duplicating test_delete_..._by_invalid_id.
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.show,
                          req, 'invalid')

    def test_get_security_group_by_non_existing_id(self):
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/111111111')
        # As above: show(), not delete(), is the operation under test.
        self.assertRaises(webob.exc.HTTPNotFound, self.controller.show,
                          req, '111111111')

    # -- delete ------------------------------------------------------------

    def test_delete_security_group_by_id(self):
        sg = security_group_template(id=1, rules=[])

        # Flipped by the stub below to prove the DB destroy call happened.
        self.called = False

        def security_group_destroy(context, id):
            self.called = True

        def return_security_group(context, group_id):
            self.assertEqual(sg['id'], group_id)
            return security_group_db(sg)

        self.stubs.Set(nova.db, 'security_group_destroy',
                       security_group_destroy)
        self.stubs.Set(nova.db, 'security_group_get',
                       return_security_group)

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/1')
        self.controller.delete(req, '1')

        self.assertTrue(self.called)

    def test_delete_security_group_by_invalid_id(self):
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/invalid')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
                          req, 'invalid')

    def test_delete_security_group_by_non_existing_id(self):
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/11111111')
        self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
                          req, '11111111')

    def test_delete_security_group_in_use(self):
        sg = security_group_template(id=1, rules=[])

        def security_group_in_use(context, id):
            return True

        def return_security_group(context, group_id):
            self.assertEqual(sg['id'], group_id)
            return security_group_db(sg)

        self.stubs.Set(nova.db, 'security_group_in_use',
                       security_group_in_use)
        self.stubs.Set(nova.db, 'security_group_get',
                       return_security_group)

        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/1')
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
                          req, '1')

    # -- addSecurityGroup action -------------------------------------------

    def test_associate_by_non_existing_security_group_name(self):
        body = dict(addSecurityGroup=dict(name='non-existing'))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPNotFound,
                          self.manager._addSecurityGroup, req, '1', body)

    def test_associate_by_invalid_server_id(self):
        body = dict(addSecurityGroup=dict(name='test'))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/invalid/action')
        self.assertRaises(webob.exc.HTTPNotFound,
                          self.manager._addSecurityGroup, req, 'invalid', body)

    def test_associate_without_body(self):
        self.stubs.Set(nova.db, 'instance_get', return_server)
        body = dict(addSecurityGroup=None)

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self.manager._addSecurityGroup, req, '1', body)

    def test_associate_no_security_group_name(self):
        self.stubs.Set(nova.db, 'instance_get', return_server)
        body = dict(addSecurityGroup=dict())

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self.manager._addSecurityGroup, req, '1', body)

    def test_associate_security_group_name_with_whitespaces(self):
        self.stubs.Set(nova.db, 'instance_get', return_server)
        body = dict(addSecurityGroup=dict(name="   "))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self.manager._addSecurityGroup, req, '1', body)

    def test_associate_non_existing_instance(self):
        self.stubs.Set(nova.db, 'instance_get', return_server_nonexistent)
        self.stubs.Set(nova.db, 'instance_get_by_uuid',
                       return_server_nonexistent)
        body = dict(addSecurityGroup=dict(name="test"))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPNotFound,
                          self.manager._addSecurityGroup, req, '1', body)

    def test_associate_non_running_instance(self):
        self.stubs.Set(nova.db, 'instance_get', return_non_running_server)
        self.stubs.Set(nova.db, 'instance_get_by_uuid',
                       return_non_running_server)
        self.stubs.Set(nova.db, 'security_group_get_by_name',
                       return_security_group_without_instances)
        body = dict(addSecurityGroup=dict(name="test"))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self.manager._addSecurityGroup, req, '1', body)

    def test_associate_already_associated_security_group_to_instance(self):
        self.stubs.Set(nova.db, 'instance_get', return_server)
        self.stubs.Set(nova.db, 'instance_get_by_uuid',
                       return_server_by_uuid)
        # This fake group already lists the server among its instances.
        self.stubs.Set(nova.db, 'security_group_get_by_name',
                       return_security_group_by_name)
        body = dict(addSecurityGroup=dict(name="test"))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self.manager._addSecurityGroup, req, '1', body)

    def test_associate(self):
        self.stubs.Set(nova.db, 'instance_get', return_server)
        self.stubs.Set(nova.db, 'instance_get_by_uuid',
                       return_server_by_uuid)
        # Record one expected call to the DB layer, with any arguments.
        self.mox.StubOutWithMock(nova.db, 'instance_add_security_group')
        nova.db.instance_add_security_group(mox.IgnoreArg(),
                                            mox.IgnoreArg(),
                                            mox.IgnoreArg())
        self.stubs.Set(nova.db, 'security_group_get_by_name',
                       return_security_group_without_instances)
        self.mox.ReplayAll()

        body = dict(addSecurityGroup=dict(name="test"))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.manager._addSecurityGroup(req, '1', body)

    # -- removeSecurityGroup action ----------------------------------------

    def test_disassociate_by_non_existing_security_group_name(self):
        body = dict(removeSecurityGroup=dict(name='non-existing'))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPNotFound,
                          self.manager._removeSecurityGroup, req, '1', body)

    def test_disassociate_by_invalid_server_id(self):
        self.stubs.Set(nova.db, 'security_group_get_by_name',
                       return_security_group_by_name)
        body = dict(removeSecurityGroup=dict(name='test'))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/invalid/action')
        self.assertRaises(webob.exc.HTTPNotFound,
                          self.manager._removeSecurityGroup, req, 'invalid',
                          body)

    def test_disassociate_without_body(self):
        self.stubs.Set(nova.db, 'instance_get', return_server)
        body = dict(removeSecurityGroup=None)

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self.manager._removeSecurityGroup, req, '1', body)

    def test_disassociate_no_security_group_name(self):
        self.stubs.Set(nova.db, 'instance_get', return_server)
        body = dict(removeSecurityGroup=dict())

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self.manager._removeSecurityGroup, req, '1', body)

    def test_disassociate_security_group_name_with_whitespaces(self):
        self.stubs.Set(nova.db, 'instance_get', return_server)
        body = dict(removeSecurityGroup=dict(name="   "))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self.manager._removeSecurityGroup, req, '1', body)

    def test_disassociate_non_existing_instance(self):
        self.stubs.Set(nova.db, 'instance_get', return_server_nonexistent)
        self.stubs.Set(nova.db, 'security_group_get_by_name',
                       return_security_group_by_name)
        body = dict(removeSecurityGroup=dict(name="test"))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPNotFound,
                          self.manager._removeSecurityGroup, req, '1', body)

    def test_disassociate_non_running_instance(self):
        self.stubs.Set(nova.db, 'instance_get', return_non_running_server)
        self.stubs.Set(nova.db, 'instance_get_by_uuid',
                       return_non_running_server)
        self.stubs.Set(nova.db, 'security_group_get_by_name',
                       return_security_group_by_name)
        body = dict(removeSecurityGroup=dict(name="test"))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self.manager._removeSecurityGroup, req, '1', body)

    def test_disassociate_already_associated_security_group_to_instance(self):
        self.stubs.Set(nova.db, 'instance_get', return_server)
        self.stubs.Set(nova.db, 'instance_get_by_uuid',
                       return_server_by_uuid)
        # This fake group has no instances, so there is nothing to remove.
        self.stubs.Set(nova.db, 'security_group_get_by_name',
                       return_security_group_without_instances)
        body = dict(removeSecurityGroup=dict(name="test"))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self.manager._removeSecurityGroup, req, '1', body)

    def test_disassociate(self):
        self.stubs.Set(nova.db, 'instance_get', return_server)
        self.stubs.Set(nova.db, 'instance_get_by_uuid',
                       return_server_by_uuid)
        # Record one expected call to the DB layer, with any arguments.
        self.mox.StubOutWithMock(nova.db, 'instance_remove_security_group')
        nova.db.instance_remove_security_group(mox.IgnoreArg(),
                                               mox.IgnoreArg(),
                                               mox.IgnoreArg())
        self.stubs.Set(nova.db, 'security_group_get_by_name',
                       return_security_group_by_name)
        self.mox.ReplayAll()

        body = dict(removeSecurityGroup=dict(name="test"))

        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.manager._removeSecurityGroup(req, '1', body)
530
 
 
531
 
 
532
 
class TestSecurityGroupRules(test.TestCase):
533
 
    def setUp(self):
534
 
        super(TestSecurityGroupRules, self).setUp()
535
 
 
536
 
        sg1 = security_group_template(id=1)
537
 
        sg2 = security_group_template(id=2,
538
 
                                      name='authorize_revoke',
539
 
                                      description='authorize-revoke testing')
540
 
        db1 = security_group_db(sg1)
541
 
        db2 = security_group_db(sg2)
542
 
 
543
 
        def return_security_group(context, group_id):
544
 
            if group_id == db1['id']:
545
 
                return db1
546
 
            if group_id == db2['id']:
547
 
                return db2
548
 
            raise exception.NotFound()
549
 
 
550
 
        self.stubs.Set(nova.db, 'security_group_get',
551
 
                       return_security_group)
552
 
 
553
 
        self.parent_security_group = db2
554
 
 
555
 
        self.controller = security_groups.SecurityGroupRulesController()
556
 
 
557
 
    def test_create_by_cidr(self):
558
 
        rule = security_group_rule_template(cidr='10.2.3.124/24')
559
 
 
560
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
561
 
        res_dict = self.controller.create(req, {'security_group_rule': rule})
562
 
 
563
 
        security_group_rule = res_dict['security_group_rule']
564
 
        self.assertNotEquals(security_group_rule['id'], 0)
565
 
        self.assertEquals(security_group_rule['parent_group_id'], 2)
566
 
        self.assertEquals(security_group_rule['ip_range']['cidr'],
567
 
                          "10.2.3.124/24")
568
 
 
569
 
    def test_create_by_group_id(self):
570
 
        rule = security_group_rule_template(group_id=1)
571
 
 
572
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
573
 
        res_dict = self.controller.create(req, {'security_group_rule': rule})
574
 
 
575
 
        security_group_rule = res_dict['security_group_rule']
576
 
        self.assertNotEquals(security_group_rule['id'], 0)
577
 
        self.assertEquals(security_group_rule['parent_group_id'], 2)
578
 
 
579
 
    def test_create_by_same_group_id(self):
580
 
        rule1 = security_group_rule_template(group_id=1, from_port=80,
581
 
                                             to_port=80)
582
 
        self.parent_security_group['rules'] = [security_group_rule_db(rule1)]
583
 
 
584
 
        rule2 = security_group_rule_template(group_id=1, from_port=81,
585
 
                                             to_port=81)
586
 
 
587
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
588
 
        res_dict = self.controller.create(req, {'security_group_rule': rule2})
589
 
 
590
 
        security_group_rule = res_dict['security_group_rule']
591
 
        self.assertNotEquals(security_group_rule['id'], 0)
592
 
        self.assertEquals(security_group_rule['parent_group_id'], 2)
593
 
        self.assertEquals(security_group_rule['from_port'], 81)
594
 
        self.assertEquals(security_group_rule['to_port'], 81)
595
 
 
596
 
    def test_create_by_invalid_cidr_json(self):
597
 
        rules = {
598
 
                  "security_group_rule": {
599
 
                        "ip_protocol": "tcp",
600
 
                        "from_port": "22",
601
 
                        "to_port": "22",
602
 
                        "parent_group_id": 2,
603
 
                        "cidr": "10.2.3.124/2433"}}
604
 
        rule = security_group_rule_template(
605
 
                ip_protocol="tcp",
606
 
                from_port=22,
607
 
                to_port=22,
608
 
                parent_group_id=2,
609
 
                cidr="10.2.3.124/2433")
610
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
611
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
612
 
                          req, {'security_group_rule': rule})
613
 
 
614
 
    def test_create_by_invalid_tcp_port_json(self):
615
 
        rule = security_group_rule_template(
616
 
                ip_protocol="tcp",
617
 
                from_port=75534,
618
 
                to_port=22,
619
 
                parent_group_id=2,
620
 
                cidr="10.2.3.124/24")
621
 
 
622
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
623
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
624
 
                          req, {'security_group_rule': rule})
625
 
 
626
 
    def test_create_by_invalid_icmp_port_json(self):
627
 
        rule = security_group_rule_template(
628
 
                ip_protocol="icmp",
629
 
                from_port=1,
630
 
                to_port=256,
631
 
                parent_group_id=2,
632
 
                cidr="10.2.3.124/24")
633
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
634
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
635
 
                          req, {'security_group_rule': rule})
636
 
 
637
 
    def test_create_add_existing_rules_by_cidr(self):
638
 
        rule = security_group_rule_template(cidr='10.0.0.0/24')
639
 
 
640
 
        self.parent_security_group['rules'] = [security_group_rule_db(rule)]
641
 
 
642
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
643
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
644
 
                          req, {'security_group_rule': rule})
645
 
 
646
 
    def test_create_add_existing_rules_by_group_id(self):
647
 
        rule = security_group_rule_template(group_id=1)
648
 
 
649
 
        self.parent_security_group['rules'] = [security_group_rule_db(rule)]
650
 
 
651
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
652
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
653
 
                          req, {'security_group_rule': rule})
654
 
 
655
 
    def test_create_with_no_body(self):
656
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
657
 
        self.assertRaises(webob.exc.HTTPUnprocessableEntity,
658
 
                          self.controller.create, req, None)
659
 
 
660
 
    def test_create_with_no_security_group_rule_in_body(self):
661
 
        rules = {'test': 'test'}
662
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
663
 
        self.assertRaises(webob.exc.HTTPUnprocessableEntity,
664
 
                          self.controller.create, req, rules)
665
 
 
666
 
    def test_create_with_invalid_parent_group_id(self):
667
 
        rule = security_group_rule_template(parent_group_id='invalid')
668
 
 
669
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
670
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
671
 
                          req, {'security_group_rule': rule})
672
 
 
673
 
    def test_create_with_non_existing_parent_group_id(self):
674
 
        rule = security_group_rule_template(group_id='invalid',
675
 
                                            parent_group_id='1111111111111')
676
 
 
677
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
678
 
        self.assertRaises(webob.exc.HTTPNotFound, self.controller.create,
679
 
                          req, {'security_group_rule': rule})
680
 
 
681
 
    def test_create_with_invalid_protocol(self):
682
 
        rule = security_group_rule_template(ip_protocol='invalid-protocol',
683
 
                                            cidr='10.2.2.0/24')
684
 
 
685
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
686
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
687
 
                          req, {'security_group_rule': rule})
688
 
 
689
 
    def test_create_with_no_protocol(self):
690
 
        rule = security_group_rule_template(cidr='10.2.2.0/24')
691
 
        del rule['ip_protocol']
692
 
 
693
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
694
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
695
 
                          req, {'security_group_rule': rule})
696
 
 
697
 
    def test_create_with_invalid_from_port(self):
698
 
        rule = security_group_rule_template(from_port='666666',
699
 
                                            cidr='10.2.2.0/24')
700
 
 
701
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
702
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
703
 
                          req, {'security_group_rule': rule})
704
 
 
705
 
    def test_create_with_invalid_to_port(self):
706
 
        rule = security_group_rule_template(to_port='666666',
707
 
                                            cidr='10.2.2.0/24')
708
 
 
709
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
710
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
711
 
                          req, {'security_group_rule': rule})
712
 
 
713
 
    def test_create_with_non_numerical_from_port(self):
714
 
        rule = security_group_rule_template(from_port='invalid',
715
 
                                            cidr='10.2.2.0/24')
716
 
 
717
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
718
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
719
 
                          req, {'security_group_rule': rule})
720
 
 
721
 
    def test_create_with_non_numerical_to_port(self):
722
 
        rule = security_group_rule_template(to_port='invalid',
723
 
                                            cidr='10.2.2.0/24')
724
 
 
725
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
726
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
727
 
                          req, {'security_group_rule': rule})
728
 
 
729
 
    def test_create_with_no_from_port(self):
730
 
        rule = security_group_rule_template(cidr='10.2.2.0/24')
731
 
        del rule['from_port']
732
 
 
733
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
734
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
735
 
                          req, {'security_group_rule': rule})
736
 
 
737
 
    def test_create_with_no_to_port(self):
738
 
        rule = security_group_rule_template(cidr='10.2.2.0/24')
739
 
        del rule['to_port']
740
 
 
741
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
742
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
743
 
                          req, {'security_group_rule': rule})
744
 
 
745
 
    def test_create_with_invalid_cidr(self):
746
 
        rule = security_group_rule_template(cidr='10.2.2222.0/24')
747
 
 
748
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
749
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
750
 
                          req, {'security_group_rule': rule})
751
 
 
752
 
    def test_create_with_no_cidr_group(self):
753
 
        rule = security_group_rule_template()
754
 
 
755
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
756
 
        res_dict = self.controller.create(req, {'security_group_rule': rule})
757
 
 
758
 
        security_group_rule = res_dict['security_group_rule']
759
 
        self.assertNotEquals(security_group_rule['id'], 0)
760
 
        self.assertEquals(security_group_rule['parent_group_id'],
761
 
                          self.parent_security_group['id'])
762
 
        self.assertEquals(security_group_rule['ip_range']['cidr'],
763
 
                          "0.0.0.0/0")
764
 
 
765
 
    def test_create_with_invalid_group_id(self):
766
 
        rule = security_group_rule_template(group_id='invalid')
767
 
 
768
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
769
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
770
 
                          req, {'security_group_rule': rule})
771
 
 
772
 
    def test_create_with_empty_group_id(self):
773
 
        rule = security_group_rule_template(group_id='')
774
 
 
775
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
776
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
777
 
                          req, {'security_group_rule': rule})
778
 
 
779
 
    def test_create_with_nonexist_group_id(self):
780
 
        rule = security_group_rule_template(group_id='222222')
781
 
 
782
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
783
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
784
 
                          req, {'security_group_rule': rule})
785
 
 
786
 
    def test_create_with_same_group_parent_id_and_group_id(self):
787
 
        rule = security_group_rule_template(group_id=1, parent_group_id=1)
788
 
 
789
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
790
 
        res_dict = self.controller.create(req, {'security_group_rule': rule})
791
 
        security_group_rule = res_dict['security_group_rule']
792
 
        self.assertNotEquals(security_group_rule['id'], 0)
793
 
        self.assertEquals(security_group_rule['parent_group_id'], 1)
794
 
        self.assertEquals(security_group_rule['id'], 1)
795
 
 
796
 
    def _test_create_with_no_ports_and_no_group(self, proto):
797
 
        rule = {'ip_protocol': proto, 'parent_group_id': '2'}
798
 
 
799
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
800
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
801
 
                          req, {'security_group_rule': rule})
802
 
 
803
 
    def  _test_create_with_no_ports(self, proto):
804
 
        rule = {'ip_protocol': proto, 'parent_group_id': '2', 'group_id': '1'}
805
 
 
806
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
807
 
        res_dict = self.controller.create(req, {'security_group_rule': rule})
808
 
 
809
 
        security_group_rule = res_dict['security_group_rule']
810
 
        expected_rule = {
811
 
            'from_port': 1, 'group': {'tenant_id': '123', 'name': 'test'},
812
 
            'ip_protocol': proto, 'to_port': 65535, 'parent_group_id': 2,
813
 
            'ip_range': {}, 'id': 1
814
 
        }
815
 
        if proto == 'icmp':
816
 
            expected_rule['to_port'] = -1
817
 
            expected_rule['from_port'] = -1
818
 
        self.assertTrue(security_group_rule == expected_rule)
819
 
 
820
 
    def test_create_with_no_ports_icmp(self):
821
 
        self._test_create_with_no_ports_and_no_group('icmp')
822
 
        self._test_create_with_no_ports('icmp')
823
 
 
824
 
    def test_create_with_no_ports_tcp(self):
825
 
        self._test_create_with_no_ports_and_no_group('tcp')
826
 
        self._test_create_with_no_ports('tcp')
827
 
 
828
 
    def test_create_with_no_ports_udp(self):
829
 
        self._test_create_with_no_ports_and_no_group('udp')
830
 
        self._test_create_with_no_ports('udp')
831
 
 
832
 
    def  _test_create_with_ports(self, id_val, proto, from_port, to_port):
833
 
        rule = {
834
 
            'ip_protocol': proto, 'from_port': from_port, 'to_port': to_port,
835
 
            'parent_group_id': '2', 'group_id': '1'
836
 
        }
837
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
838
 
        res_dict = self.controller.create(req, {'security_group_rule': rule})
839
 
 
840
 
        security_group_rule = res_dict['security_group_rule']
841
 
        expected_rule = {
842
 
            'from_port': from_port,
843
 
            'group': {'tenant_id': '123', 'name': 'test'},
844
 
            'ip_protocol': proto, 'to_port': to_port, 'parent_group_id': 2,
845
 
            'ip_range': {}, 'id': id_val
846
 
        }
847
 
        self.assertTrue(security_group_rule['ip_protocol'] == proto)
848
 
        self.assertTrue(security_group_rule['id'] == id_val)
849
 
        self.assertTrue(security_group_rule['from_port'] == from_port)
850
 
        self.assertTrue(security_group_rule['to_port'] == to_port)
851
 
        self.assertTrue(security_group_rule == expected_rule)
852
 
 
853
 
    def test_create_with_ports_icmp(self):
854
 
        self._test_create_with_ports(1, 'icmp', 0, 1)
855
 
        self._test_create_with_ports(2, 'icmp', 0, 0)
856
 
        self._test_create_with_ports(3, 'icmp', 1, 0)
857
 
 
858
 
    def test_create_with_ports_tcp(self):
859
 
        self._test_create_with_ports(1, 'tcp', 1, 1)
860
 
        self._test_create_with_ports(2, 'tcp', 1, 65535)
861
 
        self._test_create_with_ports(3, 'tcp', 65535, 65535)
862
 
 
863
 
    def test_create_with_ports_udp(self):
864
 
        self._test_create_with_ports(1, 'udp', 1, 1)
865
 
        self._test_create_with_ports(2, 'udp', 1, 65535)
866
 
        self._test_create_with_ports(3, 'udp', 65535, 65535)
867
 
 
868
 
    def test_delete(self):
869
 
        rule = security_group_rule_template(id=10)
870
 
 
871
 
        def security_group_rule_get(context, id):
872
 
            return security_group_rule_db(rule)
873
 
 
874
 
        def security_group_rule_destroy(context, id):
875
 
            pass
876
 
 
877
 
        self.stubs.Set(nova.db, 'security_group_rule_get',
878
 
                       security_group_rule_get)
879
 
        self.stubs.Set(nova.db, 'security_group_rule_destroy',
880
 
                       security_group_rule_destroy)
881
 
 
882
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/10')
883
 
        self.controller.delete(req, '10')
884
 
 
885
 
    def test_delete_invalid_rule_id(self):
886
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules' +
887
 
                                      '/invalid')
888
 
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
889
 
                          req, 'invalid')
890
 
 
891
 
    def test_delete_non_existing_rule_id(self):
892
 
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules' +
893
 
                                      '/22222222222222')
894
 
        self.assertRaises(webob.exc.HTTPNotFound, self.controller.delete,
895
 
                          req, '22222222222222')
896
 
 
897
 
 
898
 
class TestSecurityGroupRulesXMLDeserializer(unittest.TestCase):
    """Checks the dict built from security-group-rule XML requests.

    Each test feeds an XML document to
    ``security_groups.SecurityGroupRulesXMLDeserializer.deserialize`` and
    compares the ``'body'`` entry of the result with the expected dict.
    """

    def setUp(self):
        self.deserializer = security_groups.SecurityGroupRulesXMLDeserializer()

    def test_create_request(self):
        # All six child elements are present; the empty <group_id> element
        # is expected to come back as an empty string.
        serial_request = """
<security_group_rule>
  <parent_group_id>12</parent_group_id>
  <from_port>22</from_port>
  <to_port>22</to_port>
  <group_id></group_id>
  <ip_protocol>tcp</ip_protocol>
  <cidr>10.0.0.0/24</cidr>
</security_group_rule>"""
        request = self.deserializer.deserialize(serial_request)
        expected = {
            "security_group_rule": {
                "parent_group_id": "12",
                "from_port": "22",
                "to_port": "22",
                "ip_protocol": "tcp",
                "group_id": "",
                "cidr": "10.0.0.0/24",
            },
        }
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(request['body'], expected)

    def test_create_no_protocol_request(self):
        # Without an <ip_protocol> element the resulting dict must simply
        # lack the "ip_protocol" key.
        serial_request = """
<security_group_rule>
  <parent_group_id>12</parent_group_id>
  <from_port>22</from_port>
  <to_port>22</to_port>
  <group_id></group_id>
  <cidr>10.0.0.0/24</cidr>
</security_group_rule>"""
        request = self.deserializer.deserialize(serial_request)
        expected = {
            "security_group_rule": {
                "parent_group_id": "12",
                "from_port": "22",
                "to_port": "22",
                "group_id": "",
                "cidr": "10.0.0.0/24",
            },
        }
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(request['body'], expected)
946
 
 
947
 
 
948
 
class TestSecurityGroupXMLDeserializer(unittest.TestCase):
    """Checks the dict built from security-group XML requests.

    Each test feeds an XML document to
    ``security_groups.SecurityGroupXMLDeserializer.deserialize`` and
    compares the ``'body'`` entry of the result with the expected dict.
    """

    def setUp(self):
        self.deserializer = security_groups.SecurityGroupXMLDeserializer()

    def test_create_request(self):
        # The name attribute and the <description> child must both end up
        # in the resulting dict.
        serial_request = """
<security_group name="test">
   <description>test</description>
</security_group>"""
        request = self.deserializer.deserialize(serial_request)
        expected = {
            "security_group": {
                "name": "test",
                "description": "test",
            },
        }
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(request['body'], expected)

    def test_create_no_description_request(self):
        # Without a <description> child only "name" is expected.
        serial_request = """
<security_group name="test">
</security_group>"""
        request = self.deserializer.deserialize(serial_request)
        expected = {
            "security_group": {
                "name": "test",
            },
        }
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(request['body'], expected)

    def test_create_no_name_request(self):
        # Without a name attribute only "description" is expected.
        serial_request = """
<security_group>
<description>test</description>
</security_group>"""
        request = self.deserializer.deserialize(serial_request)
        expected = {
            "security_group": {
                "description": "test",
            },
        }
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(request['body'], expected)
991
 
 
992
 
 
993
 
class TestSecurityGroupXMLSerializer(unittest.TestCase):
    """Checks the XML emitted by the security-group serializer templates.

    Each test serializes a plain dict, parses the result with ``etree`` and
    walks the tree to compare tags, attributes and text with the input.
    """

    def setUp(self):
        self.namespace = wsgi.XMLNS_V11
        self.rule_serializer = security_groups.SecurityGroupRuleTemplate()
        self.index_serializer = security_groups.SecurityGroupsTemplate()
        self.default_serializer = security_groups.SecurityGroupTemplate()

    def _tag(self, elem):
        """Return the local tag name of ``elem``.

        The tag must have the form ``{namespace}name`` and the namespace
        must equal ``self.namespace``; both are asserted.
        """
        tagname = elem.tag
        self.assertEqual(tagname[0], '{')
        tmp = tagname.partition('}')
        # tmp[0] is '{namespace'; strip the leading brace.
        namespace = tmp[0][1:]
        self.assertEqual(namespace, self.namespace)
        return tmp[2]

    def _verify_security_group_rule(self, raw_rule, tree):
        """Assert that the element ``tree`` faithfully renders ``raw_rule``.

        :param raw_rule: dict that was handed to the serializer.
        :param tree: parsed XML element for that rule.

        'id' and 'parent_group_id' are checked as attributes; every other
        field is checked as a child element, with 'group' and 'ip_range'
        checked one level deeper.  Every expected field must be seen.
        """
        self.assertEqual(raw_rule['id'], tree.get('id'))
        self.assertEqual(raw_rule['parent_group_id'],
                         tree.get('parent_group_id'))

        seen = set()
        expected = set(['ip_protocol', 'from_port', 'to_port',
                        'group', 'group/name', 'group/tenant_id',
                        'ip_range', 'ip_range/cidr'])

        for child in tree:
            child_tag = self._tag(child)
            self.assertTrue(child_tag in raw_rule)
            seen.add(child_tag)
            if child_tag in ('group', 'ip_range'):
                # Nested containers: record grandchildren as 'parent/child'.
                for gr_child in child:
                    gr_child_tag = self._tag(gr_child)
                    self.assertTrue(gr_child_tag in raw_rule[child_tag])
                    seen.add('%s/%s' % (child_tag, gr_child_tag))
                    self.assertEqual(gr_child.text,
                                     raw_rule[child_tag][gr_child_tag])
            else:
                self.assertEqual(child.text, raw_rule[child_tag])
        self.assertEqual(seen, expected)

    def _verify_security_group(self, raw_group, tree):
        """Assert that the element ``tree`` faithfully renders ``raw_group``.

        :param raw_group: dict that was handed to the serializer; its
                          'rules' list must hold exactly two rules.
        :param tree: parsed ``security_group`` XML element.
        """
        rules = raw_group['rules']
        self.assertEqual('security_group', self._tag(tree))
        self.assertEqual(raw_group['id'], tree.get('id'))
        self.assertEqual(raw_group['tenant_id'], tree.get('tenant_id'))
        self.assertEqual(raw_group['name'], tree.get('name'))
        # Exactly two children are expected: 'rules' and 'description'.
        self.assertEqual(2, len(tree))
        for child in tree:
            child_tag = self._tag(child)
            if child_tag == 'rules':
                self.assertEqual(2, len(child))
                for idx, gr_child in enumerate(child):
                    self.assertEqual(self._tag(gr_child), 'rule')
                    self._verify_security_group_rule(rules[idx], gr_child)
            else:
                self.assertEqual('description', child_tag)
                self.assertEqual(raw_group['description'], child.text)

    def test_rule_serializer(self):
        # Serialize a single rule and verify the resulting element.
        raw_rule = dict(
            id='123',
            parent_group_id='456',
            ip_protocol='tcp',
            from_port='789',
            to_port='987',
            group=dict(name='group', tenant_id='tenant'),
            ip_range=dict(cidr='10.0.0.0/8'))
        rule = dict(security_group_rule=raw_rule)
        text = self.rule_serializer.serialize(rule)

        tree = etree.fromstring(text)

        self.assertEqual('security_group_rule', self._tag(tree))
        self._verify_security_group_rule(raw_rule, tree)

    def test_group_serializer(self):
        # Serialize one group holding two rules and verify the element.
        rules = [dict(
                id='123',
                parent_group_id='456',
                ip_protocol='tcp',
                from_port='789',
                to_port='987',
                group=dict(name='group1', tenant_id='tenant1'),
                ip_range=dict(cidr='10.55.44.0/24')),
                 dict(
                id='654',
                parent_group_id='321',
                ip_protocol='udp',
                from_port='234',
                to_port='567',
                group=dict(name='group2', tenant_id='tenant2'),
                ip_range=dict(cidr='10.44.55.0/24'))]
        raw_group = dict(
            id='890',
            description='description',
            name='name',
            tenant_id='tenant',
            rules=rules)
        sg_group = dict(security_group=raw_group)
        text = self.default_serializer.serialize(sg_group)

        tree = etree.fromstring(text)

        self._verify_security_group(raw_group, tree)

    def test_groups_serializer(self):
        # Serialize two groups of two rules each and verify every group
        # element in order.
        rules = [dict(
                id='123',
                parent_group_id='1234',
                ip_protocol='tcp',
                from_port='12345',
                to_port='123456',
                group=dict(name='group1', tenant_id='tenant1'),
                ip_range=dict(cidr='10.123.0.0/24')),
                 dict(
                id='234',
                parent_group_id='2345',
                ip_protocol='udp',
                from_port='23456',
                to_port='234567',
                group=dict(name='group2', tenant_id='tenant2'),
                ip_range=dict(cidr='10.234.0.0/24')),
                 dict(
                id='345',
                parent_group_id='3456',
                ip_protocol='tcp',
                from_port='34567',
                to_port='345678',
                group=dict(name='group3', tenant_id='tenant3'),
                ip_range=dict(cidr='10.345.0.0/24')),
                 dict(
                id='456',
                parent_group_id='4567',
                ip_protocol='udp',
                from_port='45678',
                to_port='456789',
                group=dict(name='group4', tenant_id='tenant4'),
                ip_range=dict(cidr='10.456.0.0/24'))]
        groups = [dict(
                id='567',
                description='description1',
                name='name1',
                tenant_id='tenant1',
                rules=rules[0:2]),
                  dict(
                id='678',
                description='description2',
                name='name2',
                tenant_id='tenant2',
                rules=rules[2:4])]
        sg_groups = dict(security_groups=groups)
        text = self.index_serializer.serialize(sg_groups)

        tree = etree.fromstring(text)

        self.assertEqual('security_groups', self._tag(tree))
        self.assertEqual(len(groups), len(tree))
        for idx, child in enumerate(tree):
            self._verify_security_group(groups[idx], child)