~zulcss/ubuntu/precise/quantum/trunk

« back to all changes in this revision

Viewing changes to quantum/tests/unit/test_extension_security_group.py

  • Committer: Chuck Short
  • Date: 2012-11-26 19:51:11 UTC
  • mfrom: (26.1.1 raring-proposed)
  • Revision ID: zulcss@ubuntu.com-20121126195111-jnz2cr4xi6whemw2
* New upstream release for the Ubuntu Cloud Archive.
* debian/patches/*: Refreshed for opening of Grizzly.
* New upstream release.
* debian/rules: FTBFS if there are missing binaries.
* debian/quantum-server.install: Add quantum-debug.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2012 OpenStack, LLC.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
#    http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
12
# implied.
 
13
# See the License for the specific language governing permissions and
 
14
# limitations under the License.
 
15
 
 
16
import contextlib
 
17
import os
 
18
 
 
19
import mock
 
20
import unittest2
 
21
import webob.exc
 
22
 
 
23
from quantum.api.extensions import PluginAwareExtensionManager
 
24
from quantum.api.v2 import attributes
 
25
from quantum.api.v2.router import APIRouter
 
26
from quantum.common import config
 
27
from quantum.common.test_lib import test_config
 
28
from quantum import context
 
29
from quantum.db import api as db
 
30
from quantum.db import db_base_plugin_v2
 
31
from quantum.db import securitygroups_db
 
32
from quantum.extensions import securitygroup as ext_sg
 
33
from quantum.manager import QuantumManager
 
34
from quantum.openstack.common import cfg
 
35
from quantum.tests.unit import test_db_plugin
 
36
from quantum.tests.unit import test_extensions
 
37
from quantum.wsgi import JSONDeserializer
 
38
 
 
39
# Dotted path of the plugin class these tests load as the core plugin.
DB_PLUGIN_KLASS = (
    'quantum.tests.unit.test_extension_security_group.SecurityGroupTestPlugin'
)
# ROOTDIR is the parent of the directory containing this module; ETCDIR is
# its 'etc' sub-directory, where the test configuration file is looked up.
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
ETCDIR = os.path.join(ROOTDIR, 'etc')
 
43
 
 
44
 
 
45
def etcdir(*p):
    """Return the path made by joining the components ``p`` onto ETCDIR."""
    components = (ETCDIR,) + p
    return os.path.join(*components)
 
47
 
 
48
 
 
49
class SecurityGroupTestExtensionManager(object):
    """Extension manager exposing only the security-group resources.

    An instance is handed to ``setup_extensions_middleware`` by the test
    set-up; it contributes no actions and no request extensions.
    """

    def get_resources(self):
        """Return the resources declared by the security-group extension."""
        return ext_sg.Securitygroup.get_resources()

    def get_actions(self):
        """Return a new empty list: no action extensions are provided."""
        return list()

    def get_request_extensions(self):
        """Return a new empty list: no request extensions are provided."""
        return list()
 
59
 
 
60
 
 
61
class SecurityGroupsTestCase(test_db_plugin.QuantumDbPluginV2TestCase,
                             unittest2.TestCase):
    """Base test case with helpers for security groups and their rules.

    Provides request builders plus context managers that create (and, by
    default, delete) security groups and security group rules through the
    extension middleware stored in ``self.ext_api``.
    """

    def setUp(self, plugin=None):
        """Reset global state and build the API and extension middleware.

        :param plugin: dotted path of the core plugin class; when falsy,
            ``test_config['plugin_name_v2']`` is used, falling back to
            DB_PLUGIN_KLASS.
        """
        super(SecurityGroupsTestCase, self).setUp()
        db._ENGINE = None
        db._MAKER = None
        # Make sure at each test a new instance of the plugin is returned
        QuantumManager._instance = None
        # Make sure at each test the extensions for the plugin are reloaded
        PluginAwareExtensionManager._instance = None
        # Save the attributes map in case the plugin will alter it
        # loading extensions
        # Note(salvatore-orlando): shallow copy is not good enough in
        # this case, but copy.deepcopy does not seem to work, since it
        # causes test failures
        self._attribute_map_bk = {}
        for item in attributes.RESOURCE_ATTRIBUTE_MAP:
            self._attribute_map_bk[item] = (
                attributes.RESOURCE_ATTRIBUTE_MAP[item].copy())
        json_deserializer = JSONDeserializer()
        self._deserializers = {
            'application/json': json_deserializer,
        }

        if not plugin:
            plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS)

        # Create the default configurations
        args = ['--config-file', etcdir('quantum.conf.test')]
        # If test_config specifies some config-file, use it, as well
        for config_file in test_config.get('config_files', []):
            args.extend(['--config-file', config_file])
        config.parse(args=args)
        # Update the plugin
        cfg.CONF.set_override('core_plugin', plugin)
        self.api = APIRouter()

        def _is_native_bulk_supported():
            # The flag is looked up under the name-mangled form of a
            # private ``__native_bulk_support`` attribute of the plugin
            # class; a missing attribute counts as "not supported".
            plugin_obj = QuantumManager.get_plugin()
            native_bulk_attr_name = ("_%s__native_bulk_support"
                                     % plugin_obj.__class__.__name__)
            return getattr(plugin_obj, native_bulk_attr_name, False)

        self._skip_native_bulk = not _is_native_bulk_supported()

        QuantumManager.get_plugin().supported_extension_aliases = (
            ["security-groups"])
        # A freshly built manager is always truthy, so no guard is needed.
        ext_mgr = SecurityGroupTestExtensionManager()
        self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)

    def tearDown(self):
        """Undo the global state changed by setUp."""
        super(SecurityGroupsTestCase, self).tearDown()
        db._ENGINE = None
        db._MAKER = None
        cfg.CONF.reset()
        # Restore the original attribute map
        attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk

    def _create_security_group(self, fmt, name, description, external_id=None,
                               **kwargs):
        """Send a security-group create request and return the response.

        :param fmt: request format passed to ``new_create_request``.
        :param name: name of the security group.
        :param description: description of the security group.
        :param external_id: included in the body only when truthy.
        :param kwargs: ``tenant_id`` overrides the default 'test_tenant';
            when ``set_context`` is truthy and ``tenant_id`` is given, the
            request runs under a context built for that tenant.
        """
        data = {'security_group': {'name': name,
                                   'tenant_id': kwargs.get('tenant_id',
                                                           'test_tenant'),
                                   'description': description}}
        if external_id:
            data['security_group']['external_id'] = external_id
        security_group_req = self.new_create_request('security-groups', data,
                                                     fmt)
        if (kwargs.get('set_context') and 'tenant_id' in kwargs):
            # create a specific auth context for this request
            security_group_req.environ['quantum.context'] = (
                context.Context('', kwargs['tenant_id']))
        return security_group_req.get_response(self.ext_api)

    def _build_security_group_rule(self, security_group_id, direction,
                                   protocol, port_range_min, port_range_max,
                                   source_ip_prefix=None, source_group_id=None,
                                   external_id=None, tenant_id='test_tenant'):
        """Return the request body dict for a security group rule.

        The optional ``external_id``, ``source_ip_prefix`` and
        ``source_group_id`` keys are added only when their value is truthy.
        """
        data = {'security_group_rule': {'security_group_id': security_group_id,
                                        'direction': direction,
                                        'protocol': protocol,
                                        'port_range_min': port_range_min,
                                        'port_range_max': port_range_max,
                                        'tenant_id': tenant_id}}
        if external_id:
            data['security_group_rule']['external_id'] = external_id

        if source_ip_prefix:
            data['security_group_rule']['source_ip_prefix'] = source_ip_prefix

        if source_group_id:
            data['security_group_rule']['source_group_id'] = source_group_id

        return data

    def _create_security_group_rule(self, fmt, rules, **kwargs):
        """Send a security-group-rule create request, return the response.

        :param fmt: request format passed to ``new_create_request``.
        :param rules: request body, e.g. from _build_security_group_rule.
        :param kwargs: when ``set_context`` is truthy and ``tenant_id`` is
            given, the request runs under a context built for that tenant.
        """
        security_group_rule_req = self.new_create_request(
            'security-group-rules', rules, fmt)

        if (kwargs.get('set_context') and 'tenant_id' in kwargs):
            # create a specific auth context for this request
            security_group_rule_req.environ['quantum.context'] = (
                context.Context('', kwargs['tenant_id']))
        return security_group_rule_req.get_response(self.ext_api)

    @contextlib.contextmanager
    def security_group(self, name='webservers', description='webservers',
                       external_id=None, fmt='json', no_delete=False):
        """Create a security group, yield it, then delete it.

        Raises webob.exc.HTTPClientError when creation returns a status of
        400 or above. The group is deleted only if the ``with`` body ends
        without raising and ``no_delete`` is false.
        """
        res = self._create_security_group(fmt, name, description,
                                          external_id)
        security_group = self.deserialize(fmt, res)
        if res.status_int >= 400:
            raise webob.exc.HTTPClientError(code=res.status_int)
        yield security_group
        if not no_delete:
            self._delete('security-groups',
                         security_group['security_group']['id'])

    @contextlib.contextmanager
    def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
                                                    'd1db38eb087',
                            direction='ingress', protocol='tcp',
                            port_range_min='22', port_range_max='22',
                            source_ip_prefix=None, source_group_id=None,
                            external_id=None, fmt='json', no_delete=False):
        """Create a security group rule, yield it, then delete it.

        Raises webob.exc.HTTPClientError when creation returns a status of
        400 or above. The rule is deleted only if the ``with`` body ends
        without raising and ``no_delete`` is false.
        """
        rule = self._build_security_group_rule(security_group_id, direction,
                                               protocol, port_range_min,
                                               port_range_max,
                                               source_ip_prefix,
                                               source_group_id, external_id)
        # Send the request in the same format used to deserialize the reply.
        res = self._create_security_group_rule(fmt, rule)
        security_group_rule = self.deserialize(fmt, res)
        if res.status_int >= 400:
            raise webob.exc.HTTPClientError(code=res.status_int)
        yield security_group_rule
        if not no_delete:
            self._delete('security-group-rules',
                         security_group_rule['security_group_rule']['id'])
 
205
 
 
206
 
 
207
class SecurityGroupTestPlugin(db_base_plugin_v2.QuantumDbPluginV2,
                              securitygroups_db.SecurityGroupDbMixin):
    """ Test plugin that implements necessary calls on create/delete port for
    associating ports with security groups.
    """

    supported_extension_aliases = ["security-group"]

    def create_port(self, context, port):
        """Create a port bound to the requested or default security group."""
        tenant_id = self._get_tenant_id_for_create(context, port['port'])
        default_sg = self._ensure_default_security_group(context, tenant_id)
        # Fall back to the tenant's default group when none was requested.
        requested = port['port'].get(ext_sg.SECURITYGROUP)
        if not requested:
            port['port'][ext_sg.SECURITYGROUP] = [default_sg]
        self._validate_security_groups_on_port(context, port)
        with context.session.begin(subtransactions=True):
            # Read the ids before ``port`` is rebound to the created port.
            group_ids = port['port'].get(ext_sg.SECURITYGROUP)
            port = super(SecurityGroupTestPlugin, self).create_port(
                context, port)
            self._process_port_create_security_group(
                context, port['id'], group_ids)
            self._extend_port_dict_security_group(context, port)
        return port

    def update_port(self, context, id, port):
        """Update a port, replacing its security group bindings."""
        with context.session.begin(subtransactions=True):
            self._validate_security_groups_on_port(context, port)
            # delete the port binding and re-add it with the new rules
            self._delete_port_security_group_bindings(context, id)
            group_ids = port['port'].get(ext_sg.SECURITYGROUP)
            self._process_port_create_security_group(context, id, group_ids)
            port = super(SecurityGroupTestPlugin, self).update_port(
                context, id, port)
            self._extend_port_dict_security_group(context, port)
        return port

    def delete_port(self, context, id):
        """Delete a port and then its security group bindings."""
        with context.session.begin(subtransactions=True):
            super(SecurityGroupTestPlugin, self).delete_port(context, id)
            self._delete_port_security_group_bindings(context, id)

    def create_network(self, context, network):
        """Ensure the tenant's default group exists, then create the network.
        """
        tenant_id = self._get_tenant_id_for_create(context, network['network'])
        self._ensure_default_security_group(context, tenant_id)
        parent = super(SecurityGroupTestPlugin, self)
        return parent.create_network(context, network)
 
256
 
 
257
 
 
258
class SecurityGroupDBTestCase(SecurityGroupsTestCase):
    """Security-group test case configured for the in-file test plugin."""

    def setUp(self, plugin=None):
        """Register the test plugin and extension manager, then set up.

        :param plugin: optional dotted path of the core plugin; forwarded to
            the base setUp, which falls back to ``test_config`` when falsy.
        """
        test_config['plugin_name_v2'] = DB_PLUGIN_KLASS
        ext_mgr = SecurityGroupTestExtensionManager()
        test_config['extension_manager'] = ext_mgr
        # Forward the caller's plugin instead of silently dropping it.
        super(SecurityGroupDBTestCase, self).setUp(plugin)
 
264
 
 
265
 
 
266
class TestSecurityGroups(SecurityGroupDBTestCase):
 
267
    def test_create_security_group(self):
 
268
        name = 'webservers'
 
269
        description = 'my webservers'
 
270
        keys = [('name', name,), ('description', description)]
 
271
        with self.security_group(name, description) as security_group:
 
272
            for k, v, in keys:
 
273
                self.assertEquals(security_group['security_group'][k], v)
 
274
 
 
275
    def test_create_security_group_external_id(self):
 
276
        cfg.CONF.SECURITYGROUP.proxy_mode = True
 
277
        name = 'webservers'
 
278
        description = 'my webservers'
 
279
        external_id = 10
 
280
        keys = [('name', name,), ('description', description),
 
281
                ('external_id', external_id)]
 
282
        with self.security_group(name, description, external_id) as sg:
 
283
            for k, v, in keys:
 
284
                self.assertEquals(sg['security_group'][k], v)
 
285
 
 
286
    def test_default_security_group(self):
 
287
        with self.network():
 
288
            res = self.new_list_request('security-groups')
 
289
            groups = self.deserialize('json', res.get_response(self.ext_api))
 
290
            self.assertEquals(len(groups['security_groups']), 1)
 
291
 
 
292
    def test_create_security_group_proxy_mode_not_admin(self):
 
293
        cfg.CONF.SECURITYGROUP.proxy_mode = True
 
294
        res = self._create_security_group('json', 'webservers',
 
295
                                          'webservers', '1',
 
296
                                          tenant_id='bad_tenant',
 
297
                                          set_context=True)
 
298
        self.deserialize('json', res)
 
299
        self.assertEquals(res.status_int, 500)
 
300
 
 
301
    def test_create_security_group_no_external_id_proxy_mode(self):
 
302
        cfg.CONF.SECURITYGROUP.proxy_mode = True
 
303
        res = self._create_security_group('json', 'webservers',
 
304
                                          'webservers')
 
305
        self.deserialize('json', res)
 
306
        self.assertEquals(res.status_int, 400)
 
307
 
 
308
    def test_create_security_group_no_external_id_not_proxy_mode(self):
 
309
        res = self._create_security_group('json', 'webservers',
 
310
                                          'webservers', '1')
 
311
        self.deserialize('json', res)
 
312
        self.assertEquals(res.status_int, 409)
 
313
 
 
314
    def test_create_default_security_group_fail(self):
 
315
        name = 'default'
 
316
        description = 'my webservers'
 
317
        res = self._create_security_group('json', name, description)
 
318
        self.deserialize('json', res)
 
319
        self.assertEquals(res.status_int, 409)
 
320
 
 
321
    def test_create_security_group_duplicate_external_id(self):
 
322
        cfg.CONF.SECURITYGROUP.proxy_mode = True
 
323
        name = 'webservers'
 
324
        description = 'my webservers'
 
325
        external_id = 1
 
326
        with self.security_group(name, description, external_id):
 
327
            res = self._create_security_group('json', name, description,
 
328
                                              external_id)
 
329
            self.deserialize('json', res)
 
330
            self.assertEquals(res.status_int, 409)
 
331
 
 
332
    def test_list_security_groups(self):
 
333
        name = 'webservers'
 
334
        description = 'my webservers'
 
335
        with self.security_group(name, description):
 
336
            res = self.new_list_request('security-groups')
 
337
            groups = self.deserialize('json', res.get_response(self.ext_api))
 
338
            self.assertEquals(len(groups['security_groups']), 2)
 
339
 
 
340
    def test_get_security_group(self):
 
341
        name = 'webservers'
 
342
        description = 'my webservers'
 
343
        with self.security_group(name, description) as sg:
 
344
            source_group_id = sg['security_group']['id']
 
345
            res = self.new_show_request('security-groups', source_group_id)
 
346
            group = self.deserialize('json', res.get_response(self.ext_api))
 
347
            self.assertEquals(group['security_group']['id'], source_group_id)
 
348
 
 
349
    def test_delete_security_group(self):
 
350
        name = 'webservers'
 
351
        description = 'my webservers'
 
352
        with self.security_group(name, description, no_delete=True) as sg:
 
353
            source_group_id = sg['security_group']['id']
 
354
            self._delete('security-groups', source_group_id, 204)
 
355
 
 
356
    def test_delete_default_security_group_fail(self):
 
357
        with self.network():
 
358
            res = self.new_list_request('security-groups')
 
359
            sg = self.deserialize('json', res.get_response(self.ext_api))
 
360
            self._delete('security-groups', sg['security_groups'][0]['id'],
 
361
                         409)
 
362
 
 
363
    def test_default_security_group_rules(self):
 
364
        with self.network():
 
365
            res = self.new_list_request('security-groups')
 
366
            groups = self.deserialize('json', res.get_response(self.ext_api))
 
367
            self.assertEquals(len(groups['security_groups']), 1)
 
368
            res = self.new_list_request('security-group-rules')
 
369
            rules = self.deserialize('json', res.get_response(self.ext_api))
 
370
            self.assertEquals(len(rules['security_group_rules']), 2)
 
371
            # just generic rules to allow default egress and
 
372
            # intergroup communicartion
 
373
            for rule in rules['security_group_rules']:
 
374
                self.assertEquals(rule['port_range_max'], None)
 
375
                self.assertEquals(rule['port_range_min'], None)
 
376
                self.assertEquals(rule['protocol'], None)
 
377
 
 
378
    def test_create_security_group_rule_source_ip_prefix(self):
 
379
        name = 'webservers'
 
380
        description = 'my webservers'
 
381
        with self.security_group(name, description) as sg:
 
382
            security_group_id = sg['security_group']['id']
 
383
            direction = "ingress"
 
384
            source_ip_prefix = "10.0.0.0/24"
 
385
            protocol = 'tcp'
 
386
            port_range_min = 22
 
387
            port_range_max = 22
 
388
            keys = [('source_ip_prefix', source_ip_prefix),
 
389
                    ('security_group_id', security_group_id),
 
390
                    ('direction', direction),
 
391
                    ('protocol', protocol),
 
392
                    ('port_range_min', port_range_min),
 
393
                    ('port_range_max', port_range_max)]
 
394
            with self.security_group_rule(security_group_id, direction,
 
395
                                          protocol, port_range_min,
 
396
                                          port_range_max,
 
397
                                          source_ip_prefix) as rule:
 
398
                for k, v, in keys:
 
399
                    self.assertEquals(rule['security_group_rule'][k], v)
 
400
 
 
401
    def test_create_security_group_rule_group_id(self):
 
402
        name = 'webservers'
 
403
        description = 'my webservers'
 
404
        with self.security_group(name, description) as sg:
 
405
            with self.security_group(name, description) as sg2:
 
406
                security_group_id = sg['security_group']['id']
 
407
                direction = "ingress"
 
408
                source_group_id = sg2['security_group']['id']
 
409
                protocol = 'tcp'
 
410
                port_range_min = 22
 
411
                port_range_max = 22
 
412
                keys = [('source_group_id', source_group_id),
 
413
                        ('security_group_id', security_group_id),
 
414
                        ('direction', direction),
 
415
                        ('protocol', protocol),
 
416
                        ('port_range_min', port_range_min),
 
417
                        ('port_range_max', port_range_max)]
 
418
                with self.security_group_rule(security_group_id, direction,
 
419
                                              protocol, port_range_min,
 
420
                                              port_range_max,
 
421
                                              source_group_id=source_group_id
 
422
                                              ) as rule:
 
423
                    for k, v, in keys:
 
424
                        self.assertEquals(rule['security_group_rule'][k], v)
 
425
 
 
426
    def test_create_security_group_source_group_ip_and_ip_prefix(self):
 
427
        security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
 
428
        direction = "ingress"
 
429
        source_ip_prefix = "10.0.0.0/24"
 
430
        protocol = 'tcp'
 
431
        port_range_min = 22
 
432
        port_range_max = 22
 
433
        source_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
 
434
        rule = self._build_security_group_rule(security_group_id, direction,
 
435
                                               protocol, port_range_min,
 
436
                                               port_range_max,
 
437
                                               source_ip_prefix,
 
438
                                               source_group_id)
 
439
        res = self._create_security_group_rule('json', rule)
 
440
        self.deserialize('json', res)
 
441
        self.assertEquals(res.status_int, 400)
 
442
 
 
443
    def test_create_security_group_rule_bad_security_group_id(self):
 
444
        security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
 
445
        direction = "ingress"
 
446
        source_ip_prefix = "10.0.0.0/24"
 
447
        protocol = 'tcp'
 
448
        port_range_min = 22
 
449
        port_range_max = 22
 
450
        rule = self._build_security_group_rule(security_group_id, direction,
 
451
                                               protocol, port_range_min,
 
452
                                               port_range_max,
 
453
                                               source_ip_prefix)
 
454
        res = self._create_security_group_rule('json', rule)
 
455
        self.deserialize('json', res)
 
456
        self.assertEquals(res.status_int, 404)
 
457
 
 
458
    def test_create_security_group_rule_bad_tenant(self):
 
459
        with self.security_group() as sg:
 
460
            rule = {'security_group_rule':
 
461
                    {'security_group_id': sg['security_group']['id'],
 
462
                     'direction': 'ingress',
 
463
                     'protocol': 'tcp',
 
464
                     'port_range_min': '22',
 
465
                     'port_range_max': '22',
 
466
                     'tenant_id': "bad_tenant"}}
 
467
 
 
468
        res = self._create_security_group_rule('json', rule)
 
469
        self.deserialize('json', res)
 
470
        self.assertEquals(res.status_int, 404)
 
471
 
 
472
    def test_create_security_group_rule_exteral_id_proxy_mode(self):
 
473
        cfg.CONF.SECURITYGROUP.proxy_mode = True
 
474
        with self.security_group(external_id=1) as sg:
 
475
            rule = {'security_group_rule':
 
476
                    {'security_group_id': sg['security_group']['id'],
 
477
                     'direction': 'ingress',
 
478
                     'protocol': 'tcp',
 
479
                     'port_range_min': '22',
 
480
                     'port_range_max': '22',
 
481
                     'external_id': '1',
 
482
                     'tenant_id': 'test_tenant',
 
483
                     'source_group_id': sg['security_group']['id']}}
 
484
 
 
485
            res = self._create_security_group_rule('json', rule)
 
486
            self.deserialize('json', res)
 
487
            self.assertEquals(res.status_int, 201)
 
488
 
 
489
    def test_create_security_group_rule_exteral_id_not_proxy_mode(self):
 
490
        with self.security_group() as sg:
 
491
            rule = {'security_group_rule':
 
492
                    {'security_group_id': sg['security_group']['id'],
 
493
                     'direction': 'ingress',
 
494
                     'protocol': 'tcp',
 
495
                     'port_range_min': '22',
 
496
                     'port_range_max': '22',
 
497
                     'external_id': 1,
 
498
                     'tenant_id': 'test_tenant',
 
499
                     'source_group_id': sg['security_group']['id']}}
 
500
 
 
501
            res = self._create_security_group_rule('json', rule)
 
502
            self.deserialize('json', res)
 
503
            self.assertEquals(res.status_int, 409)
 
504
 
 
505
    def test_create_security_group_rule_not_admin(self):
 
506
        cfg.CONF.SECURITYGROUP.proxy_mode = True
 
507
        with self.security_group(external_id='1') as sg:
 
508
            rule = {'security_group_rule':
 
509
                    {'security_group_id': sg['security_group']['id'],
 
510
                     'direction': 'ingress',
 
511
                     'protocol': 'tcp',
 
512
                     'port_range_min': '22',
 
513
                     'port_range_max': '22',
 
514
                     'tenant_id': 'bad_tenant',
 
515
                     'external_id': 1,
 
516
                     'source_group_id': sg['security_group']['id']}}
 
517
 
 
518
            res = self._create_security_group_rule('json', rule,
 
519
                                                   tenant_id='bad_tenant',
 
520
                                                   set_context=True)
 
521
            self.deserialize('json', res)
 
522
            self.assertEquals(res.status_int, 500)
 
523
 
 
524
    def test_create_security_group_rule_bad_tenant_source_group_id(self):
 
525
        with self.security_group() as sg:
 
526
            res = self._create_security_group('json', 'webservers',
 
527
                                              'webservers',
 
528
                                              tenant_id='bad_tenant')
 
529
            sg2 = self.deserialize('json', res)
 
530
            rule = {'security_group_rule':
 
531
                    {'security_group_id': sg2['security_group']['id'],
 
532
                     'direction': 'ingress',
 
533
                     'protocol': 'tcp',
 
534
                     'port_range_min': '22',
 
535
                     'port_range_max': '22',
 
536
                     'tenant_id': 'bad_tenant',
 
537
                     'source_group_id': sg['security_group']['id']}}
 
538
 
 
539
            res = self._create_security_group_rule('json', rule,
 
540
                                                   tenant_id='bad_tenant',
 
541
                                                   set_context=True)
 
542
            self.deserialize('json', res)
 
543
            self.assertEquals(res.status_int, 404)
 
544
 
 
545
    def test_create_security_group_rule_bad_tenant_security_group_rule(self):
 
546
        with self.security_group() as sg:
 
547
            res = self._create_security_group('json', 'webservers',
 
548
                                              'webservers',
 
549
                                              tenant_id='bad_tenant')
 
550
            self.deserialize('json', res)
 
551
            rule = {'security_group_rule':
 
552
                    {'security_group_id': sg['security_group']['id'],
 
553
                     'direction': 'ingress',
 
554
                     'protocol': 'tcp',
 
555
                     'port_range_min': '22',
 
556
                     'port_range_max': '22',
 
557
                     'tenant_id': 'bad_tenant'}}
 
558
 
 
559
            res = self._create_security_group_rule('json', rule,
 
560
                                                   tenant_id='bad_tenant',
 
561
                                                   set_context=True)
 
562
            self.deserialize('json', res)
 
563
            self.assertEquals(res.status_int, 404)
 
564
 
 
565
    def test_create_security_group_rule_bad_source_group_id(self):
 
566
        name = 'webservers'
 
567
        description = 'my webservers'
 
568
        with self.security_group(name, description) as sg:
 
569
            security_group_id = sg['security_group']['id']
 
570
            source_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
 
571
            direction = "ingress"
 
572
            protocol = 'tcp'
 
573
            port_range_min = 22
 
574
            port_range_max = 22
 
575
        rule = self._build_security_group_rule(security_group_id, direction,
 
576
                                               protocol, port_range_min,
 
577
                                               port_range_max,
 
578
                                               source_group_id=source_group_id)
 
579
        res = self._create_security_group_rule('json', rule)
 
580
        self.deserialize('json', res)
 
581
        self.assertEquals(res.status_int, 404)
 
582
 
 
583
    def test_create_security_group_rule_duplicate_rules(self):
 
584
        name = 'webservers'
 
585
        description = 'my webservers'
 
586
        with self.security_group(name, description) as sg:
 
587
            security_group_id = sg['security_group']['id']
 
588
            with self.security_group_rule(security_group_id):
 
589
                rule = self._build_security_group_rule(
 
590
                    sg['security_group']['id'], 'ingress', 'tcp', '22', '22')
 
591
                self._create_security_group_rule('json', rule)
 
592
                res = self._create_security_group_rule('json', rule)
 
593
                self.deserialize('json', res)
 
594
                self.assertEquals(res.status_int, 409)
 
595
 
 
596
    def test_create_security_group_rule_min_port_greater_max(self):
 
597
        name = 'webservers'
 
598
        description = 'my webservers'
 
599
        with self.security_group(name, description) as sg:
 
600
            security_group_id = sg['security_group']['id']
 
601
            with self.security_group_rule(security_group_id):
 
602
                rule = self._build_security_group_rule(
 
603
                    sg['security_group']['id'], 'ingress', 'tcp', '50', '22')
 
604
                self._create_security_group_rule('json', rule)
 
605
                res = self._create_security_group_rule('json', rule)
 
606
                self.deserialize('json', res)
 
607
                self.assertEquals(res.status_int, 400)
 
608
 
 
609
    def test_create_security_group_rule_ports_but_no_protocol(self):
 
610
        name = 'webservers'
 
611
        description = 'my webservers'
 
612
        with self.security_group(name, description) as sg:
 
613
            security_group_id = sg['security_group']['id']
 
614
            with self.security_group_rule(security_group_id):
 
615
                rule = self._build_security_group_rule(
 
616
                    sg['security_group']['id'], 'ingress', None, '22', '22')
 
617
                self._create_security_group_rule('json', rule)
 
618
                res = self._create_security_group_rule('json', rule)
 
619
                self.deserialize('json', res)
 
620
                self.assertEquals(res.status_int, 400)
 
621
 
 
622
    def test_update_port_with_security_group(self):
 
623
        with self.network() as n:
 
624
            with self.subnet(n):
 
625
                with self.security_group() as sg:
 
626
                    res = self._create_port('json', n['network']['id'])
 
627
                    port = self.deserialize('json', res)
 
628
 
 
629
                    data = {'port': {'fixed_ips': port['port']['fixed_ips'],
 
630
                                     'name': port['port']['name'],
 
631
                                     ext_sg.SECURITYGROUP:
 
632
                                     [sg['security_group']['id']]}}
 
633
 
 
634
                    req = self.new_update_request('ports', data,
 
635
                                                  port['port']['id'])
 
636
                    res = self.deserialize('json', req.get_response(self.api))
 
637
                    self.assertEquals(res['port'][ext_sg.SECURITYGROUP][0],
 
638
                                      sg['security_group']['id'])
 
639
                    self._delete('ports', port['port']['id'])
 
640
 
 
641
    def test_update_port_with_multiple_security_groups(self):
 
642
        with self.network() as n:
 
643
            with self.subnet(n):
 
644
                with self.security_group() as sg1:
 
645
                    with self.security_group() as sg2:
 
646
                        res = self._create_port(
 
647
                            'json', n['network']['id'],
 
648
                            security_groups=[sg1['security_group']['id'],
 
649
                                             sg2['security_group']['id']])
 
650
                        port = self.deserialize('json', res)
 
651
                        self.assertEquals(len(
 
652
                            port['port'][ext_sg.SECURITYGROUP]), 2)
 
653
                        self._delete('ports', port['port']['id'])
 
654
 
 
655
    def test_update_port_remove_security_group(self):
 
656
        with self.network() as n:
 
657
            with self.subnet(n):
 
658
                with self.security_group() as sg:
 
659
                    res = self._create_port('json', n['network']['id'],
 
660
                                            security_groups=(
 
661
                                            [sg['security_group']['id']]))
 
662
                    port = self.deserialize('json', res)
 
663
 
 
664
                    data = {'port': {'fixed_ips': port['port']['fixed_ips'],
 
665
                                     'name': port['port']['name']}}
 
666
 
 
667
                    req = self.new_update_request('ports', data,
 
668
                                                  port['port']['id'])
 
669
                    res = self.deserialize('json', req.get_response(self.api))
 
670
                    self.assertEquals(res['port'][ext_sg.SECURITYGROUP], [])
 
671
                    self._delete('ports', port['port']['id'])
 
672
 
 
673
    def test_create_port_with_bad_security_group(self):
 
674
        with self.network() as n:
 
675
            with self.subnet(n):
 
676
                res = self._create_port('json', n['network']['id'],
 
677
                                        security_groups=['bad_id'])
 
678
 
 
679
                self.deserialize('json', res)
 
680
                self.assertEquals(res.status_int, 404)
 
681
 
 
682
    def test_create_delete_security_group_port_in_use(self):
 
683
        with self.network() as n:
 
684
            with self.subnet(n):
 
685
                with self.security_group() as sg:
 
686
                    res = self._create_port('json', n['network']['id'],
 
687
                                            security_groups=(
 
688
                                            [sg['security_group']['id']]))
 
689
                    port = self.deserialize('json', res)
 
690
                    self.assertEquals(port['port'][ext_sg.SECURITYGROUP][0],
 
691
                                      sg['security_group']['id'])
 
692
                    # try to delete security group that's in use
 
693
                    res = self._delete('security-groups',
 
694
                                       sg['security_group']['id'], 409)
 
695
                    # delete the blocking port
 
696
                    self._delete('ports', port['port']['id'])
 
697
 
 
698
    def test_create_security_group_rule_bulk_native(self):
 
699
        if self._skip_native_bulk:
 
700
            self.skipTest("Plugin does not support native bulk "
 
701
                          "security_group_rule create")
 
702
        with self.security_group() as sg:
 
703
            rule1 = self._build_security_group_rule(sg['security_group']['id'],
 
704
                                                    'ingress', 'tcp', '22',
 
705
                                                    '22', '10.0.0.1/24')
 
706
            rule2 = self._build_security_group_rule(sg['security_group']['id'],
 
707
                                                    'ingress', 'tcp', '23',
 
708
                                                    '23', '10.0.0.1/24')
 
709
            rules = {'security_group_rules': [rule1['security_group_rule'],
 
710
                                              rule2['security_group_rule']]}
 
711
            res = self._create_security_group_rule('json', rules)
 
712
            self.deserialize('json', res)
 
713
            self.assertEquals(res.status_int, 201)
 
714
 
 
715
    def test_create_security_group_rule_bulk_emulated(self):
 
716
        real_has_attr = hasattr
 
717
 
 
718
        #ensures the API choose the emulation code path
 
719
        def fakehasattr(item, attr):
 
720
            if attr.endswith('__native_bulk_support'):
 
721
                return False
 
722
            return real_has_attr(item, attr)
 
723
 
 
724
        with mock.patch('__builtin__.hasattr',
 
725
                        new=fakehasattr):
 
726
            with self.security_group() as sg:
 
727
                rule1 = self._build_security_group_rule(
 
728
                    sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
 
729
                    '10.0.0.1/24')
 
730
                rule2 = self._build_security_group_rule(
 
731
                    sg['security_group']['id'], 'ingress', 'tcp', '23', '23',
 
732
                    '10.0.0.1/24')
 
733
                rules = {'security_group_rules': [rule1['security_group_rule'],
 
734
                                                  rule2['security_group_rule']]
 
735
                         }
 
736
                res = self._create_security_group_rule('json', rules)
 
737
                self.deserialize('json', res)
 
738
                self.assertEquals(res.status_int, 201)
 
739
 
 
740
    def test_create_security_group_rule_duplicate_rule_in_post(self):
 
741
        if self._skip_native_bulk:
 
742
            self.skipTest("Plugin does not support native bulk "
 
743
                          "security_group_rule create")
 
744
        with self.security_group() as sg:
 
745
            rule = self._build_security_group_rule(sg['security_group']['id'],
 
746
                                                   'ingress', 'tcp', '22',
 
747
                                                   '22', '10.0.0.1/24')
 
748
            rules = {'security_group_rules': [rule['security_group_rule'],
 
749
                                              rule['security_group_rule']]}
 
750
            res = self._create_security_group_rule('json', rules)
 
751
            rule = self.deserialize('json', res)
 
752
            self.assertEquals(res.status_int, 409)
 
753
 
 
754
    def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
 
755
        real_has_attr = hasattr
 
756
 
 
757
        #ensures the API choose the emulation code path
 
758
        def fakehasattr(item, attr):
 
759
            if attr.endswith('__native_bulk_support'):
 
760
                return False
 
761
            return real_has_attr(item, attr)
 
762
 
 
763
        with mock.patch('__builtin__.hasattr',
 
764
                        new=fakehasattr):
 
765
 
 
766
            with self.security_group() as sg:
 
767
                rule = self._build_security_group_rule(
 
768
                    sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
 
769
                    '10.0.0.1/24')
 
770
                rules = {'security_group_rules': [rule['security_group_rule'],
 
771
                                                  rule['security_group_rule']]}
 
772
                res = self._create_security_group_rule('json', rules)
 
773
                rule = self.deserialize('json', res)
 
774
                self.assertEquals(res.status_int, 409)
 
775
 
 
776
    def test_create_security_group_rule_duplicate_rule_db(self):
 
777
        if self._skip_native_bulk:
 
778
            self.skipTest("Plugin does not support native bulk "
 
779
                          "security_group_rule create")
 
780
        with self.security_group() as sg:
 
781
            rule = self._build_security_group_rule(sg['security_group']['id'],
 
782
                                                   'ingress', 'tcp', '22',
 
783
                                                   '22', '10.0.0.1/24')
 
784
            rules = {'security_group_rules': [rule]}
 
785
            self._create_security_group_rule('json', rules)
 
786
            res = self._create_security_group_rule('json', rules)
 
787
            rule = self.deserialize('json', res)
 
788
            self.assertEquals(res.status_int, 409)
 
789
 
 
790
    def test_create_security_group_rule_duplicate_rule_db_emulated(self):
 
791
        real_has_attr = hasattr
 
792
 
 
793
        #ensures the API choose the emulation code path
 
794
        def fakehasattr(item, attr):
 
795
            if attr.endswith('__native_bulk_support'):
 
796
                return False
 
797
            return real_has_attr(item, attr)
 
798
 
 
799
        with mock.patch('__builtin__.hasattr',
 
800
                        new=fakehasattr):
 
801
            with self.security_group() as sg:
 
802
                rule = self._build_security_group_rule(
 
803
                    sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
 
804
                    '10.0.0.1/24')
 
805
                rules = {'security_group_rules': [rule]}
 
806
                self._create_security_group_rule('json', rules)
 
807
                res = self._create_security_group_rule('json', rule)
 
808
                self.deserialize('json', res)
 
809
                self.assertEquals(res.status_int, 409)
 
810
 
 
811
    def test_create_security_group_rule_differnt_security_group_ids(self):
 
812
        if self._skip_native_bulk:
 
813
            self.skipTest("Plugin does not support native bulk "
 
814
                          "security_group_rule create")
 
815
        with self.security_group() as sg1:
 
816
            with self.security_group() as sg2:
 
817
                rule1 = self._build_security_group_rule(
 
818
                    sg1['security_group']['id'], 'ingress', 'tcp', '22', '22',
 
819
                    '10.0.0.1/24')
 
820
                rule2 = self._build_security_group_rule(
 
821
                    sg2['security_group']['id'], 'ingress', 'tcp', '23', '23',
 
822
                    '10.0.0.1/24')
 
823
 
 
824
                rules = {'security_group_rules': [rule1['security_group_rule'],
 
825
                                                  rule2['security_group_rule']]
 
826
                         }
 
827
                res = self._create_security_group_rule('json', rules)
 
828
                self.deserialize('json', res)
 
829
                self.assertEquals(res.status_int, 400)