# Copyright 2014 Cisco Systems, Inc.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
17
from oslo.config import cfg
18
from oslo_concurrency import lockutils
19
from sqlalchemy.orm import exc
20
from sqlalchemy.orm import joinedload
21
from sqlalchemy.sql import expression as expr
23
from neutron.common import constants as l3_constants
24
from neutron.common import exceptions as n_exc
25
from neutron.common import rpc as n_rpc
26
from neutron.common import topics
27
from neutron import context as n_context
28
from neutron.db import agents_db
29
from neutron.db import extraroute_db
30
from neutron.db import l3_db
31
from neutron.db import models_v2
32
from neutron.db import portbindings_db as p_binding
33
from neutron.extensions import providernet as pr_net
34
from neutron.i18n import _LE, _LI
35
from neutron.openstack.common import log as logging
36
from neutron.openstack.common import loopingcall
37
from neutron.plugins.cisco.common import cisco_constants as c_const
38
from neutron.plugins.cisco.db.l3 import l3_models
39
from neutron.plugins.cisco.l3.rpc import l3_router_rpc_joint_agent_api
41
LOG = logging.getLogger(__name__)
44
# Config options for the router-appliance scheduling backlog.
ROUTER_APPLIANCE_OPTS = [
    cfg.IntOpt('backlog_processing_interval',
               # NOTE(review): default reconstructed as 10 seconds from the
               # upstream cisco plugin -- confirm against original source.
               default=10,
               help=_('Time in seconds between renewed scheduling attempts of '
                      'non-scheduled routers.')),
]

# Options live in the "general" group of the global config.
cfg.CONF.register_opts(ROUTER_APPLIANCE_OPTS, "general")
54
class RouterCreateInternalError(n_exc.NeutronException):
    """Raised when router creation fails for an internal reason."""
    message = _("Router could not be created due to internal error.")
58
class RouterInternalError(n_exc.NeutronException):
    """Raised on an unspecified internal error while processing a router."""
    message = _("Internal error during router processing.")
62
class RouterBindingInfoError(n_exc.NeutronException):
    """Raised when the hosting-device binding of a router cannot be read."""
    message = _("Could not get binding information for router %(router_id)s.")
66
class L3RouterApplianceDBMixin(extraroute_db.ExtraRoute_dbonly_mixin):
    """Mixin class implementing Neutron's routing service using appliances."""

    # Dictionary of routers for which new scheduling attempts should
    # be made and the refresh setting and heartbeat for that.
    _backlogged_routers = {}
    _refresh_router_backlog = True
