~gnuoy/charms/trusty/quantum-gateway/fix-purge

« back to all changes in this revision

Viewing changes to files/neutron-ha-monitor.py

  • Committer: Liam Young
  • Date: 2015-01-23 14:48:02 UTC
  • mfrom: (76.2.89 neutron-ha-scale)
  • Revision ID: liam.young@canonical.com-20150123144802-63k91ec17k30p5ng
[xianghui, r=gnuoy] To support quantum-gateway HA in Icehouse before Juno native HA deployed.
It works by using pacemaker to monitor a daemon, which will detect neutron agents status and cleanup local resource when failover etc.

BP:
https://blueprints.launchpad.net/cts-engineering/+spec/neutron-gateway-services-need-ha-support

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2014 Canonical Ltd.
 
2
#
 
3
# Authors: Hui Xiang <hui.xiang@canonical.com>
 
4
#          Joshua Zhang <joshua.zhang@canonical.com>
 
5
#          Edward Hope-Morley <edward.hope-morley@canonical.com>
 
6
#
 
7
 
 
8
"""
 
9
Helpers for monitoring Neutron agents, reschedule failed agents,
 
10
cleaned resources on failed nodes.
 
11
"""
 
12
 
 
13
import os
 
14
import re
 
15
import sys
 
16
import signal
 
17
import socket
 
18
import subprocess
 
19
import time
 
20
 
 
21
from oslo.config import cfg
 
22
from neutron.agent.linux import ovs_lib
 
23
from neutron.agent.linux import ip_lib
 
24
from neutron.common import exceptions
 
25
from neutron.openstack.common import log as logging
 
26
 
 
27
LOG = logging.getLogger(__name__)
 
28
 
 
29
 
 
30
class Daemon(object):
 
31
    """A generic daemon class.
 
32
 
 
33
    Usage: subclass the Daemon class and override the run() method
 
34
    """
 
35
    def __init__(self, stdin='/dev/null', stdout='/dev/null',
 
36
                 stderr='/dev/null', procname='python'):
 
37
        self.stdin = stdin
 
38
        self.stdout = stdout
 
39
        self.stderr = stderr
 
40
        self.procname = procname
 
41
 
 
42
    def _fork(self):
 
43
        try:
 
44
            pid = os.fork()
 
45
            if pid > 0:
 
46
                sys.exit(0)
 
47
        except OSError:
 
48
            LOG.exception('Fork failed')
 
49
            sys.exit(1)
 
50
 
 
51
    def daemonize(self):
 
52
        """Daemonize process by doing Stevens double fork."""
 
53
        # fork first time
 
54
        self._fork()
 
55
 
 
56
        # decouple from parent environment
 
57
        os.chdir("/")
 
58
        os.setsid()
 
59
        os.umask(0)
 
60
        # fork second time
 
61
        self._fork()
 
62
 
 
63
        # redirect standard file descriptors
 
64
        sys.stdout.flush()
 
65
        sys.stderr.flush()
 
66
        stdin = open(self.stdin, 'r')
 
67
        stdout = open(self.stdout, 'a+')
 
68
        stderr = open(self.stderr, 'a+', 0)
 
69
        os.dup2(stdin.fileno(), sys.stdin.fileno())
 
70
        os.dup2(stdout.fileno(), sys.stdout.fileno())
 
71
        os.dup2(stderr.fileno(), sys.stderr.fileno())
 
72
 
 
73
        signal.signal(signal.SIGTERM, self.handle_sigterm)
 
74
 
 
75
    def handle_sigterm(self, signum, frame):
 
76
        sys.exit(0)
 
77
 
 
78
    def start(self):
 
79
        """Start the daemon."""
 
80
        self.daemonize()
 
81
        self.run()
 
82
 
 
83
    def run(self):
 
84
        """Override this method when subclassing Daemon.
 
85
 
 
86
        start() will call this method after the process has daemonized.
 
87
        """
 
88
        pass
 
89
 
 
90
 
 
91
class MonitorNeutronAgentsDaemon(Daemon):
 
92
    def __init__(self):
 
93
        super(MonitorNeutronAgentsDaemon, self).__init__()
 
94
        logging.setup('Neuron-HA-Monitor')
 
95
        LOG.info('Monitor Neutron Agent Loop Init')
 
96
        self.hostname = None
 
97
        self.env = {}
 
98
 
 
99
    def get_env(self):
 
100
        envrc_f = '/etc/legacy_ha_envrc'
 
101
        envrc_f_m = False
 
102
        if os.path.isfile(envrc_f):
 
103
            ctime = time.ctime(os.stat(envrc_f).st_ctime)
 
104
            mtime = time.ctime(os.stat(envrc_f).st_mtime)
 
105
            if ctime != mtime:
 
106
                envrc_f_m = True
 
107
 
 
108
            if not self.env or envrc_f_m:
 
109
                with open(envrc_f, 'r') as f:
 
110
                    for line in f:
 
111
                        data = line.strip().split('=')
 
112
                        if data and data[0] and data[1]:
 
113
                            self.env[data[0]] = data[1]
 
114
                        else:
 
115
                            raise Exception("OpenStack env data uncomplete.")
 
116
        return self.env
 
117
 
 
118
    def get_hostname(self):
 
119
        if not self.hostname:
 
120
            self.hostname = socket.gethostname()
 
121
        return self.hostname
 
122
 
 
123
    def get_root_helper(self):
 
124
        return 'sudo'
 
125
 
 
126
    def list_monitor_res(self):
 
127
        # List crm resource 'cl_monitor' running node
 
128
        nodes = []
 
129
        cmd = ['crm', 'resource', 'show', 'cl_monitor']
 
130
        output = subprocess.check_output(cmd)
 
131
        pattern = re.compile('resource cl_monitor is running on: (.*) ')
 
132
        nodes = pattern.findall(output)
 
133
        return nodes
 
134
 
 
135
    def get_crm_res_lead_node(self):
 
136
        nodes = self.list_monitor_res()
 
137
        if nodes:
 
138
            return nodes[0].strip()
 
139
        else:
 
140
            LOG.error('Failed to get crm resource.')
 
141
            return None
 
142
 
 
143
    def unplug_device(self, device):
 
144
        try:
 
145
            device.link.delete()
 
146
        except RuntimeError:
 
147
            root_helper = self.get_root_helper()
 
148
            # Maybe the device is OVS port, so try to delete
 
149
            bridge_name = ovs_lib.get_bridge_for_iface(root_helper,
 
150
                                                       device.name)
 
151
            if bridge_name:
 
152
                bridge = ovs_lib.OVSBridge(bridge_name, root_helper)
 
153
                bridge.delete_port(device.name)
 
154
            else:
 
155
                LOG.debug('Unable to find bridge for device: %s', device.name)
 
156
 
 
157
    def get_pattern(self, key, text):
 
158
        if not key or not text:
 
159
            LOG.debug('Invalid key(%s) or text(%s)' % (key, text))
 
160
            return None
 
161
 
 
162
        pattern = re.compile('%s' % key)
 
163
        result = pattern.findall(text)
 
164
        return result
 
165
 
 
166
    def _cleanup(self, key1, key2):
 
167
        namespaces = []
 
168
        if key1:
 
169
            for k in key1.iterkeys():
 
170
                namespaces.append(key2 + '-' + k)
 
171
        else:
 
172
            try:
 
173
                cmd = ['sudo', 'ip', 'netns']
 
174
                ns = subprocess.check_output(cmd)
 
175
                namespaces = self.get_pattern('(%s.*)' % key2, ns)
 
176
            except RuntimeError as e:
 
177
                LOG.error('Failed to list namespace, (%s)' % e)
 
178
 
 
179
        if namespaces:
 
180
            LOG.info('Namespaces: %s is going to be deleted.' % namespaces)
 
181
            self.destroy_namespaces(namespaces)
 
