1
# Copyright 2014 Cisco Systems, Inc. All rights reserved.
3
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4
# not use this file except in compliance with the License. You may obtain
5
# a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12
# License for the specific language governing permissions and limitations
19
from oslo import messaging
20
from oslo.utils import excutils
22
from neutron.common import constants as l3_constants
23
from neutron.common import rpc as n_rpc
24
from neutron.common import topics
25
from neutron.common import utils as common_utils
26
from neutron import context as n_context
27
from neutron.i18n import _LE, _LI, _LW
28
from neutron.openstack.common import log as logging
29
from neutron.plugins.cisco.cfg_agent import cfg_exceptions
30
from neutron.plugins.cisco.cfg_agent.device_drivers import driver_mgr
31
from neutron.plugins.cisco.cfg_agent import device_status
32
from neutron.plugins.cisco.common import cisco_constants as c_constants
34
LOG = logging.getLogger(__name__)
36
N_ROUTER_PREFIX = 'nrouter-'
39
class RouterInfo(object):
40
"""Wrapper class around the (neutron) router dictionary.
42
Information about the neutron router is exchanged as a python dictionary
43
between plugin and config agent. RouterInfo is a wrapper around that dict,
44
with attributes for common parameters. These attributes keep the state
45
of the current router configuration, and are used for detecting router
46
state changes when an updated router dict is received.
48
This is a modified version of the RouterInfo class defined in the
49
(reference) l3-agent implementation, for use with cisco config agent.
52
def __init__(self, router_id, router):
53
self.router_id = router_id
54
self.ex_gw_port = None
55
self._snat_enabled = None
56
self._snat_action = None
57
self.internal_ports = []
58
self.floating_ips = []
62
self.ha_info = router.get('ha_info')
73
def snat_enabled(self):
74
return self._snat_enabled
77
def router(self, value):
81
# enable_snat by default if it wasn't specified by plugin
82
self._snat_enabled = self._router.get('enable_snat', True)
84
def router_name(self):
85
return N_ROUTER_PREFIX + self.router_id
88
class CiscoRoutingPluginApi(object):
    """RoutingServiceHelper(Agent) side of the routing RPC API."""

    def __init__(self, topic, host):
        """Create the RPC client used to reach the routing plugin.

        :param topic: RPC topic the client targets
        :param host: host identifier sent along with every request made
                     through this API
        """
        # get_routers() sends self.host to the plugin, so it has to be kept
        # on the instance.
        self.host = host
        target = messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

    def get_routers(self, context, router_ids=None, hd_ids=None):
        """Make a remote process call to retrieve the sync data for routers.

        :param context: session context
        :param router_ids: list of routers to fetch
        :param hd_ids : hosting device ids, only routers assigned to these
                        hosting devices will be returned.
        :return: the result of the plugin's 'cfg_sync_routers' call
        """
        cctxt = self.client.prepare(version='1.1')
        return cctxt.call(context, 'cfg_sync_routers', host=self.host,
                          router_ids=router_ids, hosting_device_ids=hd_ids)
109
class RoutingServiceHelper(object):
111
def __init__(self, host, conf, cfg_agent):
113
self.cfg_agent = cfg_agent
114
self.context = n_context.get_admin_context_without_session()
115
self.plugin_rpc = CiscoRoutingPluginApi(topics.L3PLUGIN, host)
116
self._dev_status = device_status.DeviceStatus()
117
self._drivermgr = driver_mgr.DeviceDriverManager()
119
self.router_info = {}
120
self.updated_routers = set()
121
self.removed_routers = set()
122
self.sync_devices = set()
124
self.topic = '%s.%s' % (c_constants.CFG_AGENT_L3_ROUTING, host)
128
def _setup_rpc(self):
    """Open an RPC connection and start consuming this helper's topic.

    The helper registers itself as the only endpoint for ``self.topic``
    (non-fanout) and then starts consuming in threads.
    """
    connection = n_rpc.create_connection(new=True)
    self.conn = connection
    self.endpoints = [self]
    connection.create_consumer(self.topic, self.endpoints, fanout=False)
    connection.consume_in_threads()
134
### Notifications from Plugin ####
136
def router_deleted(self, context, routers):
    """Deal with router deletion RPC message.

    :param context: RPC request context (not used by this handler)
    :param routers: iterable of the deleted routers' identifiers; each one
                    is added to ``self.removed_routers``
    """
    LOG.debug('Got router deleted notification for %s', routers)
    pending_removals = self.removed_routers
    pending_removals.update(routers)
141
def routers_updated(self, context, routers):
    """Deal with routers modification and creation RPC message.

    The routers named in the message are added to ``self.updated_routers``.

    :param context: RPC request context (not used by this handler)
    :param routers: list of router ids, or (older message format) list of
                    router dicts that carry their id under 'id'
    """
    LOG.debug('Got routers updated notification :%s', routers)
    # Nothing to schedule for an empty payload; return before inspecting
    # routers[0], which would raise IndexError on an empty list.
    if not routers:
        return
    # This is needed for backward compatibility
    if isinstance(routers[0], dict):
        routers = [router['id'] for router in routers]
    self.updated_routers.update(routers)
150
def router_removed_from_agent(self, context, payload):
    """Queue the router named in the payload for removal.

    :param context: RPC request context (not used by this handler)
    :param payload: mapping that carries the router's id under 'router_id'
    """
    LOG.debug('Got router removed from agent :%r', payload)
    pending_removals = self.removed_routers
    pending_removals.add(payload['router_id'])
154
def router_added_to_agent(self, context, payload):
    """Handle the notification that a router was added to this agent.

    The payload is handed on unchanged to `routers_updated()`.

    :param context: RPC request context, forwarded as is
    :param payload: notification payload, forwarded as the routers argument
    """
    LOG.debug('Got router added to agent :%r', payload)
    handle_update = self.routers_updated
    handle_update(context, payload)
158
# Routing service helper public methods
160
def process_service(self, device_ids=None, removed_devices_info=None):
162
LOG.debug("Routing service processing started")
166
all_routers_flag = False
168
LOG.debug("FullSync flag is on. Starting fullsync")
169
# Setting all_routers_flag and clear the global full_sync flag
170
all_routers_flag = True
171
self.fullsync = False
172
self.updated_routers.clear()
173
self.removed_routers.clear()
174
self.sync_devices.clear()
175
routers = self._fetch_router_info(all_routers=True)
177
if self.updated_routers:
178
router_ids = list(self.updated_routers)
179
LOG.debug("Updated routers:%s", router_ids)
180
self.updated_routers.clear()
181
routers = self._fetch_router_info(router_ids=router_ids)
183
LOG.debug("Adding new devices:%s", device_ids)
184
self.sync_devices = set(device_ids) | self.sync_devices
185
if self.sync_devices:
186
sync_devices_list = list(self.sync_devices)
187
LOG.debug("Fetching routers on:%s", sync_devices_list)
188
routers.extend(self._fetch_router_info(
189
device_ids=sync_devices_list))
190
self.sync_devices.clear()
191
if removed_devices_info:
192
if removed_devices_info.get('deconfigure'):
193
ids = self._get_router_ids_from_removed_devices_info(
194
removed_devices_info)
195
self.removed_routers = self.removed_routers | set(ids)
196
if self.removed_routers:
197
removed_routers_ids = list(self.removed_routers)
198
LOG.debug("Removed routers:%s", removed_routers_ids)
199
for r in removed_routers_ids:
200
if r in self.router_info:
201
removed_routers.append(self.router_info[r].router)
203
# Sort on hosting device
205
resources['routers'] = routers
207
resources['removed_routers'] = removed_routers
208
hosting_devices = self._sort_resources_per_hosting_device(
211
# Dispatch process_services() for each hosting device
212
pool = eventlet.GreenPool()
213
for device_id, resources in hosting_devices.items():
214
routers = resources.get('routers')
215
removed_routers = resources.get('removed_routers')
216
pool.spawn_n(self._process_routers, routers, removed_routers,
217
device_id, all_routers=all_routers_flag)
219
if removed_devices_info:
220
for hd_id in removed_devices_info['hosting_data']:
221
self._drivermgr.remove_driver_for_hosting_device(hd_id)
222
LOG.debug("Routing service processing successfully completed")
224
LOG.exception(_LE("Failed processing routers"))
227
def collect_state(self, configurations):
228
"""Collect state from this helper.
230
A set of attributes which summarizes the state of the routers and
231
configurations managed by this config agent.
232
:param configurations: dict of configuration values
233
:return dict of updated configuration values
238
router_infos = self.router_info.values()
239
num_routers = len(router_infos)
240
num_hd_routers = collections.defaultdict(int)
241
for ri in router_infos:
242
ex_gw_port = ri.router.get('gw_port')
245
num_interfaces += len(ri.router.get(
246
l3_constants.INTERFACE_KEY, []))
247
num_floating_ips += len(ri.router.get(
248
l3_constants.FLOATINGIP_KEY, []))
249
hd = ri.router['hosting_device']
251
num_hd_routers[hd['id']] += 1
252
routers_per_hd = dict((hd_id, {'routers': num})
253
for hd_id, num in num_hd_routers.items())
254
non_responding = self._dev_status.get_backlogged_hosting_devices()
255
configurations['total routers'] = num_routers
256
configurations['total ex_gw_ports'] = num_ex_gw_ports
257
configurations['total interfaces'] = num_interfaces
258
configurations['total floating_ips'] = num_floating_ips
259
configurations['hosting_devices'] = routers_per_hd
260
configurations['non_responding_hosting_devices'] = non_responding
261
return configurations
263
# Routing service helper internal methods
265
def _fetch_router_info(self, router_ids=None, device_ids=None,
267
"""Fetch router dict from the routing plugin.
269
:param router_ids: List of router_ids of routers to fetch
270
:param device_ids: List of device_ids whose routers to fetch
271
:param all_routers: If True fetch all the routers for this agent.
272
:return: List of router dicts of format:
273
[ {router_dict1}, {router_dict2},.....]
277
return self.plugin_rpc.get_routers(self.context)
279
return self.plugin_rpc.get_routers(self.context,
280
router_ids=router_ids)
282
return self.plugin_rpc.get_routers(self.context,
284
except messaging.MessagingException:
285
LOG.exception(_LE("RPC Error in fetching routers from plugin"))
289
def _get_router_ids_from_removed_devices_info(removed_devices_info):
    """Collect the router ids listed in a removed-devices info dict.

    :param removed_devices_info: Dict of removed devices and their
           associated resources. Format:
               {
                 'hosting_data': {'hd_id1': {'routers': [id1, id2, ...]},
                                  'hd_id2': {'routers': [id3, id4, ...]}},
                 'deconfigure': True/False
               }
    :return: flat list of the router ids found under every hosting device;
             a device without a 'routers' entry contributes nothing
    """
    hosting_data = removed_devices_info['hosting_data']
    return [router_id
            for device_resources in hosting_data.values()
            for router_id in device_resources.get('routers', [])]
310
def _sort_resources_per_hosting_device(resources):
    """Group resources by the hosting device they are placed on.

    The hosting device is found by looking up the `hosting_device`
    attribute of each resource, and its `id`.

    :param resources: a dict keyed by resource name (e.g. 'routers',
                      'removed_routers') whose values are lists of resource
                      dicts; a value of None or an empty list is skipped
    :return: dict keyed by hosting device id. Format:
             {
               'hd_id1': {'routers': [routers],
                          'removed_routers': [routers], ...},
               'hd_id2': {'routers': [routers], ...},
             }
    :raises KeyError: if a resource lacks 'hosting_device' or its 'id'
    """
    # The accumulator must exist before the first setdefault() call below.
    hosting_devices = {}
    for key, items in resources.items():
        for r in items or []:
            hd_id = r['hosting_device']['id']
            per_device = hosting_devices.setdefault(hd_id, {})
            per_device.setdefault(key, []).append(r)
    return hosting_devices
333
def _process_routers(self, routers, removed_routers,
334
device_id=None, all_routers=False):
335
"""Process the set of routers.
337
Iterating on the set of routers received and comparing it with the
338
set of routers already in the routing service helper, new routers
339
which are added are identified. Before processing check the
340
reachability (via ping) of hosting device where the router is hosted.
341
If device is not reachable it is backlogged.
343
For routers which are only updated, call `_process_router()` on them.
345
When all_routers is set to True (because of a full sync),
346
this will result in the detection and deletion of routers which
349
Whether the router can only be assigned to a particular hosting device
350
is decided and enforced by the plugin. No checks are done here.
352
:param routers: The set of routers to be processed
353
:param removed_routers: the set of routers which were removed
354
:param device_id: Id of the hosting device
355
:param all_routers: Flag for specifying a partial list of routers
360
prev_router_ids = set(self.router_info)
362
prev_router_ids = set(self.router_info) & set(
363
[router['id'] for router in routers])
364
cur_router_ids = set()
367
if not r['admin_state_up']:
369
cur_router_ids.add(r['id'])
370
hd = r['hosting_device']
371
if not self._dev_status.is_hosting_device_reachable(hd):
372
LOG.info(_LI("Router: %(id)s is on an unreachable "
373
"hosting device. "), {'id': r['id']})
375
if r['id'] not in self.router_info:
376
self._router_added(r['id'], r)
377
ri = self.router_info[r['id']]
379
self._process_router(ri)
380
except KeyError as e:
381
LOG.exception(_LE("Key Error, missing key: %s"), e)
382
self.updated_routers.add(r['id'])
384
except cfg_exceptions.DriverException as e:
385
LOG.exception(_LE("Driver Exception on router:%(id)s. "
386
"Error is %(e)s"), {'id': r['id'], 'e': e})
387
self.updated_routers.update(r['id'])
389
# identify and remove routers that no longer exist
390
for router_id in prev_router_ids - cur_router_ids:
391
self._router_removed(router_id)
393
for router in removed_routers:
394
self._router_removed(router['id'])
396
LOG.exception(_LE("Exception in processing routers on device:%s"),
398
self.sync_devices.add(device_id)
400
def _process_router(self, ri):
401
"""Process a router, apply latest configuration and update router_info.
403
Get the router dict from RouterInfo and proceed to detect changes
404
from the last known state. When new ports or deleted ports are
405
detected, `internal_network_added()` or `internal_networks_removed()`
406
are called accordingly. Similarly changes in ex_gw_port causes
407
`external_gateway_added()` or `external_gateway_removed()` calls.
408
Next, floating_ips and routes are processed. Also, latest state is
409
stored in ri.internal_ports and ri.ex_gw_port for future comparisons.
411
:param ri : RouterInfo object of the router being processed.
413
:raises: neutron.plugins.cisco.cfg_agent.cfg_exceptions.DriverException
414
if the configuration operation fails.
417
ex_gw_port = ri.router.get('gw_port')
418
ri.ha_info = ri.router.get('ha_info', None)
419
internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
420
existing_port_ids = set([p['id'] for p in ri.internal_ports])
421
current_port_ids = set([p['id'] for p in internal_ports
422
if p['admin_state_up']])
423
new_ports = [p for p in internal_ports
425
p['id'] in (current_port_ids - existing_port_ids)]
426
old_ports = [p for p in ri.internal_ports
427
if p['id'] not in current_port_ids]
430
self._set_subnet_info(p)
431
self._internal_network_added(ri, p, ex_gw_port)
432
ri.internal_ports.append(p)
435
self._internal_network_removed(ri, p, ri.ex_gw_port)
436
ri.internal_ports.remove(p)
438
if ex_gw_port and not ri.ex_gw_port:
439
self._set_subnet_info(ex_gw_port)
440
self._external_gateway_added(ri, ex_gw_port)
441
elif not ex_gw_port and ri.ex_gw_port:
442
self._external_gateway_removed(ri, ri.ex_gw_port)
445
self._process_router_floating_ips(ri, ex_gw_port)
447
ri.ex_gw_port = ex_gw_port
448
self._routes_updated(ri)
449
except cfg_exceptions.DriverException as e:
450
with excutils.save_and_reraise_exception():
451
self.updated_routers.update(ri.router_id)
454
def _process_router_floating_ips(self, ri, ex_gw_port):
455
"""Process a router's floating ips.
457
Compare current floatingips (in ri.floating_ips) with the router's
458
updated floating ips (in ri.router.floating_ips) and detect
459
floating_ips which were added or removed. Notify driver of
460
the change via `floating_ip_added()` or `floating_ip_removed()`.
462
:param ri: RouterInfo object of the router being processed.
463
:param ex_gw_port: Port dict of the external gateway port.
465
:raises: neutron.plugins.cisco.cfg_agent.cfg_exceptions.DriverException
466
if the configuration operation fails.
469
floating_ips = ri.router.get(l3_constants.FLOATINGIP_KEY, [])
470
existing_floating_ip_ids = set(
471
[fip['id'] for fip in ri.floating_ips])
472
cur_floating_ip_ids = set([fip['id'] for fip in floating_ips])
476
for fip in floating_ips:
478
# store to see if floatingip was remapped
479
id_to_fip_map[fip['id']] = fip
480
if fip['id'] not in existing_floating_ip_ids:
481
ri.floating_ips.append(fip)
482
self._floating_ip_added(ri, ex_gw_port,
483
fip['floating_ip_address'],
484
fip['fixed_ip_address'])
486
floating_ip_ids_to_remove = (existing_floating_ip_ids -
488
for fip in ri.floating_ips:
489
if fip['id'] in floating_ip_ids_to_remove:
490
ri.floating_ips.remove(fip)
491
self._floating_ip_removed(ri, ri.ex_gw_port,
492
fip['floating_ip_address'],
493
fip['fixed_ip_address'])
495
# handle remapping of a floating IP
496
new_fip = id_to_fip_map[fip['id']]
497
new_fixed_ip = new_fip['fixed_ip_address']
498
existing_fixed_ip = fip['fixed_ip_address']
499
if (new_fixed_ip and existing_fixed_ip and
500
new_fixed_ip != existing_fixed_ip):
501
floating_ip = fip['floating_ip_address']
502
self._floating_ip_removed(ri, ri.ex_gw_port,
505
self._floating_ip_added(ri, ri.ex_gw_port,
506
floating_ip, new_fixed_ip)
507
ri.floating_ips.remove(fip)
508
ri.floating_ips.append(new_fip)
510
def _router_added(self, router_id, router):
    """Operations when a router is added.

    A RouterInfo wrapper is built for the router, a driver is set for it
    through the driver manager and notified via `router_added()`. Only
    after that is the wrapper stored in the helper's router_info
    dictionary under the router's id.

    :param router_id: id of the router
    :param router: router dict
    """
    router_info = RouterInfo(router_id, router)
    self._drivermgr.set_driver(router).router_added(router_info)
    self.router_info[router_id] = router_info
526
def _router_removed(self, router_id, deconfigure=True):
527
"""Operations when a router is removed.
529
Get the RouterInfo object corresponding to the router in the service
530
helper's router_info dict. If deconfigure is set to True,
531
remove this router's configuration from the hosting device.
532
:param router_id: id of the router
533
:param deconfigure: if True, the router's configuration is deleted from
537
ri = self.router_info.get(router_id)
539
LOG.warning(_LW("Info for router %s was not found. "
540
"Skipping router removal"), router_id)
542
ri.router['gw_port'] = None
543
ri.router[l3_constants.INTERFACE_KEY] = []
544
ri.router[l3_constants.FLOATINGIP_KEY] = []
547
self._process_router(ri)
548
driver = self._drivermgr.get_driver(router_id)
549
driver.router_removed(ri, deconfigure)
550
self._drivermgr.remove_driver(router_id)
551
del self.router_info[router_id]
552
self.removed_routers.discard(router_id)
553
except cfg_exceptions.DriverException:
554
LOG.warning(_LW("Router remove for router_id: %s was incomplete. "
555
"Adding the router to removed_routers list"), router_id)
556
self.removed_routers.add(router_id)
557
# remove this router from updated_routers if it is there. It might
558
# end up there too if exception was thrown earlier inside
559
# `_process_router()`
560
self.updated_routers.discard(router_id)
562
def _internal_network_added(self, ri, port, ex_gw_port):
    """Tell the router's driver that an internal port was added.

    NAT is additionally enabled for the port when the router has SNAT
    enabled and an external gateway port is given.

    :param ri: RouterInfo object of the router
    :param port: port dict of the added internal port
    :param ex_gw_port: port dict of the external gateway port, or a falsy
                       value when there is none
    """
    dev_driver = self._drivermgr.get_driver(ri.id)
    dev_driver.internal_network_added(ri, port)
    if not ri.snat_enabled or not ex_gw_port:
        return
    dev_driver.enable_internal_network_NAT(ri, port, ex_gw_port)
568
def _internal_network_removed(self, ri, port, ex_gw_port):
    """Tell the router's driver that an internal port was removed.

    NAT is additionally disabled for the port when the router has SNAT
    enabled and an external gateway port is given.

    :param ri: RouterInfo object of the router
    :param port: port dict of the removed internal port
    :param ex_gw_port: port dict of the external gateway port, or a falsy
                       value when there is none
    """
    dev_driver = self._drivermgr.get_driver(ri.id)
    dev_driver.internal_network_removed(ri, port)
    if not ri.snat_enabled or not ex_gw_port:
        return
    dev_driver.disable_internal_network_NAT(ri, port, ex_gw_port)
574
def _external_gateway_added(self, ri, ex_gw_port):
    """Tell the router's driver that an external gateway was added.

    When the router has SNAT enabled, NAT is then enabled for each of the
    internal ports recorded in ``ri.internal_ports``.

    :param ri: RouterInfo object of the router
    :param ex_gw_port: port dict of the external gateway port
    """
    dev_driver = self._drivermgr.get_driver(ri.id)
    dev_driver.external_gateway_added(ri, ex_gw_port)
    if not ri.snat_enabled or not ri.internal_ports:
        return
    for internal_port in ri.internal_ports:
        dev_driver.enable_internal_network_NAT(ri, internal_port,
                                               ex_gw_port)
581
def _external_gateway_removed(self, ri, ex_gw_port):
    """Tell the router's driver that the external gateway was removed.

    When the router has SNAT enabled, NAT is first disabled for each of
    the internal ports recorded in ``ri.internal_ports``; the gateway
    removal itself is sent to the driver last.

    :param ri: RouterInfo object of the router
    :param ex_gw_port: port dict of the external gateway port being removed
    """
    dev_driver = self._drivermgr.get_driver(ri.id)
    nat_needed = ri.snat_enabled and ri.internal_ports
    if nat_needed:
        for internal_port in ri.internal_ports:
            dev_driver.disable_internal_network_NAT(ri, internal_port,
                                                    ex_gw_port)
    dev_driver.external_gateway_removed(ri, ex_gw_port)
588
def _floating_ip_added(self, ri, ex_gw_port, floating_ip, fixed_ip):
    """Forward a floating IP addition to the router's driver.

    :param ri: RouterInfo object of the router
    :param ex_gw_port: port dict of the external gateway port
    :param floating_ip: the floating IP address
    :param fixed_ip: the fixed IP address the floating IP maps to
    """
    self._drivermgr.get_driver(ri.id).floating_ip_added(
        ri, ex_gw_port, floating_ip, fixed_ip)
592
def _floating_ip_removed(self, ri, ex_gw_port, floating_ip, fixed_ip):
    """Forward a floating IP removal to the router's driver.

    :param ri: RouterInfo object of the router
    :param ex_gw_port: port dict of the external gateway port
    :param floating_ip: the floating IP address
    :param fixed_ip: the fixed IP address the floating IP mapped to
    """
    self._drivermgr.get_driver(ri.id).floating_ip_removed(
        ri, ex_gw_port, floating_ip, fixed_ip)
596
def _routes_updated(self, ri):
    """Update the state of routes in the router.

    Compares the current routes with the (configured) existing routes
    and detect what was removed or added. Then configure the
    logical router in the hosting device accordingly.

    :param ri: RouterInfo corresponding to the router.
    :raises: neutron.plugins.cisco.cfg_agent.cfg_exceptions.DriverException
             if the configuration operation fails.
    """
    new_routes = ri.router['routes']
    old_routes = ri.routes
    adds, removes = common_utils.diff_list_of_dict(old_routes,
                                                   new_routes)
    for route in adds:
        LOG.debug("Added route entry is '%s'", route)
        # remove replaced route from deleted route: an added route for the
        # same destination is sent as a 'replace', so it must not also be
        # deleted below. A new list is built instead of calling remove()
        # on the list being iterated, which skips the entry following
        # every match.
        removes = [del_route for del_route in removes
                   if del_route['destination'] != route['destination']]
        driver = self._drivermgr.get_driver(ri.id)
        driver.routes_updated(ri, 'replace', route)

    for route in removes:
        LOG.debug("Removed route entry is '%s'", route)
        driver = self._drivermgr.get_driver(ri.id)
        driver.routes_updated(ri, 'delete', route)
    # Remember what is now configured for the next comparison.
    ri.routes = new_routes
627
def _set_subnet_info(port):
    """Compute the port's address in CIDR form and store it on the port.

    The first fixed IP of the port is combined with the prefix length of
    the port's subnet and written to ``port['ip_cidr']``. Additional fixed
    IPs are ignored, with an error logged.

    :param port: port dict; read keys are 'id', 'fixed_ips' (list of dicts
                 with 'ip_address') and 'subnet' (dict with 'cidr')
    :raises Exception: if the port has no fixed IP
    """
    ips = port['fixed_ips']
    # Only a port without any address is an error; raising unconditionally
    # would make the rest of the function unreachable.
    if not ips:
        raise Exception(_("Router port %s has no IP address") % port['id'])
    if len(ips) > 1:
        LOG.error(_LE("Ignoring multiple IPs on router port %s"),
                  port['id'])
    prefixlen = netaddr.IPNetwork(port['subnet']['cidr']).prefixlen
    port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen)