269
269
delete = self.cloud.delete_security_group
270
270
self.assertRaises(exception.ApiError, delete, self.context)
272
def test_authorize_revoke_security_group_ingress(self):
    """Authorize, then revoke, a TCP port-999 rule on a group found by name.

    A security group named 'test' is created in the context's project, the
    rule is authorized via ``group_name`` and the same rule is then revoked;
    the revoke call is expected to return a truthy value.
    """
    group_values = {'project_id': self.context.project_id, 'name': 'test'}
    sec = db.security_group_create(self.context, group_values)
    authz = self.cloud.authorize_security_group_ingress
    kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
    authz(self.context, group_name=sec['name'], **kwargs)
    # The test name promises a revoke step; without it nothing was asserted.
    revoke = self.cloud.revoke_security_group_ingress
    self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs))
272
def test_authorize_security_group_ingress(self):
    """Authorizing a TCP port-999 rule by group name returns a truthy value."""
    group = db.security_group_create(
        self.context,
        {'project_id': self.context.project_id, 'name': 'test'})
    rule = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
    authorized = self.cloud.authorize_security_group_ingress(
        self.context, group_name=group['name'], **rule)
    self.assertTrue(authorized)
279
def test_authorize_security_group_ingress_ip_permissions_ip_ranges(self):
    """Authorize a rule passed as ``ip_permissions`` carrying two CIDR ranges.

    A security group named 'test' is created in the context's project and a
    single TCP port-81 permission with two ``ip_ranges`` entries is
    authorized by group name; the call is expected to return a truthy value.
    """
    group_values = {'project_id': self.context.project_id, 'name': 'test'}
    sec = db.security_group_create(self.context, group_values)
    authz = self.cloud.authorize_security_group_ingress
    # The 'ip_ranges' key was missing, leaving the nested dict without a key
    # and the literal syntactically invalid.
    kwargs = {'ip_permissions': [{'to_port': 81, 'from_port': 81,
                                  'ip_ranges':
                                      {'1': {'cidr_ip': u'0.0.0.0/0'},
                                       '2': {'cidr_ip': u'10.10.10.10/32'}},
                                  'ip_protocol': u'tcp'}]}
    self.assertTrue(authz(self.context, group_name=sec['name'], **kwargs))
290
def test_authorize_security_group_ingress_ip_permissions_groups(self):
    """Authorize an ``ip_permissions`` rule with CIDR ranges and source groups.

    The single TCP port-81 permission carries two ``ip_ranges`` entries and
    two ``groups`` entries; authorizing it by group name is expected to
    return a truthy value.
    """
    group = db.security_group_create(
        self.context,
        {'project_id': self.context.project_id, 'name': 'test'})
    cidr_ranges = {'1': {'cidr_ip': u'0.0.0.0/0'},
                   '2': {'cidr_ip': u'10.10.10.10/32'}}
    source_groups = {'1': {'user_id': u'someuser',
                           'group_name': u'somegroup1'},
                     '2': {'user_id': u'someuser',
                           'group_name': u'othergroup2'}}
    permission = {'to_port': 81, 'from_port': 81,
                  'ip_ranges': cidr_ranges,
                  'groups': source_groups,
                  'ip_protocol': u'tcp'}
    authorize = self.cloud.authorize_security_group_ingress
    self.assertTrue(authorize(self.context, group_name=group['name'],
                              ip_permissions=[permission]))
304
def test_revoke_security_group_ingress(self):
    """Revoke by group name a TCP port-999 rule that was authorized by id.

    The rule is first authorized using ``group_id``; revoking the same rule
    using ``group_name`` is expected to return a truthy value.
    """
    group = db.security_group_create(
        self.context,
        {'project_id': self.context.project_id, 'name': 'test'})
    rule = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
    self.cloud.authorize_security_group_ingress(
        self.context, group_id=group['id'], **rule)
    revoked = self.cloud.revoke_security_group_ingress(
        self.context, group_name=group['name'], **rule)
    self.assertTrue(revoked)
281
def test_authorize_revoke_security_group_ingress_by_id(self):
    """Authorize, then revoke, a TCP port-999 rule addressed by group id.

    A security group named 'test' is created in the context's project; the
    rule is authorized and then revoked using ``group_id``, and the revoke
    call is expected to return a truthy value.
    """
    # The dict literal was cut off after 'project_id' and the method had no
    # further body; both are completed here so the test is runnable.
    sec = db.security_group_create(self.context,
                                   {'project_id': self.context.project_id,
                                    'name': 'test'})
    authz = self.cloud.authorize_security_group_ingress
    kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
    authz(self.context, group_id=sec['id'], **kwargs)
    revoke = self.cloud.revoke_security_group_ingress
    self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
313
def test_revoke_security_group_ingress_by_id(self):
    """Revoking by group id a previously authorized TCP port-999 rule is truthy."""
    group = db.security_group_create(
        self.context,
        {'project_id': self.context.project_id, 'name': 'test'})
    rule = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
    # Authorize first so there is a rule to revoke.
    self.cloud.authorize_security_group_ingress(
        self.context, group_id=group['id'], **rule)
    revoked = self.cloud.revoke_security_group_ingress(
        self.context, group_id=group['id'], **rule)
    self.assertTrue(revoked)
322
def test_authorize_security_group_ingress_by_id(self):
    """Authorizing a TCP port-999 rule by group id returns a truthy value.

    A security group named 'test' is created in the context's project and
    the rule is authorized using ``group_id``.
    """
    # The dict literal and the call were left unclosed; the 'name' entry is
    # restored to match the other tests that create the 'test' group.
    sec = db.security_group_create(self.context,
                                   {'project_id': self.context.project_id,
                                    'name': 'test'})
    authz = self.cloud.authorize_security_group_ingress
    kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
    self.assertTrue(authz(self.context, group_id=sec['id'], **kwargs))
291
330
def test_authorize_security_group_ingress_missing_protocol_params(self):
292
331
sec = db.security_group_create(self.context,
293
332
{'project_id': self.context.project_id,