1
# Copyright 2014 Cisco Systems, Inc. All rights reserved.
3
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4
# not use this file except in compliance with the License. You may obtain
5
# a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12
# License for the specific language governing permissions and limitations
16
from novaclient import exceptions as nova_exc
17
from oslo.config import cfg
18
from oslo.utils import excutils
20
from neutron import context as n_context
21
from neutron.i18n import _LE
22
from neutron import manager
23
from neutron.openstack.common import log as logging
24
from neutron.openstack.common import uuidutils
25
from neutron.plugins.common import constants
27
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
30
# Short alias for the UUID generator; used below to fabricate ids for fake
# resources when the caller does not supply one.
_uuid = uuidutils.generate_uuid
33
class DeviceHandlingTestSupportMixin(object):
36
def _core_plugin(self):
    """Return the core plugin as reported by the Neutron manager.

    NOTE(review): other code in this file reads ``self._core_plugin`` as a
    plain attribute, so this is presumably wrapped in ``@property`` on a
    line outside this view -- confirm.
    """
    core = manager.NeutronManager.get_plugin()
    return core
39
def _mock_l3_admin_tenant(self):
    """Patch ``DeviceHandlingMixin.l3_tenant_id`` to return a fixed id.

    The patcher is kept on ``self.tenant_id_fcn_p`` and the started mock on
    ``self.tenant_id_fcn``; the mock returns ``"L3AdminTenantId"``.
    The patcher is started here but not stopped by this method.
    """
    target = ('neutron.plugins.cisco.db.l3.device_handling_db.'
              'DeviceHandlingMixin.l3_tenant_id')
    # Store the patcher before starting it so it is reachable even if
    # start() fails.
    self.tenant_id_fcn_p = mock.patch(target)
    started = self.tenant_id_fcn_p.start()
    self.tenant_id_fcn = started
    self.tenant_id_fcn.return_value = "L3AdminTenantId"
