~ubuntu-branches/ubuntu/vivid/neutron/vivid-updates

« back to all changes in this revision

Viewing changes to neutron/plugins/cisco/cfg_agent/cfg_agent.py

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2015-03-30 11:17:19 UTC
  • mfrom: (1.1.21)
  • Revision ID: package-import@ubuntu.com-20150330111719-h0gx7233p4jkkgfh
Tags: 1:2015.1~b3-0ubuntu1
* New upstream milestone release:
  - d/control: Align version requirements with upstream.
  - d/control: Add new dependency on oslo-log.
  - d/p/*: Rebase.
  - d/control,d/neutron-plugin-hyperv*: Dropped, decomposed into
    separate project upstream.
  - d/control,d/neutron-plugin-openflow*: Dropped, decomposed into
    separate project upstream.
  - d/neutron-common.install: Add neutron-rootwrap-daemon and 
    neutron-keepalived-state-change binaries.
  - d/rules: Ignore neutron-hyperv-agent when installing; only for Windows.
  - d/neutron-plugin-cisco.install: Drop neutron-cisco-cfg-agent as
    decomposed into separate project upstream.
  - d/neutron-plugin-vmware.install: Drop neutron-check-nsx-config and
    neutron-nsx-manage as decomposed into separate project upstream.
  - d/control: Add dependency on python-neutron-fwaas to neutron-l3-agent.
* d/pydist-overrides: Add overrides for oslo packages.
* d/control: Fixup type in package description (LP: #1263539).
* d/p/fixup-driver-test-execution.patch: Cherry pick fix from upstream VCS
  to support unit test exection in out-of-tree vendor drivers.
* d/neutron-common.postinst: Allow general access to /etc/neutron but limit
  access to root/neutron to /etc/neutron/neutron.conf to support execution
  of unit tests in decomposed vendor drivers.
* d/control: Add dependency on python-neutron-fwaas to neutron-l3-agent
  package.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2014 Cisco Systems, Inc.  All rights reserved.
2
 
#
3
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
4
 
#    not use this file except in compliance with the License. You may obtain
5
 
#    a copy of the License at
6
 
#
7
 
#         http://www.apache.org/licenses/LICENSE-2.0
8
 
#
9
 
#    Unless required by applicable law or agreed to in writing, software
10
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12
 
#    License for the specific language governing permissions and limitations
13
 
#    under the License.
14
 
 
15
 
import eventlet
16
 
eventlet.monkey_patch()
17
 
import pprint
18
 
import sys
19
 
import time
20
 
 
21
 
from oslo.config import cfg
22
 
from oslo import messaging
23
 
from oslo.utils import importutils
24
 
from oslo.utils import timeutils
25
 
from oslo_concurrency import lockutils
26
 
 
27
 
from neutron.agent.common import config
28
 
from neutron.agent.linux import external_process
29
 
from neutron.agent.linux import interface
30
 
from neutron.agent import rpc as agent_rpc
31
 
from neutron.common import config as common_config
32
 
from neutron.common import rpc as n_rpc
33
 
from neutron.common import topics
34
 
from neutron import context as n_context
35
 
from neutron.i18n import _LE, _LI, _LW
36
 
from neutron import manager
37
 
from neutron.openstack.common import log as logging
38
 
from neutron.openstack.common import loopingcall
39
 
from neutron.openstack.common import periodic_task
40
 
from neutron.openstack.common import service
41
 
from neutron.plugins.cisco.cfg_agent import device_status
42
 
from neutron.plugins.cisco.common import cisco_constants as c_constants
43
 
from neutron import service as neutron_service
44
 
 
45
 
LOG = logging.getLogger(__name__)
46
 
 
47
 
# Constants for agent registration.
48
 
REGISTRATION_RETRY_DELAY = 2
49
 
MAX_REGISTRATION_ATTEMPTS = 30
50
 
 
51
 
 
52
 
class CiscoDeviceManagementApi(object):
53
 
    """Agent side of the device manager RPC API."""
54
 
 
55
 
    def __init__(self, topic, host):
56
 
        self.host = host
57
 
        target = messaging.Target(topic=topic, version='1.0')
58
 
        self.client = n_rpc.get_client(target)
59
 
 
60
 
    def report_dead_hosting_devices(self, context, hd_ids=None):
61
 
        """Report that a hosting device cannot be contacted (presumed dead).
62
 
 
63
 
        :param: context: session context
64
 
        :param: hosting_device_ids: list of non-responding hosting devices
65
 
        :return: None
66
 
        """
67
 
        cctxt = self.client.prepare()
68
 
        cctxt.cast(context, 'report_non_responding_hosting_devices',
69
 
                   host=self.host, hosting_device_ids=hd_ids)
70
 
 
71
 
    def register_for_duty(self, context):
72
 
        """Report that a config agent is ready for duty."""
73
 
        cctxt = self.client.prepare()
74
 
        return cctxt.call(context, 'register_for_duty', host=self.host)
75
 
 
76
 
 
77
 
class CiscoCfgAgent(manager.Manager):
78
 
    """Cisco Cfg Agent.
79
 
 
80
 
    This class defines a generic configuration agent for cisco devices which
81
 
    implement network services in the cloud backend. It is based on the
82
 
    (reference) l3-agent, but has been enhanced to support multiple services
83
 
     in addition to routing.
84
 
 
85
 
    The agent acts like as a container for services and does not do any
86
 
    service specific processing or configuration itself.
87
 
    All service specific processing is delegated to service helpers which
88
 
    the agent loads. Thus routing specific updates are processed by the
89
 
    routing service helper, firewall by firewall helper etc.
90
 
    A further layer of abstraction is implemented by using device drivers for
91
 
    encapsulating all configuration operations of a service on a device.
92
 
    Device drivers are specific to a particular device/service VM eg: CSR1kv.
93
 
 
94
 
    The main entry points in this class are the `process_services()` and
95
 
    `_backlog_task()` .
96
 
    """
97
 
    target = messaging.Target(version='1.1')
98
 
 
99
 
    OPTS = [
100
 
        cfg.IntOpt('rpc_loop_interval', default=10,
101
 
                   help=_("Interval when the process_services() loop "
102
 
                          "executes in seconds. This is when the config agent "
103
 
                          "lets each service helper to process its neutron "
104
 
                          "resources.")),
105
 
        cfg.StrOpt('routing_svc_helper_class',
106
 
                   default='neutron.plugins.cisco.cfg_agent.service_helpers'
107
 
                           '.routing_svc_helper.RoutingServiceHelper',
108
 
                   help=_("Path of the routing service helper class.")),
109
 
    ]
110
 
 
111
 
    def __init__(self, host, conf=None):
112
 
        self.conf = conf or cfg.CONF
113
 
        self._dev_status = device_status.DeviceStatus()
114
 
        self.context = n_context.get_admin_context_without_session()
115
 
 
116
 
        self._initialize_rpc(host)
117
 
        self._initialize_service_helpers(host)
118
 
        self._start_periodic_tasks()
119
 
        super(CiscoCfgAgent, self).__init__(host=self.conf.host)
120
 
 
121
 
    def _initialize_rpc(self, host):
122
 
        self.devmgr_rpc = CiscoDeviceManagementApi(topics.L3PLUGIN, host)
123
 
 
124
 
    def _initialize_service_helpers(self, host):
125
 
        svc_helper_class = self.conf.cfg_agent.routing_svc_helper_class
126
 
        try:
127
 
            self.routing_service_helper = importutils.import_object(
128
 
                svc_helper_class, host, self.conf, self)
129
 
        except ImportError as e:
130
 
            LOG.warning(_LW("Error in loading routing service helper. Class "
131
 
                       "specified is %(class)s. Reason:%(reason)s"),
132
 
                     {'class': self.conf.cfg_agent.routing_svc_helper_class,
133
 
                      'reason': e})
134
 
            self.routing_service_helper = None
135
 
 
136
 
    def _start_periodic_tasks(self):
137
 
        self.loop = loopingcall.FixedIntervalLoopingCall(self.process_services)
138
 
        self.loop.start(interval=self.conf.cfg_agent.rpc_loop_interval)
139
 
 
140
 
    def after_start(self):
141
 
        LOG.info(_LI("Cisco cfg agent started"))
142
 
 
143
 
    def get_routing_service_helper(self):
144
 
        return self.routing_service_helper
145
 
 
146
 
    ## Periodic tasks ##
147
 
    @periodic_task.periodic_task
148
 
    def _backlog_task(self, context):
149
 
        """Process backlogged devices."""
150
 
        LOG.debug("Processing backlog.")
151
 
        self._process_backlogged_hosting_devices(context)
152
 
 
153
 
    ## Main orchestrator ##
154
 
    @lockutils.synchronized('cisco-cfg-agent', 'neutron-')
155
 
    def process_services(self, device_ids=None, removed_devices_info=None):
156
 
        """Process services managed by this config agent.
157
 
 
158
 
        This method is invoked by any of three scenarios.
159
 
 
160
 
        1. Invoked by a periodic task running every `RPC_LOOP_INTERVAL`
161
 
        seconds. This is the most common scenario.
162
 
        In this mode, the method is called without any arguments.
163
 
 
164
 
        2. Called by the `_process_backlogged_hosting_devices()` as part of
165
 
        the backlog processing task. In this mode, a list of device_ids
166
 
        are passed as arguments. These are the list of backlogged
167
 
        hosting devices that are now reachable and we want to sync services
168
 
        on them.
169
 
 
170
 
        3. Called by the `hosting_devices_removed()` method. This is when
171
 
        the config agent has received a notification from the plugin that
172
 
        some hosting devices are going to be removed. The payload contains
173
 
        the details of the hosting devices and the associated neutron
174
 
        resources on them which should be processed and removed.
175
 
 
176
 
        To avoid race conditions with these scenarios, this function is
177
 
        protected by a lock.
178
 
 
179
 
        This method goes on to invoke `process_service()` on the
180
 
        different service helpers.
181
 
 
182
 
        :param device_ids : List of devices that are now available and needs
183
 
         to be processed
184
 
        :param removed_devices_info: Info about the hosting devices which
185
 
        are going to be removed and details of the resources hosted on them.
186
 
        Expected Format:
187
 
                {
188
 
                 'hosting_data': {'hd_id1': {'routers': [id1, id2, ...]},
189
 
                                  'hd_id2': {'routers': [id3, id4, ...]}, ...},
190
 
                 'deconfigure': True/False
191
 
                }
192
 
        :return: None
193
 
        """
194
 
        LOG.debug("Processing services started")
195
 
        # Now we process only routing service, additional services will be
196
 
        # added in future
197
 
        if self.routing_service_helper:
198
 
            self.routing_service_helper.process_service(device_ids,
199
 
                                                        removed_devices_info)
200
 
        else:
201
 
            LOG.warning(_LW("No routing service helper loaded"))
202
 
        LOG.debug("Processing services completed")
203
 
 
204
 
    def _process_backlogged_hosting_devices(self, context):
205
 
        """Process currently backlogged devices.
206
 
 
207
 
        Go through the currently backlogged devices and process them.
208
 
        For devices which are now reachable (compared to last time), we call
209
 
        `process_services()` passing the now reachable device's id.
210
 
        For devices which have passed the `hosting_device_dead_timeout` and
211
 
        hence presumed dead, execute a RPC to the plugin informing that.
212
 
        :param context: RPC context
213
 
        :return: None
214
 
        """
215
 
        res = self._dev_status.check_backlogged_hosting_devices()
216
 
        if res['reachable']:
217
 
            self.process_services(device_ids=res['reachable'])
218
 
        if res['dead']:
219
 
            LOG.debug("Reporting dead hosting devices: %s", res['dead'])
220
 
            self.devmgr_rpc.report_dead_hosting_devices(context,
221
 
                                                        hd_ids=res['dead'])
222
 
 
223
 
    def hosting_devices_removed(self, context, payload):
224
 
        """Deal with hosting device removed RPC message."""
225
 
        try:
226
 
            if payload['hosting_data']:
227
 
                if payload['hosting_data'].keys():
228
 
                    self.process_services(removed_devices_info=payload)
229
 
        except KeyError as e:
230
 
            LOG.error(_LE("Invalid payload format for received RPC message "
231
 
                        "`hosting_devices_removed`. Error is %(error)s. "
232
 
                        "Payload is %(payload)s"),
233
 
                      {'error': e, 'payload': payload})
234
 
 
235
 
 
236
 
class CiscoCfgAgentWithStateReport(CiscoCfgAgent):
237
 
 
238
 
    def __init__(self, host, conf=None):
239
 
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
240
 
        self.agent_state = {
241
 
            'binary': 'neutron-cisco-cfg-agent',
242
 
            'host': host,
243
 
            'topic': c_constants.CFG_AGENT,
244
 
            'configurations': {},
245
 
            'start_flag': True,
246
 
            'agent_type': c_constants.AGENT_TYPE_CFG}
247
 
        report_interval = cfg.CONF.AGENT.report_interval
248
 
        self.use_call = True
249
 
        self._initialize_rpc(host)
250
 
        self._agent_registration()
251
 
        super(CiscoCfgAgentWithStateReport, self).__init__(host=host,
252
 
                                                           conf=conf)
253
 
        if report_interval:
254
 
            self.heartbeat = loopingcall.FixedIntervalLoopingCall(
255
 
                self._report_state)
256
 
            self.heartbeat.start(interval=report_interval)
257
 
 
258
 
    def _agent_registration(self):
259
 
        """Register this agent with the server.
260
 
 
261
 
        This method registers the cfg agent with the neutron server so hosting
262
 
        devices can be assigned to it. In case the server is not ready to
263
 
        accept registration (it sends a False) then we retry registration
264
 
        for `MAX_REGISTRATION_ATTEMPTS` with a delay of
265
 
        `REGISTRATION_RETRY_DELAY`. If there is no server response or a
266
 
        failure to register after the required number of attempts,
267
 
        the agent stops itself.
268
 
        """
269
 
        for attempts in xrange(MAX_REGISTRATION_ATTEMPTS):
270
 
            context = n_context.get_admin_context_without_session()
271
 
            self.send_agent_report(self.agent_state, context)
272
 
            res = self.devmgr_rpc.register_for_duty(context)
273
 
            if res is True:
274
 
                LOG.info(_LI("[Agent registration] Agent successfully "
275
 
                           "registered"))
276
 
                return
277
 
            elif res is False:
278
 
                LOG.warning(_LW("[Agent registration] Neutron server said "
279
 
                                "that device manager was not ready. Retrying "
280
 
                                "in %0.2f seconds "), REGISTRATION_RETRY_DELAY)
281
 
                time.sleep(REGISTRATION_RETRY_DELAY)
282
 
            elif res is None:
283
 
                LOG.error(_LE("[Agent registration] Neutron server said that "
284
 
                              "no device manager was found. Cannot continue. "
285
 
                              "Exiting!"))
286
 
                raise SystemExit("Cfg Agent exiting")
287
 
        LOG.error(_LE("[Agent registration] %d unsuccessful registration "
288
 
                    "attempts. Exiting!"), MAX_REGISTRATION_ATTEMPTS)
289
 
        raise SystemExit("Cfg Agent exiting")
290
 
 
291
 
    def _report_state(self):
292
 
        """Report state to the plugin.
293
 
 
294
 
        This task run every `report_interval` period.
295
 
        Collects, creates and sends a summary of the services currently
296
 
        managed by this agent. Data is collected from the service helper(s).
297
 
        Refer the `configurations` dict for the parameters reported.
298
 
        :return: None
299
 
        """
300
 
        LOG.debug("Report state task started")
301
 
        configurations = {}
302
 
        if self.routing_service_helper:
303
 
            configurations = self.routing_service_helper.collect_state(
304
 
                self.agent_state['configurations'])
305
 
        non_responding = self._dev_status.get_backlogged_hosting_devices_info()
306
 
        configurations['non_responding_hosting_devices'] = non_responding
307
 
        self.agent_state['configurations'] = configurations
308
 
        self.agent_state['local_time'] = str(timeutils.utcnow())
309
 
        LOG.debug("State report data: %s", pprint.pformat(self.agent_state))
310
 
        self.send_agent_report(self.agent_state, self.context)
311
 
 
312
 
    def send_agent_report(self, report, context):
313
 
        """Send the agent report via RPC."""
314
 
        try:
315
 
            self.state_rpc.report_state(context, report, self.use_call)
316
 
            report.pop('start_flag', None)
317
 
            self.use_call = False
318
 
            LOG.debug("Send agent report successfully completed")
319
 
        except AttributeError:
320
 
            # This means the server does not support report_state
321
 
            LOG.warning(_LW("Neutron server does not support state report. "
322
 
                       "State report for this agent will be disabled."))
323
 
            self.heartbeat.stop()
324
 
            return
325
 
        except Exception:
326
 
            LOG.exception(_LE("Failed sending agent report!"))
327
 
 
328
 
 
329
 
def main(manager='neutron.plugins.cisco.cfg_agent.'
330
 
                 'cfg_agent.CiscoCfgAgentWithStateReport'):
331
 
    conf = cfg.CONF
332
 
    conf.register_opts(CiscoCfgAgent.OPTS, "cfg_agent")
333
 
    config.register_agent_state_opts_helper(conf)
334
 
    config.register_root_helper(conf)
335
 
    conf.register_opts(interface.OPTS)
336
 
    conf.register_opts(external_process.OPTS)
337
 
    common_config.init(sys.argv[1:])
338
 
    conf(project='neutron')
339
 
    config.setup_logging()
340
 
    server = neutron_service.Service.create(
341
 
        binary='neutron-cisco-cfg-agent',
342
 
        topic=c_constants.CFG_AGENT,
343
 
        report_interval=cfg.CONF.AGENT.report_interval,
344
 
        manager=manager)
345
 
    service.launch(server).wait()