76
def l3_cfg_rpc_notifier(self):
77
if not hasattr(self, '_l3_cfg_rpc_notifier'):
78
self._l3_cfg_rpc_notifier = (l3_router_rpc_joint_agent_api.
79
L3RouterJointAgentNotifyAPI(self))
80
return self._l3_cfg_rpc_notifier
82
@l3_cfg_rpc_notifier.setter
83
def l3_cfg_rpc_notifier(self, value):
84
self._l3_cfg_rpc_notifier = value
86
def create_router(self, context, router):
87
with context.session.begin(subtransactions=True):
88
if self.mgmt_nw_id() is None:
89
raise RouterCreateInternalError()
90
router_created = (super(L3RouterApplianceDBMixin, self).
91
create_router(context, router))
92
r_hd_b_db = l3_models.RouterHostingDeviceBinding(
93
router_id=router_created['id'],
95
hosting_device_id=None)
96
context.session.add(r_hd_b_db)
97
# backlog so this new router gets scheduled asynchronously
98
self.backlog_router(r_hd_b_db['router'])
101
def update_router(self, context, id, router):
103
# Check if external gateway has changed so we may have to
105
o_r_db = self._get_router(context, id)
106
old_ext_gw = (o_r_db.gw_port or {}).get('network_id')
107
new_ext_gw = (r.get('external_gateway_info', {}) or {}).get(
109
with context.session.begin(subtransactions=True):
110
e_context = context.elevated()
111
if old_ext_gw is not None and old_ext_gw != new_ext_gw:
112
o_r = self._make_router_dict(o_r_db, process_extensions=False)
113
# no need to schedule now since we're only doing this to
114
# tear-down connectivity and there won't be any if not
116
self._add_type_and_hosting_device_info(e_context, o_r,
118
p_drv = self.get_hosting_device_plugging_driver()
119
if p_drv is not None:
120
p_drv.teardown_logical_port_connectivity(e_context,
123
super(L3RouterApplianceDBMixin, self).update_router(
124
context, id, router))
125
routers = [copy.deepcopy(router_updated)]
126
self._add_type_and_hosting_device_info(e_context, routers[0])
127
self.l3_cfg_rpc_notifier.routers_updated(context, routers)
128
return router_updated
130
def delete_router(self, context, id):
131
router_db = self._get_router(context, id)
132
router = self._make_router_dict(router_db)
133
with context.session.begin(subtransactions=True):
134
e_context = context.elevated()
135
r_hd_binding = self._get_router_binding_info(e_context, id)
136
self._add_type_and_hosting_device_info(
137
e_context, router, binding_info=r_hd_binding, schedule=False)
138
if router_db.gw_port is not None:
139
p_drv = self.get_hosting_device_plugging_driver()
140
if p_drv is not None:
141
p_drv.teardown_logical_port_connectivity(e_context,
143
# conditionally remove router from backlog just to be sure
144
self.remove_router_from_backlog(id)
145
if router['hosting_device'] is not None:
146
self.unschedule_router_from_hosting_device(context,
148
super(L3RouterApplianceDBMixin, self).delete_router(context, id)
149
self.l3_cfg_rpc_notifier.router_deleted(context, router)
151
def notify_router_interface_action(
152
self, context, router_interface_info, routers, action):
153
l3_method = '%s_router_interface' % action
154
self.l3_cfg_rpc_notifier.routers_updated(context, routers, l3_method)
156
mapping = {'add': 'create', 'remove': 'delete'}
157
notifier = n_rpc.get_notifier('network')
158
router_event = 'router.interface.%s' % mapping[action]
159
notifier.info(context, router_event,
160
{'router_interface': router_interface_info})
162
def add_router_interface(self, context, router_id, interface_info):
163
with context.session.begin(subtransactions=True):
164
info = (super(L3RouterApplianceDBMixin, self).
165
add_router_interface(context, router_id, interface_info))
166
routers = [self.get_router(context, router_id)]
167
self._add_type_and_hosting_device_info(context.elevated(),
169
self.notify_router_interface_action(context, info, routers, 'add')
172
def remove_router_interface(self, context, router_id, interface_info):
173
if 'port_id' in (interface_info or {}):
174
port_db = self._core_plugin._get_port(
175
context, interface_info['port_id'])
176
elif 'subnet_id' in (interface_info or {}):
177
subnet_db = self._core_plugin._get_subnet(
178
context, interface_info['subnet_id'])
179
port_db = self._get_router_port_db_on_subnet(
180
context, router_id, subnet_db)
182
msg = _("Either subnet_id or port_id must be specified")
183
raise n_exc.BadRequest(resource='router', msg=msg)
184
routers = [self.get_router(context, router_id)]
185
with context.session.begin(subtransactions=True):
186
e_context = context.elevated()
187
self._add_type_and_hosting_device_info(e_context, routers[0])
188
p_drv = self.get_hosting_device_plugging_driver()
189
if p_drv is not None:
190
p_drv.teardown_logical_port_connectivity(e_context, port_db)
191
info = (super(L3RouterApplianceDBMixin, self).
192
remove_router_interface(context, router_id,
194
self.notify_router_interface_action(context, info, routers, 'remove')
197
def create_floatingip(
198
self, context, floatingip,
199
initial_status=l3_constants.FLOATINGIP_STATUS_ACTIVE):
200
with context.session.begin(subtransactions=True):
201
info = super(L3RouterApplianceDBMixin, self).create_floatingip(
203
if info['router_id']:
204
routers = [self.get_router(context, info['router_id'])]
205
self._add_type_and_hosting_device_info(context.elevated(),
207
self.l3_cfg_rpc_notifier.routers_updated(context, routers,
211
def update_floatingip(self, context, id, floatingip):
212
orig_fl_ip = super(L3RouterApplianceDBMixin, self).get_floatingip(
214
before_router_id = orig_fl_ip['router_id']
215
with context.session.begin(subtransactions=True):
216
info = super(L3RouterApplianceDBMixin, self).update_floatingip(
217
context, id, floatingip)
220
router_ids.append(before_router_id)
221
router_id = info['router_id']
222
if router_id and router_id != before_router_id:
223
router_ids.append(router_id)
225
for router_id in router_ids:
226
router = self.get_router(context, router_id)
227
self._add_type_and_hosting_device_info(context.elevated(),
229
routers.append(router)
230
self.l3_cfg_rpc_notifier.routers_updated(context, routers,
234
def delete_floatingip(self, context, id):
235
floatingip_db = self._get_floatingip(context, id)
236
router_id = floatingip_db['router_id']
237
with context.session.begin(subtransactions=True):
238
super(L3RouterApplianceDBMixin, self).delete_floatingip(
241
routers = [self.get_router(context, router_id)]
242
self._add_type_and_hosting_device_info(context.elevated(),
244
self.l3_cfg_rpc_notifier.routers_updated(context, routers,
247
def disassociate_floatingips(self, context, port_id, do_notify=True):
248
with context.session.begin(subtransactions=True):
249
router_ids = super(L3RouterApplianceDBMixin,
250
self).disassociate_floatingips(context, port_id)
251
if router_ids and do_notify:
253
for router_id in router_ids:
254
router = self.get_router(context, router_id)
255
self._add_type_and_hosting_device_info(context.elevated(),
257
routers.append(router)
258
self.l3_cfg_rpc_notifier.routers_updated(
259
context, routers, 'disassociate_floatingips')
260
# since caller assumes that we handled notifications on its
261
# behalf, return nothing
265
@lockutils.synchronized('routerbacklog', 'neutron-')
266
def _handle_non_responding_hosting_devices(self, context, hosting_devices,
268
"""Handle hosting devices determined to be "dead".
270
This function is called by the hosting device manager.
271
Service plugins are supposed to extend the 'affected_resources'
272
dictionary. Hence, we add the id of Neutron routers that are
273
hosted in <hosting_devices>.
275
param: hosting_devices - list of dead hosting devices
276
param: affected_resources - dict with list of affected logical
277
resources per hosting device:
278
{'hd_id1': {'routers': [id1, id2, ...],
281
'hd_id2': {'routers': [id3, id4, ...],
286
LOG.debug('Processing affected routers in dead hosting devices')
287
with context.session.begin(subtransactions=True):
288
for hd in hosting_devices:
289
hd_bindings = self._get_hosting_device_bindings(context,
292
for binding in hd_bindings:
293
router_ids.append(binding['router_id'])
294
if binding['auto_schedule']:
295
self.backlog_router(binding['router'])
297
affected_resources[hd['id']].update(
298
{'routers': router_ids})
300
affected_resources[hd['id']] = {'routers': router_ids}
302
def get_sync_data_ext(self, context, router_ids=None, active=None):
303
"""Query routers and their related floating_ips, interfaces.
305
Adds information about hosting device as well as trunking.
307
with context.session.begin(subtransactions=True):
308
sync_data = (super(L3RouterApplianceDBMixin, self).
309
get_sync_data(context, router_ids, active))
310
for router in sync_data:
311
self._add_type_and_hosting_device_info(context, router)
312
plg_drv = self.get_hosting_device_plugging_driver()
313
if plg_drv and router['hosting_device']:
314
self._add_hosting_port_info(context, router, plg_drv)
317
def schedule_router_on_hosting_device(self, context, r_hd_binding):
318
LOG.info(_LI('Attempting to schedule router %s.'),
319
r_hd_binding['router']['id'])
320
result = self._create_csr1kv_vm_hosting_device(context.elevated())
322
# CSR1kv hosting device creation was unsuccessful so backlog
323
# it for another scheduling attempt later.
324
self.backlog_router(r_hd_binding['router'])
326
with context.session.begin(subtransactions=True):
327
router = r_hd_binding['router']
328
r_hd_binding.hosting_device = result
329
self.remove_router_from_backlog(router['id'])
330
LOG.info(_LI('Successfully scheduled router %(r_id)s to '
331
'hosting device %(d_id)s'),
332
{'r_id': r_hd_binding['router']['id'],
333
'd_id': result['id']})
336
def unschedule_router_from_hosting_device(self, context, r_hd_binding):
337
LOG.info(_LI('Un-schedule router %s.'),
338
r_hd_binding['router']['id'])
339
hosting_device = r_hd_binding['hosting_device']
340
if r_hd_binding['hosting_device'] is None:
342
self._delete_service_vm_hosting_device(context.elevated(),
345
@lockutils.synchronized('routers', 'neutron-')
346
def backlog_router(self, router):
347
if ((router or {}).get('id') is None or
348
router['id'] in self._backlogged_routers):
350
LOG.info(_LI('Backlogging router %s for renewed scheduling attempt '
351
'later'), router['id'])
352
self._backlogged_routers[router['id']] = router
354
@lockutils.synchronized('routers', 'neutron-')
355
def remove_router_from_backlog(self, id):
356
self._backlogged_routers.pop(id, None)
357
LOG.info(_LI('Router %s removed from backlog'), id)
359
@lockutils.synchronized('routerbacklog', 'neutron-')
360
def _process_backlogged_routers(self):
361
if self._refresh_router_backlog:
362
self._sync_router_backlog()
363
if not self._backlogged_routers:
365
context = n_context.get_admin_context()
366
scheduled_routers = []
367
LOG.info(_LI('Processing router (scheduling) backlog'))
369
for r_id, router in self._backlogged_routers.items():
370
self._add_type_and_hosting_device_info(context, router)
371
if router.get('hosting_device'):
372
# scheduling attempt succeeded
373
scheduled_routers.append(router)
374
self._backlogged_routers.pop(r_id, None)
375
# notify cfg agents so the scheduled routers are instantiated
376
if scheduled_routers:
377
self.l3_cfg_rpc_notifier.routers_updated(context,
380
def _setup_backlog_handling(self):
381
self._heartbeat = loopingcall.FixedIntervalLoopingCall(
382
self._process_backlogged_routers)
383
self._heartbeat.start(
384
interval=cfg.CONF.general.backlog_processing_interval)
386
def _sync_router_backlog(self):
387
LOG.info(_LI('Synchronizing router (scheduling) backlog'))
388
context = n_context.get_admin_context()
389
query = context.session.query(l3_models.RouterHostingDeviceBinding)
390
query = query.options(joinedload('router'))
391
query = query.filter(
392
l3_models.RouterHostingDeviceBinding.hosting_device_id ==
394
for binding in query:
395
router = self._make_router_dict(binding.router,
396
process_extensions=False)
397
self._backlogged_routers[binding.router_id] = router
398
self._refresh_router_backlog = False
400
def _get_router_binding_info(self, context, id, load_hd_info=True):
401
query = context.session.query(l3_models.RouterHostingDeviceBinding)
403
query = query.options(joinedload('hosting_device'))
404
query = query.filter(l3_models.RouterHostingDeviceBinding.router_id ==
408
except exc.NoResultFound:
409
# This should not happen
410
LOG.error(_LE('DB inconsistency: No type and hosting info '
411
'associated with router %s'), id)
412
raise RouterBindingInfoError(router_id=id)
413
except exc.MultipleResultsFound:
414
# This should not happen either
415
LOG.error(_LE('DB inconsistency: Multiple type and hosting info '
416
'associated with router %s'), id)
417
raise RouterBindingInfoError(router_id=id)
419
def _get_hosting_device_bindings(self, context, id, load_routers=False,
420
load_hosting_device=False):
421
query = context.session.query(l3_models.RouterHostingDeviceBinding)
423
query = query.options(joinedload('router'))
424
if load_hosting_device:
425
query = query.options(joinedload('hosting_device'))
426
query = query.filter(
427
l3_models.RouterHostingDeviceBinding.hosting_device_id == id)
430
def _add_type_and_hosting_device_info(self, context, router,
431
binding_info=None, schedule=True):
432
"""Adds type and hosting device information to a router."""
434
if binding_info is None:
435
binding_info = self._get_router_binding_info(context,
437
except RouterBindingInfoError:
438
LOG.error(_LE('DB inconsistency: No hosting info associated with '
439
'router %s'), router['id'])
440
router['hosting_device'] = None
442
router['router_type'] = {
444
'name': 'CSR1kv_router',
445
'cfg_agent_driver': (cfg.CONF.hosting_devices
446
.csr1kv_cfgagent_router_driver)}
447
if binding_info.hosting_device is None and schedule:
448
# This router has not been scheduled to a hosting device
449
# so we try to do it now.
450
self.schedule_router_on_hosting_device(context, binding_info)
451
context.session.expire(binding_info)
452
if binding_info.hosting_device is None:
453
router['hosting_device'] = None
455
router['hosting_device'] = self.get_device_info_for_agent(
456
binding_info.hosting_device)
458
def _add_hosting_port_info(self, context, router, plugging_driver):
459
"""Adds hosting port information to router ports.
461
We only populate hosting port info, i.e., reach here, if the
462
router has been scheduled to a hosting device. Hence this
463
a good place to allocate hosting ports to the router ports.
465
# cache of hosting port information: {mac_addr: {'name': port_name}}
467
if router['external_gateway_info'] is not None:
468
h_info, did_allocation = self._populate_hosting_info_for_port(
469
context, router['id'], router['gw_port'],
470
router['hosting_device'], hosting_pdata, plugging_driver)
471
for itfc in router.get(l3_constants.INTERFACE_KEY, []):
472
h_info, did_allocation = self._populate_hosting_info_for_port(
473
context, router['id'], itfc, router['hosting_device'],
474
hosting_pdata, plugging_driver)
476
def _populate_hosting_info_for_port(self, context, router_id, port,
477
hosting_device, hosting_pdata,
479
port_db = self._core_plugin._get_port(context, port['id'])
480
h_info = port_db.hosting_info
481
new_allocation = False
483
# The port does not yet have a hosting port so allocate one now
484
h_info = self._allocate_hosting_port(
485
context, router_id, port_db, hosting_device['id'],
488
# This should not happen but just in case ...
489
port['hosting_info'] = None
490
return None, new_allocation
492
new_allocation = True
493
if hosting_pdata.get('mac') is None:
494
p_data = self._core_plugin.get_port(
495
context, h_info.hosting_port_id, ['mac_address', 'name'])
496
hosting_pdata['mac'] = p_data['mac_address']
497
hosting_pdata['name'] = p_data['name']
498
# Including MAC address of hosting port so L3CfgAgent can easily
499
# determine which VM VIF to configure VLAN sub-interface on.
500
port['hosting_info'] = {'hosting_port_id': h_info.hosting_port_id,
501
'hosting_mac': hosting_pdata.get('mac'),
502
'hosting_port_name': hosting_pdata.get('name')}
503
plugging_driver.extend_hosting_port_info(
504
context, port_db, port['hosting_info'])
505
return h_info, new_allocation
507
def _allocate_hosting_port(self, context, router_id, port_db,
508
hosting_device_id, plugging_driver):
509
net_data = self._core_plugin.get_network(
510
context, port_db['network_id'], [pr_net.NETWORK_TYPE])
511
network_type = net_data.get(pr_net.NETWORK_TYPE)
512
alloc = plugging_driver.allocate_hosting_port(
513
context, router_id, port_db, network_type, hosting_device_id)
515
LOG.error(_LE('Failed to allocate hosting port for port %s'),
518
with context.session.begin(subtransactions=True):
519
h_info = l3_models.HostedHostingPortBinding(
520
logical_resource_id=router_id,
521
logical_port_id=port_db['id'],
522
network_type=network_type,
523
hosting_port_id=alloc['allocated_port_id'],
524
segmentation_id=alloc['allocated_vlan'])
525
context.session.add(h_info)
526
context.session.expire(port_db)
527
# allocation succeeded so establish connectivity for logical port
528
context.session.expire(h_info)
529
plugging_driver.setup_logical_port_connectivity(context, port_db)
532
def _get_router_port_db_on_subnet(self, context, router_id, subnet):
534
rport_qry = context.session.query(models_v2.Port)
535
ports = rport_qry.filter_by(
537
device_owner=l3_db.DEVICE_OWNER_ROUTER_INTF,
538
network_id=subnet['network_id'])
540
if p['fixed_ips'][0]['subnet_id'] == subnet['id']:
542
except exc.NoResultFound:
545
def list_active_sync_routers_on_hosting_devices(self, context, host,
547
hosting_device_ids=None):
548
agent = self._get_agent_by_type_and_host(
549
context, c_const.AGENT_TYPE_CFG, host)
550
if not agent.admin_state_up:
552
query = context.session.query(
553
l3_models.RouterHostingDeviceBinding.router_id)
554
query = query.join(l3_models.HostingDevice)
555
query = query.filter(l3_models.HostingDevice.cfg_agent_id == agent.id)
557
if len(router_ids) == 1:
558
query = query.filter(
559
l3_models.RouterHostingDeviceBinding.router_id ==
562
query = query.filter(
563
l3_models.RouterHostingDeviceBinding.router_id.in_(
565
if hosting_device_ids:
566
if len(hosting_device_ids) == 1:
567
query = query.filter(
568
l3_models.RouterHostingDeviceBinding.hosting_device_id ==
569
hosting_device_ids[0])
570
elif len(hosting_device_ids) > 1:
571
query = query.filter(
572
l3_models.RouterHostingDeviceBinding.hosting_device_id.in_(
574
router_ids = [item[0] for item in query]
576
return self.get_sync_data_ext(context, router_ids=router_ids,
581
def get_active_routers_for_host(self, context, host):
582
query = context.session.query(
583
l3_models.RouterHostingDeviceBinding.router_id)
586
l3_models.RouterHostingDeviceBinding.hosting_device_id ==
587
models_v2.Port.device_id)
588
query = query.join(p_binding.PortBindingPort)
589
query = query.filter(p_binding.PortBindingPort.host == host)
590
query = query.filter(models_v2.Port.name == 'mgmt')
591
router_ids = [item[0] for item in query]
593
return self.get_sync_data_ext(context, router_ids=router_ids,
599
def _agent_state_filter(check_active, last_heartbeat):
600
"""Filters only active agents, if requested."""
603
return not agents_db.AgentDbMixin.is_agent_down(last_heartbeat)
605
def get_host_for_router(self, context, router, admin_state_up=None,
607
query = context.session.query(agents_db.Agent.host,
608
agents_db.Agent.heartbeat_timestamp)
610
p_binding.PortBindingPort,
611
p_binding.PortBindingPort.host == agents_db.Agent.host)
614
models_v2.Port.id == p_binding.PortBindingPort.port_id)
616
l3_models.RouterHostingDeviceBinding,
617
l3_models.RouterHostingDeviceBinding.hosting_device_id ==
618
models_v2.Port.device_id)
619
query = query.filter(
620
agents_db.Agent.topic == topics.L3_AGENT,
621
l3_models.RouterHostingDeviceBinding.router_id == router)
622
if admin_state_up is not None:
623
query = query.filter(
624
agents_db.Agent.admin_state_up == admin_state_up)
625
entry = query.first()
626
if entry and L3RouterApplianceDBMixin._agent_state_filter(check_active,