47
def _create_mgmt_nw_for_tests(self, fmt):
# Create the management network and subnet used by the tests.
#
# The network is created through self._make_network, named after
# cfg.CONF.general.management_network and owned by tenant
# "L3AdminTenantId"; the result is kept on self._mgmt_nw. A subnet is then
# created on that network through self._make_subnet and kept on
# self._mgmt_subnet.
#
# fmt is passed as the first argument to both helper calls (presumably the
# serialization format of the test API -- TODO confirm).
#
# NOTE(review): "10.0.100.1" / "10.0.100.0/24" look like the subnet's
# gateway IP and CIDR -- confirm against _make_subnet's signature.
# NOTE(review): in this view both calls lack their trailing argument(s) and
# closing parenthesis; source lines appear to be missing -- check the full
# file before relying on this block.
48
self._mgmt_nw = self._make_network(fmt,
49
cfg.CONF.general.management_network,
50
True, tenant_id="L3AdminTenantId",
52
self._mgmt_subnet = self._make_subnet(fmt, self._mgmt_nw,
53
"10.0.100.1", "10.0.100.0/24",
56
def _remove_mgmt_nw_for_tests(self):
# Tear down the management network kept on self._mgmt_nw.
#
# Subnets and ports are listed with a "network_id=<id>" filter. Each listed
# port is deleted, then the subnet kept on self._mgmt_subnet, then the
# network itself.
#
# NOTE(review): `subnets` is assigned but not used in the visible lines; a
# line between its assignment and the loop appears to be missing (possibly
# a guard on it) -- confirm against the full file.
# NOTE(review): the ports -> subnet -> network order presumably reflects
# that Neutron will not delete a subnet/network that is still in use --
# confirm.
57
q_p = "network_id=%s" % self._mgmt_nw['network']['id']
58
subnets = self._list('subnets', query_params=q_p)
60
for p in self._list('ports', query_params=q_p).get('ports'):
61
self._delete('ports', p['id'])
62
self._delete('subnets', self._mgmt_subnet['subnet']['id'])
63
self._delete('networks', self._mgmt_nw['network']['id'])
65
# Function used to mock novaclient services list
66
def _novaclient_services_list(self, all=True):
# Stand-in for the novaclient services listing.
#
# Builds one FakeResource per Nova service binary name below. The names
# are collected in a set, so the order of full_list is not fixed by this
# code.
#
# NOTE(review): the parameter `all` shadows the builtin and is not used in
# the visible lines; the rest of the body (including whatever is returned)
# is not visible in this view -- check the full file.
67
services = set(['nova-conductor', 'nova-cert', 'nova-scheduler',
68
'nova-compute', 'nova-consoleauth'])
69
full_list = [FakeResource(binary=res) for res in services]
79
# Function used to mock novaclient servers create
80
def _novaclient_servers_create(self, instance_name, image_id, flavor_id,
81
nics, files, config_drive):
# Stand-in for the novaclient servers create call.
#
# A FakeResource represents the new VM. Through the core plugin and an
# admin context, a port is updated so that its device_id is the fake VM's
# id and its device_owner is 'nova'.
#
# instance_name, image_id, flavor_id, files and config_drive are not used
# in the visible lines.
#
# NOTE(review): `nic` is never bound in the visible lines; a loop over
# `nics` presumably precedes the port update, and the return value is not
# visible either -- check the full file.
# NOTE(review): self._core_plugin is read as an attribute here, which only
# works if _core_plugin is a property -- confirm the decorator exists.
82
fake_vm = FakeResource()
84
p_dict = {'port': {'device_id': fake_vm.id,
85
'device_owner': 'nova'}}
86
self._core_plugin.update_port(n_context.get_admin_context(),
87
nic['port-id'], p_dict)
90
# Function used to mock novaclient servers delete
91
def _novaclient_servers_delete(self, vm_id):
# Stand-in for the novaclient servers delete call.
#
# Lists the ports whose device_id equals vm_id and deletes each of them.
# If a deletion raises, the failure is logged with the port id, the VM id
# and the error, and nova_exc.InternalServerError is raised.
#
# save_and_reraise_exception(reraise=False) is used so that the original
# exception is not re-raised when the `with` block exits.
#
# NOTE(review): the `try:` that pairs with the `except` below is not
# visible, and with indentation lost it cannot be told from this view
# whether the final `raise` sits inside or after the `with` block -- check
# the full file.
92
q_p = "device_id=%s" % vm_id
93
ports = self._list('ports', query_params=q_p)
94
for port in ports.get('ports', []):
96
self._delete('ports', port['id'])
97
except Exception as e:
98
with excutils.save_and_reraise_exception(reraise=False):
99
LOG.error(_LE('Failed to delete port %(p_id)s for vm '
100
'instance %(v_id)s due to %(err)s'),
101
{'p_id': port['id'], 'v_id': vm_id, 'err': e})
102
raise nova_exc.InternalServerError()
104
def _mock_svc_vm_create_delete(self, plugin):
# Replace the Nova client pieces used to create/delete service VMs with
# the fakes defined on this mixin.
#
# plugin: object whose _svc_vm_mgr._nclient has its `services` and
#     `servers` attributes patched.
#
# Steps visible below:
#   * service_vm_lib.n_utils.find_resource is replaced by a lambda that
#     returns a fresh FakeResource for any arguments.
#   * `services` becomes a MagicMock (kept on self._nclient_services_mock)
#     whose `list` is the RESULT of calling self._novaclient_services_list()
#     -- presumably a callable; that part of the helper is not visible.
#   * `servers` becomes a MagicMock whose create/delete are the bound
#     _novaclient_servers_create / _novaclient_servers_delete methods.
#
# All patches are started here and not stopped by this method (presumably
# the test teardown stops them -- confirm).
#
# NOTE(review): the opening of the first patch call (before the
# find_resource target string) is not visible in this view.
105
# Mock novaclient methods for creation/deletion of service VMs
107
'neutron.plugins.cisco.l3.service_vm_lib.n_utils.find_resource',
108
lambda *args, **kw: FakeResource()).start()
109
self._nclient_services_mock = mock.MagicMock()
110
self._nclient_services_mock.list = self._novaclient_services_list()
111
mock.patch.object(plugin._svc_vm_mgr._nclient, 'services',
112
self._nclient_services_mock).start()
113
nclient_servers_mock = mock.MagicMock()
114
nclient_servers_mock.create = self._novaclient_servers_create
115
nclient_servers_mock.delete = self._novaclient_servers_delete
116
mock.patch.object(plugin._svc_vm_mgr._nclient, 'servers',
117
nclient_servers_mock).start()
119
def _mock_io_file_ops(self):
# Replace `open` as seen by the csr1kv hosting-device driver module with a
# mock that serves a small configuration template.
#
# The template text is used both as the mock's read_data and, split into
# lines, as what iterating over the opened file yields.
# create=True lets the patch install the name `open` on the driver module
# even if the module does not define that attribute itself.
# The patch is started here and not stopped by this method.
#
# NOTE(review): the list of template lines is not closed in this view; at
# least one source line appears to be missing -- check the full file.
120
# Mock library functions for config drive file operations
121
cfg_template = '\n'.join(['interface GigabitEthernet1',
122
'ip address <ip> <mask>',
124
m = mock.mock_open(read_data=cfg_template)
125
m.return_value.__iter__.return_value = cfg_template.splitlines()
126
mock.patch('neutron.plugins.cisco.l3.hosting_device_drivers.'
127
'csr1kv_hd_driver.open', m, create=True).start()
129
def _test_remove_all_hosting_devices(self):
    """Delete every hosting device through the L3 router service plugin.

    The plugin is looked up under ``constants.L3_ROUTER_NAT`` and its
    ``delete_all_hosting_devices`` is invoked with an admin context and
    ``True`` as the second argument.
    """
    service_plugins = manager.NeutronManager.get_service_plugins()
    l3_plugin = service_plugins[constants.L3_ROUTER_NAT]
    admin_ctx = n_context.get_admin_context()
    l3_plugin.delete_all_hosting_devices(admin_ctx, True)
136
def _get_fake_resource(self, tenant_id=None, id=None):
137
return {'id': id or _uuid(),
138
'tenant_id': tenant_id or _uuid()}
140
def _get_test_context(self, user_id=None, tenant_id=None, is_admin=False):
    """Create a Neutron request context for use in tests.

    :param user_id: forwarded as the first positional argument.
    :param tenant_id: forwarded as the second positional argument.
    :param is_admin: forwarded as the third positional argument.
    :returns: a ``n_context.Context`` built with ``load_admin_roles=True``.
    """
    positional = (user_id, tenant_id, is_admin)
    test_ctx = n_context.Context(*positional, load_admin_roles=True)
    return test_ctx
145
# Used to fake Glance images, Nova VMs and Nova services
146
class FakeResource(object):
147
def __init__(self, id=None, enabled='enabled', state='up', binary=None):
148
self.id = id or _uuid()
149
self.status = enabled