182
 
 
183
    def cleanup_dhcp(self, networks):
 
184
        self._cleanup(networks, 'qdhcp')
 
185
 
 
186
    def cleanup_router(self, routers):
 
187
        self._cleanup(routers, 'qrouter')
 
188
 
 
189
    def destroy_namespaces(self, namespaces):
 
190
        try:
 
191
            root_helper = self.get_root_helper()
 
192
            for namespace in namespaces:
 
193
                ip = ip_lib.IPWrapper(root_helper, namespace)
 
194
                if ip.netns.exists(namespace):
 
195
                    for device in ip.get_devices(exclude_loopback=True):
 
196
                        self.unplug_device(device)
 
197
 
 
198
            ip.garbage_collect_namespace()
 
199
        except Exception:
 
200
            LOG.exception('Error unable to destroy namespace: %s', namespace)
 
201
 
 
202
    def is_same_host(self, host):
 
203
        return str(host).strip() == self.get_hostname()
 
204
 
 
205
    def validate_reschedule(self):
 
206
        crm_no_1_node = self.get_crm_res_lead_node()
 
207
        if not crm_no_1_node:
 
208
            LOG.error('No crm first node could be found.')
 
209
            return False
 
210
 
 
211
        if not self.is_same_host(crm_no_1_node):
 
212
            LOG.warn('Only the first crm node %s could reschedule. '
 
213
                     % crm_no_1_node)
 
214
            return False
 
215
        return True
 
216
 
 
217
    def l3_agents_reschedule(self, l3_agents, routers, quantum):
 
218
        if not self.validate_reschedule():
 
219
            return
 
220
 
 
221
        index = 0
 
222
        for router_id in routers:
 
223
            agent = index % len(l3_agents)
 
224
            LOG.info('Moving router %s from %s to %s' %
 
225
                     (router_id, routers[router_id], l3_agents[agent]))
 
226
            try:
 
227
                quantum.remove_router_from_l3_agent(l3_agent=routers[router_id],
 
228
                                                    router_id=router_id)
 
229
            except exceptions.NeutronException as e:
 
230
                LOG.error('Remove router raised exception: %s' % e)
 
231
            try:
 
232
                quantum.add_router_to_l3_agent(l3_agent=l3_agents[agent],
 
233
                                               body={'router_id': router_id})
 
234
            except exceptions.NeutronException as e:
 
235
                LOG.error('Add router raised exception: %s' % e)
 
236
            index += 1
 
237
 
 
238
    def dhcp_agents_reschedule(self, dhcp_agents, networks, quantum):
 
239
        if not self.validate_reschedule():
 
240
            return
 
241
 
 
242
        index = 0
 
243
        for network_id in networks:
 
244
            agent = index % len(dhcp_agents)
 
245
            LOG.info('Moving network %s from %s to %s' % (network_id,
 
246
                     networks[network_id], dhcp_agents[agent]))
 
247
            try:
 
248
                quantum.remove_network_from_dhcp_agent(
 
249
                    dhcp_agent=networks[network_id], network_id=network_id)
 
250
            except exceptions.NeutronException as e:
 
251
                LOG.error('Remove network raised exception: %s' % e)
 
252
            try:
 
253
                quantum.add_network_to_dhcp_agent(
 
254
                    dhcp_agent=dhcp_agents[agent],
 
255
                    body={'network_id': network_id})
 
256
            except exceptions.NeutronException as e:
 
257
                LOG.error('Add network raised exception: %s' % e)
 
258
            index += 1
 
259
 
 
260
    def get_quantum_client(self):
 
261
        env = self.get_env()
 
262
        if not env:
 
263
            LOG.info('Unable to re-assign resources at this time')
 
264
            return None
 
265
 
 
266
        try:
 
267
            from quantumclient.v2_0 import client
 
268
        except ImportError:
 
269
            # Try to import neutronclient instead for havana+
 
270
            from neutronclient.v2_0 import client
 
271
 
 
272
        auth_url = '%(auth_protocol)s://%(keystone_host)s:%(auth_port)s/v2.0' \
 
