# Copyright 2014 Cisco Systems, Inc.  All rights reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import eventlet
eventlet.monkey_patch()

import pprint
import sys
import time

from oslo.config import cfg
from oslo import messaging
from oslo.utils import importutils
from oslo.utils import timeutils
from oslo_concurrency import lockutils

from neutron.agent.common import config
from neutron.agent.linux import external_process
from neutron.agent.linux import interface
from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron import context as n_context
from neutron.i18n import _LE, _LI, _LW
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
from neutron.openstack.common import service
from neutron.plugins.cisco.cfg_agent import device_status
from neutron.plugins.cisco.common import cisco_constants as c_constants
from neutron import service as neutron_service

LOG = logging.getLogger(__name__)

# Agent registration tuning: seconds to wait between two registration
# attempts, and how many attempts are made before giving up.
REGISTRATION_RETRY_DELAY = 2
MAX_REGISTRATION_ATTEMPTS = 30
class CiscoDeviceManagementApi(object):
    """Agent side of the device manager RPC API."""

    def __init__(self, topic, host):
        """Build the RPC client used to reach the device manager.

        :param topic: RPC topic the client is targeted at
        :param host: host identifier of this agent; it is passed as the
            `host` argument of every RPC issued through this object
        """
        # Both RPC methods below read self.host, so it must be stored here.
        self.host = host
        target = messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

    def report_dead_hosting_devices(self, context, hd_ids=None):
        """Report that a hosting device cannot be contacted (presumed dead).

        The report is sent with `cast`; nothing is returned to the caller.

        :param context: session context
        :param hd_ids: list of ids of the non-responding hosting devices
        """
        cctxt = self.client.prepare()
        cctxt.cast(context, 'report_non_responding_hosting_devices',
                   host=self.host, hosting_device_ids=hd_ids)

    def register_for_duty(self, context):
        """Report that a config agent is ready for duty.

        :param context: session context
        :return: whatever the server answers to the `register_for_duty` call
        """
        cctxt = self.client.prepare()
        return cctxt.call(context, 'register_for_duty', host=self.host)
