~corey.bryant/ubuntu/trusty/neutron/lp1318721

« back to all changes in this revision

Viewing changes to neutron/tests/unit/test_policy.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Corey Bryant
  • Date: 2014-10-06 09:15:06 UTC
  • mfrom: (28.1.4 trusty-proposed)
  • Revision ID: package-import@ubuntu.com-20141006091506-cesvev43moce4y74
Tags: 1:2014.1.3-0ubuntu1
[ Corey Bryant ]
* Resynchronize with stable/icehouse (4a0210e) (LP: #1377136):
  - [3a30d19] Deletes floating ip related connection states
  - [dd4b77f] Forbid regular users to reset admin-only attrs to default values
  - [dc2c893] Add delete operations for the ODL MechanismDriver
  - [b51e2c7] Add missing ml2 plugin to migration 1fcfc149aca4
  - [a17a500] Don't convert numeric protocol values to int
  - [3a85946] NSX: Optionally not enforce nat rule match length check
  - [645f984] Don't spawn metadata-proxy for non-isolated nets
  - [b464d89] Big Switch: Check for 'id' in port before lookup
  - [3116ffa] use TRUE in SQL for boolean var
  - [3520e66] call security_groups_member_updated in port_update
  - [50e1534] Don't allow user to set firewall rule with port and no protocol
  - [0061533] BSN: Add context to backend request for debugging
  - [6de6d61] Improve ODL ML2 Exception Handling
  - [2a4153d] Send network name and uuid to subnet create
  - [b5e3c9a] BSN: Allow concurrent reads to consistency DB
  - [b201432] Big Switch: Retry on 503 errors from backend
  - [f6c47ee] NSX: log request body to NSX as debug
  - [97d622a] Fix metadata agent's auth info caching
  - [255df45] NSX: Correct allowed_address_pair return value on create_port
  - [5bea041] Neutron should not use the neutronclient utils module for import_class
  - [d5314e2] Cisco N1kv plugin to send subtype on network profile creation
  - [f32d1ce] Pass object to policy when finding fields to strip
  - [8b5f6be] Call policy.init() once per API request
  - [9a6d811] Perform policy checks only once on list responses
  - [c48db90] Datacenter moid should not be tuple
  - [161d465] Allow unsharing a network used as gateway/floatingip
  - [9574a2f] Add support for router scheduling in Cisco N1kv Plugin
  - [6f54565] Fix func job hook script permission problems
  - [ea43103] Add hook scripts for the functional infra job
  - [8161cb7] Fixes Hyper-V agent issue on Hyper-V 2008 R2
  - [8e99cfd] Fixes Hyper-V issue due to ML2 RPC versioning
  - [69f9121] Ensure ip6tables are used only if ipv6 is enabled in kernel
  - [399b809] Remove explicit dependency on amqplib
  - [a872143] Clear entries in Cisco N1KV specific tables on rollback
  - [ad82fad] Verify ML2 type driver exists before calling del
  - [af2cc98] Big Switch: Only update hash header on success
  - [b1e5eec] Ignore variable column widths in ovsdb functional tests
  - [4a0210e] VMWare: don't notify on disassociate_floatingips()

Show diffs side-by-side

added

removed

Lines of Context:
24
24
 
25
25
import neutron
26
26
from neutron.api.v2 import attributes
 
27
from neutron.common import constants as const
27
28
from neutron.common import exceptions
28
29
from neutron import context
29
30
from neutron import manager
53
54
            action = "example:test"
54
55
            with open(tmpfilename, "w") as policyfile:
55
56
                policyfile.write("""{"example:test": ""}""")
 
57
            policy.init()
56
58
            policy.enforce(self.context, action, self.target)
57
59
            with open(tmpfilename, "w") as policyfile:
58
60
                policyfile.write("""{"example:test": "!"}""")
59
61
            # NOTE(vish): reset stored policy cache so we don't have to
60
62
            # sleep(1)
61
63
            policy._POLICY_CACHE = {}
 
64
            policy.init()
62
65
            self.assertRaises(exceptions.PolicyNotAuthorized,
63
66
                              policy.enforce,
64
67
                              self.context,
106
109
        result = policy.check(self.context, action, self.target)
107
110
        self.assertEqual(result, False)
108
111
 
109
 
    def test_check_if_exists_non_existent_action_raises(self):
 
112
    def test_check_non_existent_action(self):
110
113
        action = "example:idonotexist"
111
 
        self.assertRaises(exceptions.PolicyRuleNotFound,
112
 
                          policy.check_if_exists,
113
 
                          self.context, action, self.target)
 
114
        result_1 = policy.check(self.context, action, self.target)
 
115
        self.assertFalse(result_1)
 
116
        result_2 = policy.check(self.context, action, self.target,
 
117
                                might_not_exist=True)
 
118
        self.assertTrue(result_2)
114
119
 
115
120
    def test_enforce_good_action(self):
116
121
        action = "example:allowed"
280
285
        self.addCleanup(self.manager_patcher.stop)
281
286
 
282
287
    def _test_action_on_attr(self, context, action, attr, value,
283
 
                             exception=None):
 
288
                             exception=None, **kwargs):
284
289
        action = "%s_network" % action
285
290
        target = {'tenant_id': 'the_owner', attr: value}
 
291
        if kwargs:
 
292
            target.update(kwargs)
286
293
        if exception:
287
294
            self.assertRaises(exception, policy.enforce,
288
295
                              context, action, target)
291
298
            self.assertEqual(result, True)
292
299
 
293
300
    def _test_nonadmin_action_on_attr(self, action, attr, value,
294
 
                                      exception=None):
 
301
                                      exception=None, **kwargs):