273
                   % env
 
274
        quantum = client.Client(username=env['service_username'],
 
275
                                password=env['service_password'],
 
276
                                tenant_name=env['service_tenant'],
 
277
                                auth_url=auth_url,
 
278
                                region_name=env['region'])
 
279
        return quantum
 
280
 
 
281
    def reassign_agent_resources(self, quantum=None):
 
282
        """Use agent scheduler API to detect down agents and re-schedule"""
 
283
        if not quantum:
 
284
            LOG.error('Failed to get quantum client.')
 
285
            return
 
286
 
 
287
        try:
 
288
            DHCP_AGENT = "DHCP Agent"
 
289
            L3_AGENT = "L3 Agent"
 
290
            agents = quantum.list_agents(agent_type=DHCP_AGENT)
 
291
        except exceptions.NeutronException as e:
 
292
            LOG.error('Failed to get quantum agents, %s' % e)
 
293
            return
 
294
 
 
295
        dhcp_agents = []
 
296
        l3_agents = []
 
297
        networks = {}
 
298
        for agent in agents['agents']:
 
299
            hosted_networks = quantum.list_networks_on_dhcp_agent(
 
300
                agent['id'])['networks']
 
301
            if not agent['alive']:
 
302
                LOG.info('DHCP Agent %s down' % agent['id'])
 
303
                for network in hosted_networks:
 
304
                    networks[network['id']] = agent['id']
 
305
                if self.is_same_host(agent['host']):
 
306
                    self.cleanup_dhcp(networks)
 
307
            else:
 
308
                dhcp_agents.append(agent['id'])
 
309
                LOG.info('Active dhcp agents: %s' % agent['id'])
 
310
                if not hosted_networks and self.is_same_host(agent['host']):
 
311
                    self.cleanup_dhcp(None)
 
312
 
 
313
        agents = quantum.list_agents(agent_type=L3_AGENT)
 
314
        routers = {}
 
315
        for agent in agents['agents']:
 
316
            hosted_routers = quantum.list_routers_on_l3_agent(
 
317
                agent['id'])['routers']
 
318
            if not agent['alive']:
 
319
                LOG.info('L3 Agent %s down' % agent['id'])
 
320
                for router in hosted_routers:
 
321
                    routers[router['id']] = agent['id']
 
322
                if self.is_same_host(agent['host']):
 
323
                    self.cleanup_router(routers)
 
324
            else:
 
325
                l3_agents.append(agent['id'])
 
326
                LOG.info('Active l3 agents: %s' % agent['id'])
 
327
                if not hosted_routers and self.is_same_host(agent['host']):
 
328
                    self.cleanup_router(None)
 
329
 
 
330
        if not networks and not routers:
 
331
            LOG.info('No networks and routers hosted on failed agents.')
 
332
            return
 
333
 
 
334
        if len(dhcp_agents) == 0 and len(l3_agents) == 0:
 
335
            LOG.error('Unable to relocate resources, there are %s dhcp_agents '
 
336
                      'and %s l3_agents in this cluster' % (len(dhcp_agents),
 
337
                                                            len(l3_agents)))
 
338
            return
 
339
 
 
340
        if len(l3_agents) > 0:
 
341
            self.l3_agents_reschedule(l3_agents, routers, quantum)
 
342
            # new l3 node will not create a tunnel if don't restart ovs process
 
343
 
 
344
        if len(dhcp_agents) > 0:
 
345
            self.dhcp_agents_reschedule(dhcp_agents, networks, quantum)
 
346
 
 
347
 
 
348
    def check_ovs_tunnel(self, quantum=None):
 
349
        '''
 
350
        Work around for Bug #1411163
 
351
        No fdb entries added when failover dhcp and l3 agent together.
 
352
        '''
 
353
        if not quantum:
 
354
            LOG.error('Failed to get quantum client.')
 
355
            return
 
356
 
 
357
        try:
 
358
            OVS_AGENT = 'Open vSwitch agent'
 
359
            agents = quantum.list_agents(agent_type=OVS_AGENT)
 
360
        except exceptions.NeutronException as e:
 
361
            LOG.error('No ovs agent found on localhost, error:%s.' % e)
 
362
            return
 
363
 
 
364
        for agent in agents['agents']:
 
365
            if self.is_same_host(agent['host']) and agent['alive']:
 
366
                conf = agent['configurations']
 
367
                if 'gre' in conf['tunnel_types'] and conf['l2_population'] \
 
368
                        and conf['devices']:
 
369
                    LOG.debug('local ovs agent:%s' % agent)
 
370
                    ovs_output = subprocess.check_output(['ovs-vsctl',
 
371
                                                          'list-ports', 'br-tun'])
 
372
                    ports = ovs_output.strip().split('\n')
 
373
                    look_up_gre_port = False
 
374
                    for port in ports:
 
375
                        if port.startswith('gre-'):
 
376
                            look_up_gre_port = True
 
377
                            break
 
378
                    if not look_up_gre_port:
 
379
                        try:
 
380
                            LOG.error('Local agent has devices, but no ovs tunnel is created,'
 
381
                                      'restart ovs agent.')
 
382
                            cmd = ['sudo', 'service', 'neutron-plugin-openvswitch-agent',
 
383
                                   'restart']
 
384
                            subprocess.call(cmd)
 
385
                        except subprocess.CalledProcessError:
 
386
                            LOG.error('Failed to restart neutron-plugin-openvswitch-agent.')
 
387
 
 
388
    def check_local_agents(self):
 
389
        services = ['openvswitch-switch', 'neutron-dhcp-agent',
 
390
                    'neutron-metadata-agent', 'neutron-vpn-agent']
 
391
        for s in services:
 
392
            status = ['sudo', 'service', s, 'status']
 
393
            restart = ['sudo', 'service', s, 'restart']
 
394
            start = ['sudo', 'service', s, 'start']
 
395
            stop = '%s stop/waiting' % s
 
396
            try:
 
397
                output = subprocess.check_output(status)
 
398
                if output.strip() == stop:
 
399
                    subprocess.check_output(start)
 
400
                    LOG.error('Restart service: %s' % s)
 
401
                    if s == 'neutron-metadata-agent':
 
402
                        subprocess.check_output(['sudo', 'service',
 
403
                                                 'neutron-vpn-agent',
 
404
                                                 'restart'])
 
405
                        LOG.error('Restart neutron-vpn-agent')
 
406
            except subprocess.CalledProcessError:
 
407
                LOG.error('Restart service: %s' % s)
 
408
                subprocess.check_output(restart)
 
409
                if s == 'neutron-metadata-agent':
 
410
                    subprocess.check_output(['sudo', 'service',
 
411
                                             'neutron-vpn-agent',
 
412
                                             'restart'])
 
413
 
 
414
    def run(self):
 
415
        while True:
 
416
            LOG.info('Monitor Neutron HA Agent Loop Start')
 
417
            quantum = self.get_quantum_client()
 
418
            self.reassign_agent_resources(quantum=quantum)
 
419
            self.check_ovs_tunnel(quantum=quantum)
 
420
            self.check_local_agents()
 
421
            LOG.info('sleep %s' % cfg.CONF.check_interval)
 
422
            time.sleep(float(cfg.CONF.check_interval))
 
423
 
 
424
 
 
425
if __name__ == '__main__':
 
426
    opts = [
 
427
        cfg.StrOpt('check_interval',
 
428
                   default=8,
 
429
                   help='Check Neutron Agents interval.'),
 
430
    ]
 
431
 
 
432
    cfg.CONF.register_cli_opts(opts)
 
433
    cfg.CONF(project='monitor_neutron_agents', default_config_files=[])
 
434
    logging.setup('Neuron-HA-Monitor')
 
435
    monitor_daemon = MonitorNeutronAgentsDaemon()
 
436
    monitor_daemon.start()