class CiscoCfgAgent(manager.Manager):
    """Cisco Cfg Agent.

    This class defines a generic configuration agent for cisco devices which
    implement network services in the cloud backend. It is based on the
    (reference) l3-agent, but has been enhanced to support multiple services
    in addition to routing.

    The agent acts as a container for services and does not do any
    service specific processing or configuration itself.
    All service specific processing is delegated to service helpers which
    the agent loads. Thus routing specific updates are processed by the
    routing service helper, firewall by firewall helper etc.
    A further layer of abstraction is implemented by using device drivers for
    encapsulating all configuration operations of a service on a device.
    Device drivers are specific to a particular device/service VM eg: CSR1kv.

    The main entry points in this class are `process_services()` and the
    periodic `_backlog_task()`.
    """

    # RPC target (API version 1.1) declared for this manager.
    target = messaging.Target(version='1.1')

    # Options registered by main() under the "cfg_agent" group.
    OPTS = [
        cfg.IntOpt('rpc_loop_interval', default=10,
                   help=_("Interval when the process_services() loop "
                          "executes in seconds. This is when the config agent "
                          "lets each service helper to process its neutron "
                          "resources.")),
        cfg.StrOpt('routing_svc_helper_class',
                   default='neutron.plugins.cisco.cfg_agent.service_helpers'
                           '.routing_svc_helper.RoutingServiceHelper',
                   help=_("Path of the routing service helper class.")),
    ]

    def __init__(self, host, conf=None):
        """Set up RPC, load the service helpers and start the service loop.

        :param host: host identifier handed to the RPC API and the helpers
        :param conf: configuration object; defaults to the global `cfg.CONF`
        """
        self.conf = conf or cfg.CONF
        self._dev_status = device_status.DeviceStatus()
        self.context = n_context.get_admin_context_without_session()

        self._initialize_rpc(host)
        self._initialize_service_helpers(host)
        self._start_periodic_tasks()
        super(CiscoCfgAgent, self).__init__(host=self.conf.host)

    def _initialize_rpc(self, host):
        """Create the device manager RPC API object for `host`."""
        self.devmgr_rpc = CiscoDeviceManagementApi(topics.L3PLUGIN, host)

    def _initialize_service_helpers(self, host):
        """Load the configured routing service helper.

        If the helper class cannot be imported a warning is logged and
        `self.routing_service_helper` is set to None.
        """
        svc_helper_class = self.conf.cfg_agent.routing_svc_helper_class
        try:
            self.routing_service_helper = importutils.import_object(
                svc_helper_class, host, self.conf, self)
        except ImportError as e:
            LOG.warning(_LW("Error in loading routing service helper. Class "
                            "specified is %(class)s. Reason:%(reason)s"),
                        {'class': self.conf.cfg_agent.routing_svc_helper_class,
                         'reason': e})
            self.routing_service_helper = None

    def _start_periodic_tasks(self):
        """Run `process_services()` every `rpc_loop_interval` seconds."""
        self.loop = loopingcall.FixedIntervalLoopingCall(self.process_services)
        self.loop.start(interval=self.conf.cfg_agent.rpc_loop_interval)

    def after_start(self):
        """Log that the agent has started."""
        LOG.info(_LI("Cisco cfg agent started"))

    def get_routing_service_helper(self):
        """Return the routing service helper (None if it failed to load)."""
        return self.routing_service_helper

    ## Periodic tasks ##
    @periodic_task.periodic_task
    def _backlog_task(self, context):
        """Process backlogged devices."""
        LOG.debug("Processing backlog.")
        self._process_backlogged_hosting_devices(context)

    ## Main orchestrator ##
    @lockutils.synchronized('cisco-cfg-agent', 'neutron-')
    def process_services(self, device_ids=None, removed_devices_info=None):
        """Process services managed by this config agent.

        This method is invoked by any of three scenarios.

        1. Invoked by a periodic task running every `rpc_loop_interval`
        seconds. This is the most common scenario.
        In this mode, the method is called without any arguments.

        2. Called by the `_process_backlogged_hosting_devices()` as part of
        the backlog processing task. In this mode, a list of device_ids
        are passed as arguments. These are the list of backlogged
        hosting devices that are now reachable and we want to sync services
        on them.

        3. Called by the `hosting_devices_removed()` method. This is when
        the config agent has received a notification from the plugin that
        some hosting devices are going to be removed. The payload contains
        the details of the hosting devices and the associated neutron
        resources on them which should be processed and removed.

        To avoid race conditions with these scenarios, this function is
        wrapped in the `cisco-cfg-agent` lock (see the decorator).

        This method goes on to invoke `process_service()` on the
        different service helpers.

        :param device_ids: List of devices that are now available and need
            to be processed
        :param removed_devices_info: Info about the hosting devices which
            are going to be removed and details of the resources hosted on
            them, in the form:
                {
                 'hosting_data': {'hd_id1': {'routers': [id1, id2, ...]},
                                  'hd_id2': {'routers': [id3, id4, ...]}, ...},
                 'deconfigure': True/False
                }
        """
        LOG.debug("Processing services started")
        # Now we process only routing service, additional services will be
        # added in future
        if self.routing_service_helper:
            self.routing_service_helper.process_service(device_ids,
                                                        removed_devices_info)
        else:
            LOG.warning(_LW("No routing service helper loaded"))
        LOG.debug("Processing services completed")

    def _process_backlogged_hosting_devices(self, context):
        """Process currently backlogged devices.

        Go through the currently backlogged devices and process them.
        For devices which are now reachable (compared to last time), we call
        `process_services()` passing the now reachable device's id.
        For devices which have passed the `hosting_device_dead_timeout` and
        hence presumed dead, execute a RPC to the plugin informing that.

        :param context: RPC context
        """
        res = self._dev_status.check_backlogged_hosting_devices()
        # Only act on the categories that actually contain devices.
        if res['reachable']:
            self.process_services(device_ids=res['reachable'])
        if res['dead']:
            LOG.debug("Reporting dead hosting devices: %s", res['dead'])
            self.devmgr_rpc.report_dead_hosting_devices(context,
                                                        hd_ids=res['dead'])

    def hosting_devices_removed(self, context, payload):
        """Deal with hosting device removed RPC message.

        :param context: RPC context
        :param payload: dict expected to carry a `hosting_data` entry; a
            payload without that key is logged as an error and ignored
        """
        try:
            # An empty `hosting_data` means there is nothing to remove.
            if payload['hosting_data']:
                self.process_services(removed_devices_info=payload)
        except KeyError as e:
            LOG.error(_LE("Invalid payload format for received RPC message "
                          "`hosting_devices_removed`. Error is %(error)s. "
                          "Payload is %(payload)s"),
                      {'error': e, 'payload': payload})