295
302
        user_context = context.Context('', "user", roles=['user'])
296
303
        self._test_action_on_attr(user_context, action, attr,
297
 
                                  value, exception)
 
304
                                  value, exception, **kwargs)
298
305
 
299
306
    def test_nonadmin_write_on_private_fails(self):
300
307
        self._test_nonadmin_action_on_attr('create', 'shared', False,
311
318
    def test_nonadmin_read_on_shared_succeeds(self):
312
319
        self._test_nonadmin_action_on_attr('get', 'shared', True)
313
320
 
314
 
    def _test_enforce_adminonly_attribute(self, action):
 
321
    def _test_enforce_adminonly_attribute(self, action, **kwargs):
315
322
        admin_context = context.get_admin_context()
316
323
        target = {'shared': True}
 
324
        if kwargs:
 
325
            target.update(kwargs)
317
326
        result = policy.enforce(admin_context, action, target)
318
327
        self.assertEqual(result, True)
319
328
 
321
330
        self._test_enforce_adminonly_attribute('create_network')
322
331
 
323
332
    def test_enforce_adminonly_attribute_update(self):
324
 
        self._test_enforce_adminonly_attribute('update_network')
 
333
        kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
 
334
        self._test_enforce_adminonly_attribute('update_network', **kwargs)
 
335
 
 
336
    def test_reset_adminonly_attr_to_default_fails(self):
 
337
        kwargs = {const.ATTRIBUTES_TO_UPDATE: ['shared']}
 
338
        self._test_nonadmin_action_on_attr('update', 'shared', False,
 
339
                                           exceptions.PolicyNotAuthorized,
 
340
                                           **kwargs)
325
341
 
326
342
    def test_enforce_adminonly_attribute_no_context_is_admin_policy(self):
327
343
        del self.rules[policy.ADMIN_CTX_POLICY]
471
487
        # Trigger a policy with rule admin_or_owner
472
488
        action = "create_network"
473
489
        target = {'tenant_id': 'fake'}
 
490
        policy.init()
474
491
        self.assertRaises(exceptions.PolicyCheckError,
475
492
                          policy.enforce,
476
493
                          self.context, action, target)