23
23
from neutron.agent.linux import keepalived
24
24
from neutron.agent.linux import utils as agent_utils
25
from neutron.common import constants as l3_constants
26
from neutron.i18n import _LE, _LI
25
from neutron.i18n import _LI
26
from neutron.notifiers import batch_notifier
28
28
LOG = logging.getLogger(__name__)
91
91
def __init__(self, host):
    """Initialise the HA-related state of the agent.

    The HA configuration path is prepared first, then the parent
    initialiser is invoked with ``host``.  Afterwards a batching notifier
    is created that hands batched state changes to ``notify_server``, and
    the keepalived notifications server is started through
    ``eventlet.spawn``.

    :param host: forwarded unchanged to the parent class initialiser.
    """
    self._init_ha_conf_path()
    super(AgentMixin, self).__init__(host)

    # The batching window is derived from the VRRP advertisement interval;
    # see _calculate_batch_duration.
    batch_window = self._calculate_batch_duration()
    self.state_change_notifier = batch_notifier.BatchNotifier(
        batch_window, self.notify_server)

    eventlet.spawn(self._start_keepalived_notifications_server)
96
98
def _start_keepalived_notifications_server(self):
98
100
L3AgentKeepalivedStateChangeServer(self, self.conf))
99
101
state_change_server.run()
103
def _calculate_batch_duration(self):
    """Return the time window used to batch router state-change events.

    The window is twice the sum of the failover detection time (three
    VRRP advertisement intervals, read from ``conf.ha_vrrp_advert_int``)
    and a fixed allowance of 2 for keepalived to configure the VIPs.

    :returns: ``(ha_vrrp_advert_int * 3 + 2) * 2``.
    """
    # Slave becomes the master after not hearing from it 3 times
    failover_detection = self.conf.ha_vrrp_advert_int * 3

    # Keepalived takes a couple of seconds to configure the VIPs
    vip_setup = 2

    # Give it enough slack to batch all events due to the same failure
    return (failover_detection + vip_setup) * 2
101
113
def enqueue_state_change(self, router_id, state):
102
114
LOG.info(_LI('Router %(router_id)s transitioned to %(state)s'),
103
115
{'router_id': router_id,
105
117
self._update_metadata_proxy(router_id, state)
118
self.state_change_notifier.queue_event((router_id, state))
107
120
def _update_metadata_proxy(self, router_id, state):
122
135
self.metadata_driver.destroy_monitored_metadata_proxy(
123
136
self.process_monitor, ri.router_id, ri.ns_name, self.conf)
138
def notify_server(self, batched_events):
139
translation_map = {'master': 'active',
142
translated_states = dict((router_id, translation_map[state]) for
143
router_id, state in batched_events)
144
LOG.debug('Updating server with HA routers states %s',
146
self.plugin_rpc.update_ha_routers_states(
147
self.context, translated_states)
125
149
def _init_ha_conf_path(self):
    """Prepare the directory named by ``conf.ha_confs_path``.

    The configured value is wrapped in slashes and passed through
    ``os.path.dirname``, which strips the trailing separator, and the
    result is handed to ``agent_utils.ensure_dir``.
    """
    # NOTE(review): ensure_dir presumably creates the directory when it is
    # missing -- confirm against neutron.agent.linux.utils.
    ha_full_path = os.path.dirname("/%s/" % self.conf.ha_confs_path)
    agent_utils.ensure_dir(ha_full_path)
129
def process_ha_router_added(self, ri):
130
ha_port = ri.router.get(l3_constants.HA_INTERFACE_KEY)
132
LOG.error(_LE('Unable to process HA router %s without ha port'),
136
ri._set_subnet_info(ha_port)
138
ri._init_keepalived_manager(self.process_monitor)
139
ri.ha_network_added(ha_port['network_id'],
142
ha_port['mac_address'])
144
ri.update_initial_state(self.enqueue_state_change)
145
ri.spawn_state_change_monitor(self.process_monitor)
147
def process_ha_router_removed(self, ri):
148
ri.destroy_state_change_monitor(self.process_monitor)
149
ri.ha_network_removed()