1
# Copyright 2012 OpenStack Foundation
4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
5
# not use this file except in compliance with the License. You may obtain
6
# a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13
# License for the specific language governing permissions and limitations
17
from oslo_log import log as logging
18
from tempest_lib.common.utils import data_utils
19
from tempest_lib import exceptions as lib_exc
21
from neutron.tests.api.contrib import clients
22
from neutron.tests.tempest import config
23
from neutron.tests.tempest import exceptions
24
import neutron.tests.tempest.test
28
LOG = logging.getLogger(__name__)
31
class BaseNetworkTest(neutron.tests.tempest.test.BaseTestCase):
34
Base class for the Neutron tests that use the Tempest Neutron REST client
36
Per the Neutron API Guide, API v1.x was removed from the source code tree
37
(docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
38
Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
39
following options are defined in the [network] section of etc/tempest.conf:
41
tenant_network_cidr with a block of cidr's from which smaller blocks
42
can be allocated for tenant networks
44
tenant_network_mask_bits with the mask bits to be used to partition the
45
block defined by tenant-network_cidr
47
Finally, it is assumed that the following option is defined in the
48
[service_available] section of etc/tempest.conf
53
force_tenant_isolation = False
59
def resource_setup(cls):
60
# Create no network resources for these test.
61
cls.set_network_resources()
62
super(BaseNetworkTest, cls).resource_setup()
63
if not CONF.service_available.neutron:
64
raise cls.skipException("Neutron support is required")
65
if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
66
raise cls.skipException("IPv6 Tests are disabled.")
68
os = cls.get_client_manager()
70
cls.network_cfg = CONF.network
71
cls.client = os.network_client
79
cls.health_monitors = []
83
cls.metering_labels = []
84
cls.metering_label_rules = []
87
cls.ipsecpolicies = []
88
cls.ethertype = "IPv" + str(cls._ip_version)
91
def resource_cleanup(cls):
92
if CONF.service_available.neutron:
93
# Clean up ipsec policies
94
for ipsecpolicy in cls.ipsecpolicies:
95
cls._try_delete_resource(cls.client.delete_ipsecpolicy,
97
# Clean up firewall policies
98
for fw_policy in cls.fw_policies:
99
cls._try_delete_resource(cls.client.delete_firewall_policy,
101
# Clean up firewall rules
102
for fw_rule in cls.fw_rules:
103
cls._try_delete_resource(cls.client.delete_firewall_rule,
105
# Clean up ike policies
106
for ikepolicy in cls.ikepolicies:
107
cls._try_delete_resource(cls.client.delete_ikepolicy,
109
# Clean up vpn services
110
for vpnservice in cls.vpnservices:
111
cls._try_delete_resource(cls.client.delete_vpnservice,
113
# Clean up floating IPs
114
for floating_ip in cls.floating_ips:
115
cls._try_delete_resource(cls.client.delete_floatingip,
118
for router in cls.routers:
119
cls._try_delete_resource(cls.delete_router,
122
# Clean up health monitors
123
for health_monitor in cls.health_monitors:
124
cls._try_delete_resource(cls.client.delete_health_monitor,
125
health_monitor['id'])
127
for member in cls.members:
128
cls._try_delete_resource(cls.client.delete_member,
132
cls._try_delete_resource(cls.client.delete_vip,
135
for pool in cls.pools:
136
cls._try_delete_resource(cls.client.delete_pool,
138
# Clean up metering label rules
139
for metering_label_rule in cls.metering_label_rules:
140
cls._try_delete_resource(
141
cls.admin_client.delete_metering_label_rule,
142
metering_label_rule['id'])
143
# Clean up metering labels
144
for metering_label in cls.metering_labels:
145
cls._try_delete_resource(
146
cls.admin_client.delete_metering_label,
147
metering_label['id'])
149
for port in cls.ports:
150
cls._try_delete_resource(cls.client.delete_port,
153
for subnet in cls.subnets:
154
cls._try_delete_resource(cls.client.delete_subnet,
157
for network in cls.networks:
158
cls._try_delete_resource(cls.client.delete_network,
160
cls.clear_isolated_creds()
161
super(BaseNetworkTest, cls).resource_cleanup()
164
def _try_delete_resource(self, delete_callable, *args, **kwargs):
165
"""Cleanup resources in case of test-failure
167
Some resources are explicitly deleted by the test.
168
If the test failed to delete a resource, this method will execute
169
the appropriate delete methods. Otherwise, the method ignores NotFound
170
exceptions thrown for resources that were correctly deleted by the
173
:param delete_callable: delete method
174
:param args: arguments for delete method
175
:param kwargs: keyword arguments for delete method
178
delete_callable(*args, **kwargs)
179
# if resource is not found, this means it was deleted in the test
180
except lib_exc.NotFound:
184
def create_network(cls, network_name=None):
185
"""Wrapper utility that returns a test network."""
186
network_name = network_name or data_utils.rand_name('test-network-')
188
body = cls.client.create_network(name=network_name)
189
network = body['network']
190
cls.networks.append(network)
194
def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
195
ip_version=None, client=None, **kwargs):
196
"""Wrapper utility that returns a test subnet."""
198
# allow tests to use admin client
202
# The cidr and mask_bits depend on the ip version.
203
ip_version = ip_version if ip_version is not None else cls._ip_version
204
gateway_not_set = gateway == ''
206
cidr = cidr or netaddr.IPNetwork(CONF.network.tenant_network_cidr)
207
mask_bits = mask_bits or CONF.network.tenant_network_mask_bits
208
elif ip_version == 6:
210
cidr or netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr))
211
mask_bits = mask_bits or CONF.network.tenant_network_v6_mask_bits
212
# Find a cidr that is not in use yet and create a subnet with it
213
for subnet_cidr in cidr.subnet(mask_bits):
215
gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
219
body = client.create_subnet(
220
network_id=network['id'],
221
cidr=str(subnet_cidr),
222
ip_version=ip_version,
223
gateway_ip=gateway_ip,
226
except lib_exc.BadRequest as e:
227
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
228
if not is_overlapping_cidr:
231
message = 'Available CIDR for subnet creation could not be found'
232
raise exceptions.BuildErrorException(message)
233
subnet = body['subnet']
234
cls.subnets.append(subnet)
238
def create_port(cls, network, **kwargs):
239
"""Wrapper utility that returns a test port."""
240
body = cls.client.create_port(network_id=network['id'],
243
cls.ports.append(port)
247
def update_port(cls, port, **kwargs):
248
"""Wrapper utility that updates a test port."""
249
body = cls.client.update_port(port['id'],
254
def create_router(cls, router_name=None, admin_state_up=False,
255
external_network_id=None, enable_snat=None):
257
if external_network_id:
258
ext_gw_info['network_id'] = external_network_id
260
ext_gw_info['enable_snat'] = enable_snat
261
body = cls.client.create_router(
262
router_name, external_gateway_info=ext_gw_info,
263
admin_state_up=admin_state_up)
264
router = body['router']
265
cls.routers.append(router)
269
def create_floatingip(cls, external_network_id):
270
"""Wrapper utility that returns a test floating IP."""
271
body = cls.client.create_floatingip(
272
floating_network_id=external_network_id)
273
fip = body['floatingip']
274
cls.floating_ips.append(fip)
278
def create_pool(cls, name, lb_method, protocol, subnet):
279
"""Wrapper utility that returns a test pool."""
280
body = cls.client.create_pool(
284
subnet_id=subnet['id'])
286
cls.pools.append(pool)
290
def update_pool(cls, name):
291
"""Wrapper utility that returns a test pool."""
292
body = cls.client.update_pool(name=name)
297
def create_vip(cls, name, protocol, protocol_port, subnet, pool):
298
"""Wrapper utility that returns a test vip."""
299
body = cls.client.create_vip(name=name,
301
protocol_port=protocol_port,
302
subnet_id=subnet['id'],
309
def update_vip(cls, name):
310
body = cls.client.update_vip(name=name)
315
def create_member(cls, protocol_port, pool, ip_version=None):
316
"""Wrapper utility that returns a test member."""
317
ip_version = ip_version if ip_version is not None else cls._ip_version
318
member_address = "fd00::abcd" if ip_version == 6 else "10.0.9.46"
319
body = cls.client.create_member(address=member_address,
320
protocol_port=protocol_port,
322
member = body['member']
323
cls.members.append(member)
327
def update_member(cls, admin_state_up):
328
body = cls.client.update_member(admin_state_up=admin_state_up)
329
member = body['member']
333
def create_health_monitor(cls, delay, max_retries, Type, timeout):
334
"""Wrapper utility that returns a test health monitor."""
335
body = cls.client.create_health_monitor(delay=delay,
336
max_retries=max_retries,
339
health_monitor = body['health_monitor']
340
cls.health_monitors.append(health_monitor)
341
return health_monitor
344
def update_health_monitor(cls, admin_state_up):
345
body = cls.client.update_vip(admin_state_up=admin_state_up)
346
health_monitor = body['health_monitor']
347
return health_monitor
350
def create_router_interface(cls, router_id, subnet_id):
351
"""Wrapper utility that returns a router interface."""
352
interface = cls.client.add_router_interface_with_subnet_id(
353
router_id, subnet_id)
357
def create_vpnservice(cls, subnet_id, router_id):
358
"""Wrapper utility that returns a test vpn service."""
359
body = cls.client.create_vpnservice(
360
subnet_id=subnet_id, router_id=router_id, admin_state_up=True,
361
name=data_utils.rand_name("vpnservice-"))
362
vpnservice = body['vpnservice']
363
cls.vpnservices.append(vpnservice)
367
def create_ikepolicy(cls, name):
368
"""Wrapper utility that returns a test ike policy."""
369
body = cls.client.create_ikepolicy(name=name)
370
ikepolicy = body['ikepolicy']
371
cls.ikepolicies.append(ikepolicy)
375
def create_firewall_rule(cls, action, protocol):
376
"""Wrapper utility that returns a test firewall rule."""
377
body = cls.client.create_firewall_rule(
378
name=data_utils.rand_name("fw-rule"),
381
fw_rule = body['firewall_rule']
382
cls.fw_rules.append(fw_rule)
386
def create_firewall_policy(cls):
387
"""Wrapper utility that returns a test firewall policy."""
388
body = cls.client.create_firewall_policy(
389
name=data_utils.rand_name("fw-policy"))
390
fw_policy = body['firewall_policy']
391
cls.fw_policies.append(fw_policy)
395
def delete_router(cls, router):
396
body = cls.client.list_router_interfaces(router['id'])
397
interfaces = body['ports']
400
cls.client.remove_router_interface_with_subnet_id(
401
router['id'], i['fixed_ips'][0]['subnet_id'])
402
except lib_exc.NotFound:
404
cls.client.delete_router(router['id'])
407
def create_ipsecpolicy(cls, name):
408
"""Wrapper utility that returns a test ipsec policy."""
409
body = cls.client.create_ipsecpolicy(name=name)
410
ipsecpolicy = body['ipsecpolicy']
411
cls.ipsecpolicies.append(ipsecpolicy)
415
class BaseAdminNetworkTest(BaseNetworkTest):
418
def resource_setup(cls):
419
super(BaseAdminNetworkTest, cls).resource_setup()
422
creds = cls.isolated_creds.get_admin_creds()
423
cls.os_adm = clients.Manager(credentials=creds)
424
except NotImplementedError:
425
msg = ("Missing Administrative Network API credentials "
427
raise cls.skipException(msg)
428
cls.admin_client = cls.os_adm.network_client
431
def create_metering_label(cls, name, description):
432
"""Wrapper utility that returns a test metering label."""
433
body = cls.admin_client.create_metering_label(
434
description=description,
435
name=data_utils.rand_name("metering-label"))
436
metering_label = body['metering_label']
437
cls.metering_labels.append(metering_label)
438
return metering_label
441
def create_metering_label_rule(cls, remote_ip_prefix, direction,
443
"""Wrapper utility that returns a test metering label rule."""
444
body = cls.admin_client.create_metering_label_rule(
445
remote_ip_prefix=remote_ip_prefix, direction=direction,
446
metering_label_id=metering_label_id)
447
metering_label_rule = body['metering_label_rule']
448
cls.metering_label_rules.append(metering_label_rule)
449
return metering_label_rule