class CiscoCfgAgentWithStateReport(CiscoCfgAgent):
    """Cfg agent that registers with the server and reports its state."""

    def __init__(self, host, conf=None):
        """Register the agent, initialise the base agent, start heartbeat.

        :param host: host identifier of this agent
        :param conf: configuration object passed on to `CiscoCfgAgent`
        """
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
        self.agent_state = {
            'binary': 'neutron-cisco-cfg-agent',
            'host': host,
            'topic': c_constants.CFG_AGENT,
            'configurations': {},
            # Removed by send_agent_report() after the first successful report.
            'start_flag': True,
            'agent_type': c_constants.AGENT_TYPE_CFG}
        report_interval = cfg.CONF.AGENT.report_interval
        self.use_call = True
        # send_agent_report() may run before (or without) a heartbeat loop.
        self.heartbeat = None
        # _agent_registration() needs self.devmgr_rpc, so RPC is set up
        # before the base class initialiser runs.
        self._initialize_rpc(host)
        self._agent_registration()
        super(CiscoCfgAgentWithStateReport, self).__init__(host=host,
                                                           conf=conf)
        if report_interval:
            self.heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            self.heartbeat.start(interval=report_interval)

    def _agent_registration(self):
        """Register this agent with the server.

        This method registers the cfg agent with the neutron server so hosting
        devices can be assigned to it. In case the server is not ready to
        accept registration (it sends a False) then we retry registration
        for `MAX_REGISTRATION_ATTEMPTS` with a delay of
        `REGISTRATION_RETRY_DELAY`. If there is no server response or a
        failure to register after the required number of attempts,
        the agent stops itself.

        :raises SystemExit: when the server answers None or every attempt
            has been used up
        """
        for attempt in range(MAX_REGISTRATION_ATTEMPTS):
            context = n_context.get_admin_context_without_session()
            self.send_agent_report(self.agent_state, context)
            res = self.devmgr_rpc.register_for_duty(context)
            if res is True:
                LOG.info(_LI("[Agent registration] Agent successfully "
                             "registered"))
                return
            elif res is False:
                LOG.warning(_LW("[Agent registration] Neutron server said "
                                "that device manager was not ready. Retrying "
                                "in %0.2f seconds "), REGISTRATION_RETRY_DELAY)
                time.sleep(REGISTRATION_RETRY_DELAY)
            elif res is None:
                LOG.error(_LE("[Agent registration] Neutron server said that "
                              "no device manager was found. Cannot continue. "
                              "Exiting!"))
                raise SystemExit("Cfg Agent exiting")
        LOG.error(_LE("[Agent registration] %d unsuccessful registration "
                      "attempts. Exiting!"), MAX_REGISTRATION_ATTEMPTS)
        raise SystemExit("Cfg Agent exiting")

    def _report_state(self):
        """Report state to the plugin.

        This task run every `report_interval` period.
        Collects, creates and sends a summary of the services currently
        managed by this agent. Data is collected from the service helper(s).
        Refer the `configurations` dict for the parameters reported.
        """
        LOG.debug("Report state task started")
        # Start from an empty summary so reporting works without a helper.
        configurations = {}
        if self.routing_service_helper:
            configurations = self.routing_service_helper.collect_state(
                self.agent_state['configurations'])
        non_responding = self._dev_status.get_backlogged_hosting_devices_info()
        configurations['non_responding_hosting_devices'] = non_responding
        self.agent_state['configurations'] = configurations
        self.agent_state['local_time'] = str(timeutils.utcnow())
        LOG.debug("State report data: %s", pprint.pformat(self.agent_state))
        self.send_agent_report(self.agent_state, self.context)

    def send_agent_report(self, report, context):
        """Send the agent report via RPC.

        After a successful send, `start_flag` is dropped from `report` and
        later reports are sent with `use_call` set to False. Failures are
        logged and never propagated to the caller.

        :param report: agent state dict to send (modified in place)
        :param context: RPC context
        """
        try:
            self.state_rpc.report_state(context, report, self.use_call)
            report.pop('start_flag', None)
            self.use_call = False
            LOG.debug("Send agent report successfully completed")
        except AttributeError:
            # This means the server does not support report_state
            LOG.warning(_LW("Neutron server does not support state report. "
                            "State report for this agent will be disabled."))
            # No heartbeat exists during registration or when the report
            # interval is disabled, so only stop one that was started.
            heartbeat = getattr(self, 'heartbeat', None)
            if heartbeat is not None:
                heartbeat.stop()
            return
        except Exception:
            LOG.exception(_LE("Failed sending agent report!"))
def main(manager='neutron.plugins.cisco.cfg_agent.'
                 'cfg_agent.CiscoCfgAgentWithStateReport'):
    """Register the agent's options, then create and launch its service.

    :param manager: import path of the manager class the service runs
    """
    conf = cfg.CONF
    conf.register_opts(CiscoCfgAgent.OPTS, "cfg_agent")
    config.register_agent_state_opts_helper(conf)
    config.register_root_helper(conf)
    conf.register_opts(interface.OPTS)
    conf.register_opts(external_process.OPTS)
    common_config.init(sys.argv[1:])
    conf(project='neutron')
    config.setup_logging()
    server = neutron_service.Service.create(
        binary='neutron-cisco-cfg-agent',
        topic=c_constants.CFG_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager=manager)
    # Blocks until the launched service exits.
    service.launch(server).wait()