5
Created by Thomas Mangin on 2012-06-10.
6
Copyright (c) 2009-2013 Exa Networks. All rights reserved.
16
from exabgp.version import version
18
from exabgp.reactor.daemon import Daemon
19
from exabgp.reactor.listener import Listener,NetworkError
20
from exabgp.reactor.api.processes import Processes,ProcessError
21
from exabgp.reactor.peer import Peer,ACTION
22
from exabgp.reactor.network.error import error
24
from exabgp.configuration.file import Configuration
25
from exabgp.configuration.environment import environment
27
from exabgp.logger import Logger
29
class Reactor (object):
30
# [hex(ord(c)) for c in os.popen('clear').read()]
31
clear = ''.join([chr(int(c,16)) for c in ['0x1b', '0x5b', '0x48', '0x1b', '0x5b', '0x32', '0x4a']])
33
def __init__ (self,configuration):
34
self.ip = environment.settings().tcp.bind
35
self.port = environment.settings().tcp.port
37
self.max_loop_time = environment.settings().reactor.speed
39
self.logger = Logger()
40
self.daemon = Daemon(self)
43
self.configuration = Configuration(configuration)
46
self._shutdown = False
48
self._reload_processes = False
50
self._route_update = False
51
self._saved_pid = False
55
signal.signal(signal.SIGTERM, self.sigterm)
56
signal.signal(signal.SIGHUP, self.sighup)
57
signal.signal(signal.SIGALRM, self.sigalrm)
58
signal.signal(signal.SIGUSR1, self.sigusr1)
59
signal.signal(signal.SIGUSR2, self.sigusr2)
61
def sigterm (self,signum, frame):
62
self.logger.reactor("SIG TERM received - shutdown")
65
def sighup (self,signum, frame):
66
self.logger.reactor("SIG HUP received - shutdown")
69
def sigalrm (self,signum, frame):
70
self.logger.reactor("SIG ALRM received - restart")
73
def sigusr1 (self,signum, frame):
74
self.logger.reactor("SIG USR1 received - reload configuration")
77
def sigusr2 (self,signum, frame):
78
self.logger.reactor("SIG USR2 received - reload configuration and processes")
80
self._reload_processes = True
85
self.listener = Listener([self.ip,],self.port)
87
except NetworkError,e:
89
if os.geteuid() != 0 and self.port <= 1024:
90
self.logger.reactor("Can not bind to %s:%d, you may need to run ExaBGP as root" % (self.ip,self.port),'critical')
92
self.logger.reactor("Can not bind to %s:%d (%s)" % (self.ip,self.port,str(e)),'critical')
93
self.logger.reactor("unset exabgp.tcp.bind if you do not want listen for incoming connections",'critical')
94
self.logger.reactor("and check that no other daemon is already binding to port %d" % self.port,'critical')
96
self.logger.reactor("Listening for BGP session(s) on %s:%d" % (self.ip,self.port))
98
if not self.daemon.drop_privileges():
99
self.logger.reactor("Could not drop privileges to '%s' refusing to run as root" % self.daemon.user,'critical')
100
self.logger.reactor("Set the environmemnt value exabgp.daemon.user to change the unprivileged user",'critical')
103
# This is required to make sure we can write in the log location as we now have dropped root privileges
104
if not self.logger.restart():
105
self.logger.reactor("Could not setup the logger, aborting",'critical')
108
self.daemon.daemonise()
110
if not self.daemon.savepid():
111
self.logger.reactor('could not update PID, not starting','error')
113
# Make sure we create processes once we have dropped privileges and closed file descriptors
114
self.processes = Processes(self)
117
# did we complete the run of updates caused by the last SIGUSR1/SIGUSR2 ?
118
reload_completed = True
120
wait = environment.settings().tcp.delay
122
sleeptime = (wait * 60) - int(time.time()) % (wait * 60)
123
self.logger.reactor("waiting for %d seconds before connecting" % sleeptime)
124
time.sleep(float(sleeptime))
132
self._shutdown = False
134
elif self._reload and reload_completed:
136
self.reload(self._reload_processes)
137
self._reload_processes = False
139
self._restart = False
141
elif self._route_update:
142
self._route_update = False
145
peers = self._peers.keys()
147
# handle keepalive first and foremost
149
peer = self._peers[key]
150
if peer.established():
151
if peer.keepalive() is False:
152
self.logger.reactor("problem with keepalive for peer %s " % peer.neighbor.name(),'error')
153
# unschedule the peer
155
# Handle all connection
159
peer = self._peers[key]
161
# .run() returns an ACTION enum:
162
# * immediate if it wants to be called again
163
# * later if it should be called again but has no work atm
164
# * close if it is finished and is closing down, or restarting
165
if action == ACTION.close:
166
self.unschedule(peer)
168
elif action == ACTION.later:
169
ios.extend(peer.sockets())
170
# no need to come back to it before a full cycle
173
# give some time to our local processes
174
if self.schedule(self.processes.received()) or self._pending:
175
self._pending = list(self.run_pending(self._pending))
177
duration = time.time() - start
178
if duration >= self.max_loop_time:
183
reload_completed = True
185
# append here after reading as if read fails due to a dead process
186
# we may respawn the process which changes the FD
187
ios.extend(self.processes.fds())
189
# The RFC states that we MUST NOT send more than one KEEPALIVE / sec
190
# And doing less could cause the session to drop
192
while self.schedule(self.processes.received()) or self._pending:
193
self._pending = list(self.run_pending(self._pending))
195
duration = time.time() - start
196
if duration >= self.max_loop_time:
200
for connection in self.listener.connected():
202
# * False, no peer found for this TCP connection
204
# * None, conflict found for this TCP connection
206
for key in self._peers:
207
peer = self._peers[key]
208
neighbor = peer.neighbor
209
# XXX: FIXME: Inet can only be compared to Inet
210
if connection.local == str(neighbor.peer_address) and connection.peer == str(neighbor.local_address):
211
if peer.incoming(connection):
218
self.logger.reactor("accepted connection from %s - %s" % (connection.local,connection.peer))
220
self.logger.reactor("no session configured for %s - %s" % (connection.local,connection.peer))
221
connection.notification(6,3,'no session configured for the peer')
224
self.logger.reactor("connection refused (already connected to the peer) %s - %s" % (connection.local,connection.peer))
225
connection.notification(6,5,'could not accept the connection')
228
delay = max(start+self.max_loop_time-time.time(),0.0)
230
# if we are not already late in this loop !
232
# some peers indicated that they wished to be called later
233
# so we are waiting for an update on their socket / pipe for up to the rest of the second
236
read,_,_ = select.select(ios,[],[],delay)
237
except select.error,e:
238
errno,message = e.args
239
if not errno in error.block:
241
# we can still loop here very fast if something goes wrong with the FD
245
self.processes.terminate()
246
self.daemon.removepid()
248
except KeyboardInterrupt:
251
self._shutdown = True
252
self.logger.reactor("^C received")
254
except KeyboardInterrupt:
259
self._shutdown = True
260
self.logger.reactor("exiting")
262
except KeyboardInterrupt:
267
self._shutdown = True
268
self.logger.reactor("I/O Error received, most likely ^C during IO",'warning')
270
except KeyboardInterrupt:
275
self._shutdown = True
276
self.logger.reactor("Problem when sending message(s) to helper program, stopping",'error')
278
except KeyboardInterrupt:
280
except select.error,e:
283
self._shutdown = True
284
self.logger.reactor("problem using select, stopping",'error')
286
except KeyboardInterrupt:
288
# from exabgp.leak import objgraph
289
# print objgraph.show_most_common_types(limit=20)
291
# obj = objgraph.by_type('Route')[random.randint(0,2000)]
292
# objgraph.show_backrefs([obj], max_depth=10)
295
"""terminate all the current BGP connections"""
296
self.logger.reactor("Performing shutdown")
299
for key in self._peers.keys():
300
self._peers[key].stop()
302
def reload (self,restart=False):
303
"""reload the configuration and send to the peer the route which changed"""
304
self.logger.reactor("Performing reload of exabgp %s" % version)
306
reloaded = self.configuration.reload()
309
self.logger.configuration("Problem with the configuration file, no change done",'error')
310
self.logger.configuration(self.configuration.error,'error')
313
for key, peer in self._peers.items():
314
if key not in self.configuration.neighbor:
315
self.logger.reactor("Removing Peer %s" % peer.neighbor.name())
318
for key, neighbor in self.configuration.neighbor.items():
320
if key not in self._peers:
321
self.logger.reactor("New Peer %s" % neighbor.name())
322
peer = Peer(neighbor,self)
323
self._peers[key] = peer
325
elif self._peers[key].neighbor != neighbor:
326
self.logger.reactor("Peer definition change, restarting %s" % str(key))
327
self._peers[key].restart(neighbor)
328
# same peer but perhaps not the routes
330
self._peers[key].send_new(neighbor.rib.outgoing.queued_changes())
331
self.logger.configuration("Loaded new configuration successfully",'warning')
332
# This only starts once ...
333
self.processes.start(restart)
335
def run_pending (self,pending):
337
for generator in pending:
340
more = generator.next()
342
except StopIteration:
344
except KeyboardInterrupt:
345
self._shutdown = True
346
self.logger.reactor("^C received",'error')
349
def schedule (self,commands):
350
self._commands.extend(commands)
352
if not self._commands:
355
service,command = self._commands.pop(0)
357
if command == 'shutdown':
358
self._shutdown = True
361
self._answer(service,'shutdown in progress')
364
if command == 'reload':
368
self._answer(service,'reload in progress')
371
if command == 'restart':
375
self._answer(service,'restart in progress')
378
if command == 'version':
379
self._answer(service,'exabgp %s' % version)
382
if command == 'show neighbors':
383
def _show_neighbor (self):
384
for key in self.configuration.neighbor.keys():
385
neighbor = self.configuration.neighbor[key]
386
for line in str(neighbor).split('\n'):
387
self._answer(service,line)
389
self._pending.append(_show_neighbor(self))
392
if command == 'show routes':
393
def _show_route (self):
394
for key in self.configuration.neighbor.keys():
395
neighbor = self.configuration.neighbor[key]
396
for change in list(neighbor.rib.outgoing.sent_changes()):
397
self._answer(service,'neighbor %s %s' % (neighbor.local_address,str(change.nlri)))
399
self._pending.append(_show_route(self))
402
if command == 'show routes extensive':
403
def _show_extensive (self):
404
for key in self.configuration.neighbor.keys():
405
neighbor = self.configuration.neighbor[key]
406
for change in list(neighbor.rib.outgoing.sent_changes()):
407
self._answer(service,'neighbor %s %s' % (neighbor.name(),change.extensive()))
409
self._pending.append(_show_extensive(self))
413
if command.startswith('announce watchdog'):
414
def _announce_watchdog (self,name):
415
for neighbor in self.configuration.neighbor:
416
self.configuration.neighbor[neighbor].rib.outgoing.announce_watchdog(name)
418
self._route_update = True
420
name = command.split(' ')[2]
423
self._pending.append(_announce_watchdog(self,name))
427
if command.startswith('withdraw watchdog'):
428
def _withdraw_watchdog (self,name):
429
for neighbor in self.configuration.neighbor:
430
self.configuration.neighbor[neighbor].rib.outgoing.withdraw_watchdog(name)
432
self._route_update = True
434
name = command.split(' ')[2]
437
self._pending.append(_withdraw_watchdog(self,name))
440
def extract_neighbors (command):
441
"""return a list of neighbor definition : the neighbor definition is a list of string which are in the neighbor indexing string"""
442
# This function returns a list and a string
443
# The first list contains parsed neighbor to match against our defined peers
444
# The string is the command to be run for those peers
445
# The parsed neighbor is a list of the element making the neighbor string so each part can be checked against the neighbor name
448
neighbor,remaining = command.split(' ',1)
449
if neighbor != 'neighbor':
452
ip,command = remaining.split(' ',1)
453
definition = ['neighbor %s' % (ip)]
457
key,value,remaining = command.split(' ',2)
459
key,value = command.split(' ',1)
461
returned.append(definition)
462
_,command = command.split(' ',1)
465
if key not in ['neighbor','local-ip','local-as','peer-as','router-id','family-allowed']:
467
returned.append(definition)
469
definition.append('%s %s' % (key,value))
472
return returned,command
474
def match_neighbor (description,name):
475
for string in description:
476
if re.search('(^|[\s,])%s($|[\s,])' % re.escape(string), name) is None:
480
def match_neighbors (descriptions,peers):
481
"returns the sublist of peers matching the description passed, or None if no description is given"
487
for description in descriptions:
488
if match_neighbor(description,key):
489
if key not in returned:
493
# route announcement / withdrawal
494
if 'announce route ' in command:
495
def _announce_change (self,command,nexthops):
496
changes = self.configuration.parse_api_route(command,nexthops,'announce')
498
self.logger.reactor("Command could not parse route in : %s" % command,'warning')
502
for (peer,change) in changes:
504
self.configuration.change_to_peers(change,[peer,])
506
self.logger.reactor("Route added to %s : %s" % (', '.join(peers if peers else []) if peers is not None else 'all peers',change.extensive()))
507
self._route_update = True
510
descriptions,command = extract_neighbors(command)
511
peers = match_neighbors(descriptions,self._peers)
513
self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
515
nexthops = dict((peer,self._peers[peer].neighbor.local_address) for peer in peers)
516
self._pending.append(_announce_change(self,command,nexthops))
523
# route flush
524
if 'flush route' in command: # This allows flush routes with a s to work
525
def _flush (self,peers):
526
self.logger.reactor("Flushing routes for %s" % ', '.join(peers if peers else []) if peers is not None else 'all peers')
528
self._route_update = True
531
descriptions,command = extract_neighbors(command)
532
peers = match_neighbors(descriptions,self._peers)
534
self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
536
self._pending.append(_flush(self,peers))
543
if 'withdraw route' in command:
544
def _withdraw_change (self,command,nexthops):
545
changes = self.configuration.parse_api_route(command,nexthops,'withdraw')
547
self.logger.reactor("Command could not parse route in : %s" % command,'warning')
550
for (peer,change) in changes:
551
if self.configuration.change_to_peers(change,[peer,]):
552
self.logger.reactor("Route removed : %s" % change.extensive())
555
self.logger.reactor("Could not find therefore remove route : %s" % change.extensive(),'warning')
557
self._route_update = True
560
descriptions,command = extract_neighbors(command)
561
peers = match_neighbors(descriptions,self._peers)
563
self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
565
nexthops = dict((peer,self._peers[peer].neighbor.local_address) for peer in peers)
566
self._pending.append(_withdraw_change(self,command,nexthops))
574
# attribute announcement / withdrawal
575
if 'announce attribute ' in command:
576
def _announce_attribute (self,command,nexthops):
577
changes = self.configuration.parse_api_attribute(command,nexthops,'announce')
579
self.logger.reactor("Command could not parse attribute in : %s" % command,'warning')
582
for (peers,change) in changes:
583
self.configuration.change_to_peers(change,peers)
584
self.logger.reactor("Route added to %s : %s" % (', '.join(peers if peers else []) if peers is not None else 'all peers',change.extensive()))
586
self._route_update = True
589
descriptions,command = extract_neighbors(command)
590
peers = match_neighbors(descriptions,self._peers)
592
self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
594
nexthops = dict((peer,self._peers[peer].neighbor.local_address) for peer in peers)
595
self._pending.append(_announce_attribute(self,command,nexthops))
602
# attribute announcement / withdrawal
603
if 'withdraw attribute ' in command:
604
def _withdraw_attribute (self,command,nexthops):
605
changes = self.configuration.parse_api_attribute(command,nexthops,'withdraw')
607
self.logger.reactor("Command could not parse attribute in : %s" % command,'warning')
610
for (peers,change) in changes:
611
if self.configuration.change_to_peers(change,peers):
612
self.logger.reactor("Route removed : %s" % change.extensive())
615
self.logger.reactor("Could not find therefore remove route : %s" % change.extensive(),'warning')
617
self._route_update = True
620
descriptions,command = extract_neighbors(command)
621
peers = match_neighbors(descriptions,self._peers)
623
self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
625
nexthops = dict((peer,self._peers[peer].neighbor.local_address) for peer in peers)
626
self._pending.append(_withdraw_attribute(self,command,nexthops))
633
# flow announcement / withdrawal
634
if 'announce flow' in command:
635
def _announce_flow (self,command,peers):
636
changes = self.configuration.parse_api_flow(command,'announce')
638
self.logger.reactor("Command could not parse flow in : %s" % command)
641
for change in changes:
642
self.configuration.change_to_peers(change,peers)
643
self.logger.reactor("Flow added to %s : %s" % (', '.join(peers if peers else []) if peers is not None else 'all peers',change.extensive()))
645
self._route_update = True
648
descriptions,command = extract_neighbors(command)
649
peers = match_neighbors(descriptions,self._peers)
651
self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
653
self._pending.append(_announce_flow(self,command,peers))
660
if 'withdraw flow' in command:
661
def _withdraw_flow (self,command,peers):
662
changes = self.configuration.parse_api_flow(command,'withdraw')
664
self.logger.reactor("Command could not parse flow in : %s" % command)
667
for change in changes:
668
if self.configuration.change_to_peers(change,peers):
669
self.logger.reactor("Flow found and removed : %s" % change.extensive())
672
self.logger.reactor("Could not find therefore remove flow : %s" % change.extensive(),'warning')
674
self._route_update = True
677
descriptions,command = extract_neighbors(command)
678
peers = match_neighbors(descriptions,self._peers)
680
self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
682
self._pending.append(_withdraw_flow(self,command,peers))
689
# session teardown
690
if 'teardown' in command:
692
descriptions,command = extract_neighbors(command)
693
_,code = command.split(' ',1)
694
for key in self._peers:
695
for description in descriptions:
696
if match_neighbor(description,key):
697
self._peers[key].teardown(int(code))
698
self.logger.reactor('teardown scheduled for %s' % ' '.join(description))
705
if 'announce route-refresh' in command:
706
def _announce_refresh (self,command,peers):
707
rr = self.configuration.parse_api_refresh(command)
709
self.logger.reactor("Command could not parse flow in : %s" % command)
712
self.configuration.refresh_to_peers(rr,peers)
713
self.logger.reactor("Sent to %s : %s" % (', '.join(peers if peers else []) if peers is not None else 'all peers',rr.extensive()))
715
self._route_update = True
718
descriptions,command = extract_neighbors(command)
719
peers = match_neighbors(descriptions,self._peers)
721
self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
723
self._pending.append(_announce_refresh(self,command,peers))
730
if command.startswith('operational ') and (command.split() + ['safe'])[1].lower() in ('asm','adm','rpcq','rpcp','apcq','apcp','lpcq','lpcp'):
731
def _announce_operational (self,command,peers):
732
operational = self.configuration.parse_api_operational(command)
734
self.logger.reactor("Command could not parse operational command : %s" % command)
737
self.configuration.operational_to_peers(operational,peers)
738
self.logger.reactor("operational message sent to %s : %s" % (', '.join(peers if peers else []) if peers is not None else 'all peers',operational.extensive()))
740
self._route_update = True
743
descriptions,command = extract_neighbors(command)
744
peers = match_neighbors(descriptions,self._peers)
746
self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
748
self._pending.append(_announce_operational(self,command,peers))
757
self.logger.reactor("Command from process not understood : %s" % command,'warning')
760
def _answer (self,service,string):
    """Write `string` to `service` through self.processes, then log the reply.

    service -- identifier handed to self.processes.write() as the destination
    string  -- text of the answer to send back
    """
    reply = string
    # deliver the answer first, the log entry only records what was sent
    self.processes.write(service,reply)
    log_line = 'Responding to %s : %s' % (service,reply)
    self.logger.reactor(log_line)
765
def route_update (self):
    """Call send_new() on the peer of every configured neighbor.

    A log entry is emitted before the first peer is handled and another
    one once all of them have been processed.
    """
    self.logger.reactor("Performing dynamic route update")
    # snapshot of the configured neighbor keys; each one indexes self._peers
    configured = self.configuration.neighbor.keys()
    for name in configured:
        peer = self._peers[name]
        peer.send_new()
    self.logger.reactor("Updated peers dynamic routes successfully")
772
def route_flush (self):
    """Call send_new(update=True) on the peer of every configured neighbor."""
    self.logger.reactor("Performing route flush")
    # snapshot of the configured neighbor keys; each one indexes self._peers
    configured = self.configuration.neighbor.keys()
    for name in configured:
        peer = self._peers[name]
        peer.send_new(update=True)
779
"""kill the BGP session and restart it"""
780
self.logger.reactor("Performing restart of exabgp %s" % version)
781
self.configuration.reload()
783
for key in self._peers.keys():
784
if key not in self.configuration.neighbor.keys():
785
neighbor = self.configuration.neighbor[key]
786
self.logger.reactor("Removing Peer %s" % neighbor.name())
787
self._peers[key].stop()
789
self._peers[key].restart()
790
self.processes.terminate()
791
self.processes.start()
793
def unschedule (self,peer):
794
key = peer.neighbor.name()
795
if key in self._peers: