~ubuntu-branches/ubuntu/utopic/exabgp/utopic

« back to all changes in this revision

Viewing changes to lib/exabgp/reactor/__init__.py

  • Committer: Package Import Robot
  • Author(s): Henry-Nicolas Tourneur
  • Date: 2014-03-08 19:07:00 UTC
  • mfrom: (1.1.8)
  • Revision ID: package-import@ubuntu.com-20140308190700-xjbibpg1g6001c9x
Tags: 3.3.1-1
* New upstream release
* Bump python minimal required version (2.7)
* Closes: #726066 Debian packaging improvements proposed by Vincent Bernat
* Closes: #703774 not existent rundir (/var/run/exabgp) after reboot

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# encoding: utf-8
 
2
"""
 
3
reactor.py
 
4
 
 
5
Created by Thomas Mangin on 2012-06-10.
 
6
Copyright (c) 2009-2013 Exa Networks. All rights reserved.
 
7
"""
 
8
 
 
9
import os
 
10
import re
 
11
import sys
 
12
import time
 
13
import signal
 
14
import select
 
15
 
 
16
from exabgp.version import version
 
17
 
 
18
from exabgp.reactor.daemon import Daemon
 
19
from exabgp.reactor.listener import Listener,NetworkError
 
20
from exabgp.reactor.api.processes import Processes,ProcessError
 
21
from exabgp.reactor.peer import Peer,ACTION
 
22
from exabgp.reactor.network.error import error
 
23
 
 
24
from exabgp.configuration.file import Configuration
 
25
from exabgp.configuration.environment import environment
 
26
 
 
27
from exabgp.logger import Logger
 
28
 
 
29
class Reactor (object):
 
30
        # [hex(ord(c)) for c in os.popen('clear').read()]
 
31
        clear = ''.join([chr(int(c,16)) for c in ['0x1b', '0x5b', '0x48', '0x1b', '0x5b', '0x32', '0x4a']])
 
32
 
 
33
        def __init__ (self,configuration):
 
34
                self.ip = environment.settings().tcp.bind
 
35
                self.port = environment.settings().tcp.port
 
36
 
 
37
                self.max_loop_time = environment.settings().reactor.speed
 
38
 
 
39
                self.logger = Logger()
 
40
                self.daemon = Daemon(self)
 
41
                self.processes = None
 
42
                self.listener = None
 
43
                self.configuration = Configuration(configuration)
 
44
 
 
45
                self._peers = {}
 
46
                self._shutdown = False
 
47
                self._reload = False
 
48
                self._reload_processes = False
 
49
                self._restart = False
 
50
                self._route_update = False
 
51
                self._saved_pid = False
 
52
                self._commands = []
 
53
                self._pending = []
 
54
 
 
55
                signal.signal(signal.SIGTERM, self.sigterm)
 
56
                signal.signal(signal.SIGHUP, self.sighup)
 
57
                signal.signal(signal.SIGALRM, self.sigalrm)
 
58
                signal.signal(signal.SIGUSR1, self.sigusr1)
 
59
                signal.signal(signal.SIGUSR2, self.sigusr2)
 
60
 
 
61
        def sigterm (self,signum, frame):
 
62
                self.logger.reactor("SIG TERM received - shutdown")
 
63
                self._shutdown = True
 
64
 
 
65
        def sighup (self,signum, frame):
 
66
                self.logger.reactor("SIG HUP received - shutdown")
 
67
                self._shutdown = True
 
68
 
 
69
        def sigalrm (self,signum, frame):
 
70
                self.logger.reactor("SIG ALRM received - restart")
 
71
                self._restart = True
 
72
 
 
73
        def sigusr1 (self,signum, frame):
 
74
                self.logger.reactor("SIG USR1 received - reload configuration")
 
75
                self._reload = True
 
76
 
 
77
        def sigusr2 (self,signum, frame):
 
78
                self.logger.reactor("SIG USR2 received - reload configuration and processes")
 
79
                self._reload = True
 
80
                self._reload_processes = True
 
81
 
 
82
        def run (self):
 
83
                if self.ip:
 
84
                        try:
 
85
                                self.listener = Listener([self.ip,],self.port)
 
86
                                self.listener.start()
 
87
                        except NetworkError,e:
 
88
                                self.listener = None
 
89
                                if os.geteuid() != 0 and self.port <= 1024:
 
90
                                        self.logger.reactor("Can not bind to %s:%d, you may need to run ExaBGP as root" % (self.ip,self.port),'critical')
 
91
                                else:
 
92
                                        self.logger.reactor("Can not bind to %s:%d (%s)" % (self.ip,self.port,str(e)),'critical')
 
93
                                self.logger.reactor("unset exabgp.tcp.bind if you do not want listen for incoming connections",'critical')
 
94
                                self.logger.reactor("and check that no other daemon is already binding to port %d" % self.port,'critical')
 
95
                                sys.exit(1)
 
96
                        self.logger.reactor("Listening for BGP session(s) on %s:%d" % (self.ip,self.port))
 
97
 
 
98
                if not self.daemon.drop_privileges():
 
99
                        self.logger.reactor("Could not drop privileges to '%s' refusing to run as root" % self.daemon.user,'critical')
 
100
                        self.logger.reactor("Set the environmemnt value exabgp.daemon.user to change the unprivileged user",'critical')
 
101
                        return
 
102
 
 
103
                # This is required to make sure we can write in the log location as we now have dropped root privileges
 
104
                if not self.logger.restart():
 
105
                        self.logger.reactor("Could not setup the logger, aborting",'critical')
 
106
                        return
 
107
 
 
108
                self.daemon.daemonise()
 
109
 
 
110
                if not self.daemon.savepid():
 
111
                        self.logger.reactor('could not update PID, not starting','error')
 
112
 
 
113
                # Make sure we create processes one we have dropped privileges and closed file descriptor
 
114
                self.processes = Processes(self)
 
115
                self.reload()
 
116
 
 
117
                # did we complete the run of updates caused by the last SIGUSR1/SIGUSR2 ?
 
118
                reload_completed = True
 
119
 
 
120
                wait = environment.settings().tcp.delay
 
121
                if wait:
 
122
                        sleeptime = (wait * 60) - int(time.time()) % (wait * 60)
 
123
                        self.logger.reactor("waiting for %d seconds before connecting" % sleeptime)
 
124
                        time.sleep(float(sleeptime))
 
125
 
 
126
                while True:
 
127
                        try:
 
128
                                while self._peers:
 
129
                                        start = time.time()
 
130
 
 
131
                                        if self._shutdown:
 
132
                                                self._shutdown = False
 
133
                                                self.shutdown()
 
134
                                        elif self._reload and reload_completed:
 
135
                                                self._reload = False
 
136
                                                self.reload(self._reload_processes)
 
137
                                                self._reload_processes = False
 
138
                                        elif self._restart:
 
139
                                                self._restart = False
 
140
                                                self.restart()
 
141
                                        elif self._route_update:
 
142
                                                self._route_update = False
 
143
                                                self.route_update()
 
144
 
 
145
                                        peers = self._peers.keys()
 
146
 
 
147
                                        # handle keepalive first and foremost
 
148
                                        for key in peers:
 
149
                                                peer = self._peers[key]
 
150
                                                if peer.established():
 
151
                                                        if peer.keepalive() is False:
 
152
                                                                self.logger.reactor("problem with keepalive for peer %s " % peer.neighbor.name(),'error')
 
153
                                                                # unschedule the peer
 
154
 
 
155
                                        # Handle all connection
 
156
                                        ios = []
 
157
                                        while peers:
 
158
                                                for key in peers[:]:
 
159
                                                        peer = self._peers[key]
 
160
                                                        action = peer.run()
 
161
                                                        # .run() returns an ACTION enum:
 
162
                                                        # * immediate if it wants to be called again
 
163
                                                        # * later if it should be called again but has no work atm
 
164
                                                        # * close if it is finished and is closing down, or restarting
 
165
                                                        if action == ACTION.close:
 
166
                                                                self.unschedule(peer)
 
167
                                                                peers.remove(key)
 
168
                                                        elif action == ACTION.later:
 
169
                                                                ios.extend(peer.sockets())
 
170
                                                                # no need to come back to it before a a full cycle
 
171
                                                                peers.remove(key)
 
172
 
 
173
                                                        # give some time to our local processes
 
174
                                                        if self.schedule(self.processes.received()) or self._pending:
 
175
                                                                self._pending = list(self.run_pending(self._pending))
 
176
 
 
177
                                                duration = time.time() - start
 
178
                                                if duration >= self.max_loop_time:
 
179
                                                        ios=[]
 
180
                                                        break
 
181
 
 
182
                                        if not peers:
 
183
                                                reload_completed = True
 
184
 
 
185
                                        # append here after reading as if read fails due to a dead process
 
186
                                        # we may respawn the process which changes the FD
 
187
                                        ios.extend(self.processes.fds())
 
188
 
 
189
                                        # RFC state that we MUST not send more than one KEEPALIVE / sec
 
190
                                        # And doing less could cause the session to drop
 
191
 
 
192
                                        while self.schedule(self.processes.received()) or self._pending:
 
193
                                                self._pending = list(self.run_pending(self._pending))
 
194
 
 
195
                                                duration = time.time() - start
 
196
                                                if duration >= self.max_loop_time:
 
197
                                                        break
 
198
 
 
199
                                        if self.listener:
 
200
                                                for connection in self.listener.connected():
 
201
                                                        # found
 
202
                                                        # * False, not peer found for this TCP connection
 
203
                                                        # * True, peer found
 
204
                                                        # * None, conflict found for this TCP connections
 
205
                                                        found = False
 
206
                                                        for key in self._peers:
 
207
                                                                peer = self._peers[key]
 
208
                                                                neighbor = peer.neighbor
 
209
                                                                # XXX: FIXME: Inet can only be compared to Inet
 
210
                                                                if connection.local == str(neighbor.peer_address) and connection.peer == str(neighbor.local_address):
 
211
                                                                        if peer.incoming(connection):
 
212
                                                                                found = True
 
213
                                                                                break
 
214
                                                                        found = None
 
215
                                                                        break
 
216
 
 
217
                                                        if found:
 
218
                                                                self.logger.reactor("accepted connection from  %s - %s" % (connection.local,connection.peer))
 
219
                                                        elif found is False:
 
220
                                                                self.logger.reactor("no session configured for  %s - %s" % (connection.local,connection.peer))
 
221
                                                                connection.notification(6,3,'no session configured for the peer')
 
222
                                                                connection.close()
 
223
                                                        elif found is None:
 
224
                                                                self.logger.reactor("connection refused (already connected to the peer) %s - %s" % (connection.local,connection.peer))
 
225
                                                                connection.notification(6,5,'could not accept the connection')
 
226
                                                                connection.close()
 
227
 
 
228
                                        delay = max(start+self.max_loop_time-time.time(),0.0)
 
229
 
 
230
                                        # if we are not already late in this loop !
 
231
                                        if delay:
 
232
                                                # some peers indicated that they wished to be called later
 
233
                                                # so we are waiting for an update on their socket / pipe for up to the rest of the second
 
234
                                                if ios:
 
235
                                                        try:
 
236
                                                                read,_,_ = select.select(ios,[],[],delay)
 
237
                                                        except select.error,e:
 
238
                                                                errno,message = e.args
 
239
                                                                if not errno in error.block:
 
240
                                                                        raise e
 
241
                                                        # we can still loop here very fast if something goes wrogn with the FD
 
242
                                                else:
 
243
                                                        time.sleep(delay)
 
244
 
 
245
                                self.processes.terminate()
 
246
                                self.daemon.removepid()
 
247
                                break
 
248
                        except KeyboardInterrupt:
 
249
                                while True:
 
250
                                        try:
 
251
                                                self._shutdown = True
 
252
                                                self.logger.reactor("^C received")
 
253
                                                break
 
254
                                        except KeyboardInterrupt:
 
255
                                                pass
 
256
                        except SystemExit:
 
257
                                while True:
 
258
                                        try:
 
259
                                                self._shutdown = True
 
260
                                                self.logger.reactor("exiting")
 
261
                                                break
 
262
                                        except KeyboardInterrupt:
 
263
                                                pass
 
264
                        except IOError:
 
265
                                while True:
 
266
                                        try:
 
267
                                                self._shutdown = True
 
268
                                                self.logger.reactor("I/O Error received, most likely ^C during IO",'warning')
 
269
                                                break
 
270
                                        except KeyboardInterrupt:
 
271
                                                pass
 
272
                        except ProcessError:
 
273
                                while True:
 
274
                                        try:
 
275
                                                self._shutdown = True
 
276
                                                self.logger.reactor("Problem when sending message(s) to helper program, stopping",'error')
 
277
                                                break
 
278
                                        except KeyboardInterrupt:
 
279
                                                pass
 
280
                        except select.error,e:
 
281
                                while True:
 
282
                                        try:
 
283
                                                self._shutdown = True
 
284
                                                self.logger.reactor("problem using select, stopping",'error')
 
285
                                                break
 
286
                                        except KeyboardInterrupt:
 
287
                                                pass
 
288
#                               from exabgp.leak import objgraph
 
289
#                               print objgraph.show_most_common_types(limit=20)
 
290
#                               import random
 
291
#                               obj = objgraph.by_type('Route')[random.randint(0,2000)]
 
292
#                               objgraph.show_backrefs([obj], max_depth=10)
 
293
 
 
294
        def shutdown (self):
 
295
                """terminate all the current BGP connections"""
 
296
                self.logger.reactor("Performing shutdown")
 
297
                if self.listener:
 
298
                        self.listener.stop()
 
299
                for key in self._peers.keys():
 
300
                        self._peers[key].stop()
 
301
 
 
302
        def reload (self,restart=False):
 
303
                """reload the configuration and send to the peer the route which changed"""
 
304
                self.logger.reactor("Performing reload of exabgp %s" % version)
 
305
 
 
306
                reloaded = self.configuration.reload()
 
307
 
 
308
                if not reloaded:
 
309
                        self.logger.configuration("Problem with the configuration file, no change done",'error')
 
310
                        self.logger.configuration(self.configuration.error,'error')
 
311
                        return
 
312
 
 
313
                for key, peer in self._peers.items():
 
314
                        if key not in self.configuration.neighbor:
 
315
                                self.logger.reactor("Removing Peer %s" % peer.neighbor.name())
 
316
                                peer.stop()
 
317
 
 
318
                for key, neighbor in self.configuration.neighbor.items():
 
319
                        # new peer
 
320
                        if key not in self._peers:
 
321
                                self.logger.reactor("New Peer %s" % neighbor.name())
 
322
                                peer = Peer(neighbor,self)
 
323
                                self._peers[key] = peer
 
324
                        # modified peer
 
325
                        elif self._peers[key].neighbor != neighbor:
 
326
                                self.logger.reactor("Peer definition change, restarting %s" % str(key))
 
327
                                self._peers[key].restart(neighbor)
 
328
                        # same peer but perhaps not the routes
 
329
                        else:
 
330
                                self._peers[key].send_new(neighbor.rib.outgoing.queued_changes())
 
331
                self.logger.configuration("Loaded new configuration successfully",'warning')
 
332
                # This only starts once ...
 
333
                self.processes.start(restart)
 
334
 
 
335
        def run_pending (self,pending):
 
336
                more = True
 
337
                for generator in pending:
 
338
                        try:
 
339
                                if more:
 
340
                                        more = generator.next()
 
341
                                yield generator
 
342
                        except StopIteration:
 
343
                                pass
 
344
                        except KeyboardInterrupt:
 
345
                                self._shutdown = True
 
346
                                self.logger.reactor("^C received",'error')
 
347
                                break
 
348
 
 
349
        def schedule (self,commands):
 
350
                self._commands.extend(commands)
 
351
 
 
352
                if not self._commands:
 
353
                        return False
 
354
 
 
355
                service,command = self._commands.pop(0)
 
356
 
 
357
                if command == 'shutdown':
 
358
                        self._shutdown = True
 
359
                        self._pending = []
 
360
                        self._commands = []
 
361
                        self._answer(service,'shutdown in progress')
 
362
                        return True
 
363
 
 
364
                if command == 'reload':
 
365
                        self._reload = True
 
366
                        self._pending = []
 
367
                        self._commands = []
 
368
                        self._answer(service,'reload in progress')
 
369
                        return True
 
370
 
 
371
                if command == 'restart':
 
372
                        self._restart = True
 
373
                        self._pending = []
 
374
                        self._commands = []
 
375
                        self._answer(service,'restart in progress')
 
376
                        return True
 
377
 
 
378
                if command == 'version':
 
379
                        self._answer(service,'exabgp %s' % version)
 
380
                        return True
 
381
 
 
382
                if command == 'show neighbors':
 
383
                        def _show_neighbor (self):
 
384
                                for key in self.configuration.neighbor.keys():
 
385
                                        neighbor = self.configuration.neighbor[key]
 
386
                                        for line in str(neighbor).split('\n'):
 
387
                                                self._answer(service,line)
 
388
                                                yield True
 
389
                        self._pending.append(_show_neighbor(self))
 
390
                        return True
 
391
 
 
392
                if command == 'show routes':
 
393
                        def _show_route (self):
 
394
                                for key in self.configuration.neighbor.keys():
 
395
                                        neighbor = self.configuration.neighbor[key]
 
396
                                        for change in list(neighbor.rib.outgoing.sent_changes()):
 
397
                                                self._answer(service,'neighbor %s %s' % (neighbor.local_address,str(change.nlri)))
 
398
                                                yield True
 
399
                        self._pending.append(_show_route(self))
 
400
                        return True
 
401
 
 
402
                if command == 'show routes extensive':
 
403
                        def _show_extensive (self):
 
404
                                for key in self.configuration.neighbor.keys():
 
405
                                        neighbor = self.configuration.neighbor[key]
 
406
                                        for change in list(neighbor.rib.outgoing.sent_changes()):
 
407
                                                self._answer(service,'neighbor %s %s' % (neighbor.name(),change.extensive()))
 
408
                                                yield True
 
409
                        self._pending.append(_show_extensive(self))
 
410
                        return True
 
411
 
 
412
                # watchdog
 
413
                if command.startswith('announce watchdog'):
 
414
                        def _announce_watchdog (self,name):
 
415
                                for neighbor in self.configuration.neighbor:
 
416
                                        self.configuration.neighbor[neighbor].rib.outgoing.announce_watchdog(name)
 
417
                                        yield False
 
418
                                self._route_update = True
 
419
                        try:
 
420
                                name = command.split(' ')[2]
 
421
                        except IndexError:
 
422
                                name = service
 
423
                        self._pending.append(_announce_watchdog(self,name))
 
424
                        return True
 
425
 
 
426
                # watchdog
 
427
                if command.startswith('withdraw watchdog'):
 
428
                        def _withdraw_watchdog (self,name):
 
429
                                for neighbor in self.configuration.neighbor:
 
430
                                        self.configuration.neighbor[neighbor].rib.outgoing.withdraw_watchdog(name)
 
431
                                        yield False
 
432
                                self._route_update = True
 
433
                        try:
 
434
                                name = command.split(' ')[2]
 
435
                        except IndexError:
 
436
                                name = service
 
437
                        self._pending.append(_withdraw_watchdog(self,name))
 
438
                        return True
 
439
 
 
440
                def extract_neighbors (command):
 
441
                        """return a list of neighbor definition : the neighbor definition is a list of string which are in the neighbor indexing string"""
 
442
                        # This function returns a list and a string
 
443
                        # The first list contains parsed neighbor to match against our defined peers
 
444
                        # The string is the command to be run for those peers
 
445
                        # The parsed neighbor is a list of the element making the neighbor string so each part can be checked against the neighbor name
 
446
 
 
447
                        returned = []
 
448
                        neighbor,remaining = command.split(' ',1)
 
449
                        if neighbor != 'neighbor':
 
450
                                return [],command
 
451
 
 
452
                        ip,command = remaining.split(' ',1)
 
453
                        definition = ['neighbor %s' % (ip)]
 
454
 
 
455
                        while True:
 
456
                                try:
 
457
                                        key,value,remaining = command.split(' ',2)
 
458
                                except ValueError:
 
459
                                        key,value = command.split(' ',1)
 
460
                                if key == ',':
 
461
                                        returned.append(definition)
 
462
                                        _,command = command.split(' ',1)
 
463
                                        definition = []
 
464
                                        continue
 
465
                                if key not in ['neighbor','local-ip','local-as','peer-as','router-id','family-allowed']:
 
466
                                        if definition:
 
467
                                                returned.append(definition)
 
468
                                        break
 
469
                                definition.append('%s %s' % (key,value))
 
470
                                command = remaining
 
471
 
 
472
                        return returned,command
 
473
 
 
474
                def match_neighbor (description,name):
 
475
                        for string in description:
 
476
                                if re.search('(^|[\s,])%s($|[\s,])' % re.escape(string), name) is None:
 
477
                                        return False
 
478
                        return True
 
479
 
 
480
                def match_neighbors (descriptions,peers):
 
481
                        "returns the sublist of peers matching the description passed, or None if no description is given"
 
482
                        if not descriptions:
 
483
                                return peers.keys()
 
484
 
 
485
                        returned = []
 
486
                        for key in peers:
 
487
                                for description in descriptions:
 
488
                                        if match_neighbor(description,key):
 
489
                                                if key not in returned:
 
490
                                                        returned.append(key)
 
491
                        return returned
 
492
 
 
493
                # route announcement / withdrawal
 
494
                if 'announce route ' in command:
 
495
                        def _announce_change (self,command,nexthops):
 
496
                                changes = self.configuration.parse_api_route(command,nexthops,'announce')
 
497
                                if not changes:
 
498
                                        self.logger.reactor("Command could not parse route in : %s" % command,'warning')
 
499
                                        yield True
 
500
                                else:
 
501
                                        peers = []
 
502
                                        for (peer,change) in changes:
 
503
                                                peers.append(peer)
 
504
                                                self.configuration.change_to_peers(change,[peer,])
 
505
                                                yield False
 
506
                                        self.logger.reactor("Route added to %s : %s" % (', '.join(peers if peers else []) if peers is not None else 'all peers',change.extensive()))
 
507
                                        self._route_update = True
 
508
 
 
509
                        try:
 
510
                                descriptions,command = extract_neighbors(command)
 
511
                                peers = match_neighbors(descriptions,self._peers)
 
512
                                if peers == []:
 
513
                                        self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
 
514
                                        return False
 
515
                                nexthops = dict((peer,self._peers[peer].neighbor.local_address) for peer in peers)
 
516
                                self._pending.append(_announce_change(self,command,nexthops))
 
517
                                return True
 
518
                        except ValueError:
 
519
                                pass
 
520
                        except IndexError:
 
521
                                pass
 
522
 
 
523
                # route flush
 
524
                if 'flush route' in command:  # This allows flush routes with a s to work
 
525
                        def _flush (self,peers):
 
526
                                self.logger.reactor("Flushing routes for %s" % ', '.join(peers if peers else []) if peers is not None else 'all peers')
 
527
                                yield True
 
528
                                self._route_update = True
 
529
 
 
530
                        try:
 
531
                                descriptions,command = extract_neighbors(command)
 
532
                                peers = match_neighbors(descriptions,self._peers)
 
533
                                if peers == []:
 
534
                                        self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
 
535
                                        return False
 
536
                                self._pending.append(_flush(self,peers))
 
537
                                return True
 
538
                        except ValueError:
 
539
                                pass
 
540
                        except IndexError:
 
541
                                pass
 
542
 
 
543
                if 'withdraw route' in command:
 
544
                        def _withdraw_change (self,command,nexthops):
 
545
                                changes = self.configuration.parse_api_route(command,nexthops,'withdraw')
 
546
                                if not changes:
 
547
                                        self.logger.reactor("Command could not parse route in : %s" % command,'warning')
 
548
                                        yield True
 
549
                                else:
 
550
                                        for (peer,change) in changes:
 
551
                                                if self.configuration.change_to_peers(change,[peer,]):
 
552
                                                        self.logger.reactor("Route removed : %s" % change.extensive())
 
553
                                                        yield False
 
554
                                                else:
 
555
                                                        self.logger.reactor("Could not find therefore remove route : %s" % change.extensive(),'warning')
 
556
                                                        yield False
 
557
                                        self._route_update = True
 
558
 
 
559
                        try:
 
560
                                descriptions,command = extract_neighbors(command)
 
561
                                peers = match_neighbors(descriptions,self._peers)
 
562
                                if peers == []:
 
563
                                        self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
 
564
                                        return False
 
565
                                nexthops = dict((peer,self._peers[peer].neighbor.local_address) for peer in peers)
 
566
                                self._pending.append(_withdraw_change(self,command,nexthops))
 
567
                                return True
 
568
                        except ValueError:
 
569
                                pass
 
570
                        except IndexError:
 
571
                                pass
 
572
 
 
573
 
 
574
                # attribute announcement / withdrawal
 
575
                if 'announce attribute ' in command:
 
576
                        def _announce_attribute (self,command,nexthops):
 
577
                                changes = self.configuration.parse_api_attribute(command,nexthops,'announce')
 
578
                                if not changes:
 
579
                                        self.logger.reactor("Command could not parse attribute in : %s" % command,'warning')
 
580
                                        yield True
 
581
                                else:
 
582
                                        for (peers,change) in changes:
 
583
                                                self.configuration.change_to_peers(change,peers)
 
584
                                                self.logger.reactor("Route added to %s : %s" % (', '.join(peers if peers else []) if peers is not None else 'all peers',change.extensive()))
 
585
                                        yield False
 
586
                                        self._route_update = True
 
587
 
 
588
                        try:
 
589
                                descriptions,command = extract_neighbors(command)
 
590
                                peers = match_neighbors(descriptions,self._peers)
 
591
                                if peers == []:
 
592
                                        self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
 
593
                                        return False
 
594
                                nexthops = dict((peer,self._peers[peer].neighbor.local_address) for peer in peers)
 
595
                                self._pending.append(_announce_attribute(self,command,nexthops))
 
596
                                return True
 
597
                        except ValueError:
 
598
                                pass
 
599
                        except IndexError:
 
600
                                pass
 
601
 
 
602
                # attribute announcement / withdrawal
 
603
                if 'withdraw attribute ' in command:
 
604
                        def _withdraw_attribute (self,command,nexthops):
 
605
                                changes = self.configuration.parse_api_attribute(command,nexthops,'withdraw')
 
606
                                if not changes:
 
607
                                        self.logger.reactor("Command could not parse attribute in : %s" % command,'warning')
 
608
                                        yield True
 
609
                                else:
 
610
                                        for (peers,change) in changes:
 
611
                                                if self.configuration.change_to_peers(change,peers):
 
612
                                                        self.logger.reactor("Route removed : %s" % change.extensive())
 
613
                                                        yield False
 
614
                                                else:
 
615
                                                        self.logger.reactor("Could not find therefore remove route : %s" % change.extensive(),'warning')
 
616
                                                        yield False
 
617
                                        self._route_update = True
 
618
 
 
619
                        try:
 
620
                                descriptions,command = extract_neighbors(command)
 
621
                                peers = match_neighbors(descriptions,self._peers)
 
622
                                if peers == []:
 
623
                                        self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
 
624
                                        return False
 
625
                                nexthops = dict((peer,self._peers[peer].neighbor.local_address) for peer in peers)
 
626
                                self._pending.append(_withdraw_attribute(self,command,nexthops))
 
627
                                return True
 
628
                        except ValueError:
 
629
                                pass
 
630
                        except IndexError:
 
631
                                pass
 
632
 
 
633
                # flow announcement / withdrawal
 
634
                if 'announce flow' in command:
 
635
                        def _announce_flow (self,command,peers):
 
636
                                changes = self.configuration.parse_api_flow(command,'announce')
 
637
                                if not changes:
 
638
                                        self.logger.reactor("Command could not parse flow in : %s" % command)
 
639
                                        yield True
 
640
                                else:
 
641
                                        for change in changes:
 
642
                                                self.configuration.change_to_peers(change,peers)
 
643
                                                self.logger.reactor("Flow added to %s : %s" % (', '.join(peers if peers else []) if peers is not None else 'all peers',change.extensive()))
 
644
                                                yield False
 
645
                                        self._route_update = True
 
646
 
 
647
                        try:
 
648
                                descriptions,command = extract_neighbors(command)
 
649
                                peers = match_neighbors(descriptions,self._peers)
 
650
                                if peers == []:
 
651
                                        self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
 
652
                                        return False
 
653
                                self._pending.append(_announce_flow(self,command,peers))
 
654
                                return True
 
655
                        except ValueError:
 
656
                                pass
 
657
                        except IndexError:
 
658
                                pass
 
659
 
 
660
                if 'withdraw flow' in command:
 
661
                        def _withdraw_flow (self,command,peers):
 
662
                                changes = self.configuration.parse_api_flow(command,'withdraw')
 
663
                                if not changes:
 
664
                                        self.logger.reactor("Command could not parse flow in : %s" % command)
 
665
                                        yield True
 
666
                                else:
 
667
                                        for change in changes:
 
668
                                                if self.configuration.change_to_peers(change,peers):
 
669
                                                        self.logger.reactor("Flow found and removed : %s" % change.extensive())
 
670
                                                        yield False
 
671
                                                else:
 
672
                                                        self.logger.reactor("Could not find therefore remove flow : %s" % change.extensive(),'warning')
 
673
                                                        yield False
 
674
                                        self._route_update = True
 
675
 
 
676
                        try:
 
677
                                descriptions,command = extract_neighbors(command)
 
678
                                peers = match_neighbors(descriptions,self._peers)
 
679
                                if peers == []:
 
680
                                        self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
 
681
                                        return False
 
682
                                self._pending.append(_withdraw_flow(self,command,peers))
 
683
                                return True
 
684
                        except ValueError:
 
685
                                pass
 
686
                        except IndexError:
 
687
                                pass
 
688
 
 
689
                # peer teardown
 
690
                if 'teardown' in command:
 
691
                        try:
 
692
                                descriptions,command = extract_neighbors(command)
 
693
                                _,code = command.split(' ',1)
 
694
                                for key in self._peers:
 
695
                                        for description in descriptions:
 
696
                                                if match_neighbor(description,key):
 
697
                                                        self._peers[key].teardown(int(code))
 
698
                                                        self.logger.reactor('teardown scheduled for %s' % ' '.join(description))
 
699
                                return True
 
700
                        except ValueError:
 
701
                                pass
 
702
                        except IndexError:
 
703
                                pass
 
704
 
 
705
                if 'announce route-refresh' in command:
 
706
                        def _announce_refresh (self,command,peers):
 
707
                                rr = self.configuration.parse_api_refresh(command)
 
708
                                if not rr:
 
709
                                        self.logger.reactor("Command could not parse flow in : %s" % command)
 
710
                                        yield True
 
711
                                else:
 
712
                                        self.configuration.refresh_to_peers(rr,peers)
 
713
                                        self.logger.reactor("Sent to %s : %s" % (', '.join(peers if peers else []) if peers is not None else 'all peers',rr.extensive()))
 
714
                                        yield False
 
715
                                        self._route_update = True
 
716
 
 
717
                        try:
 
718
                                descriptions,command = extract_neighbors(command)
 
719
                                peers = match_neighbors(descriptions,self._peers)
 
720
                                if peers == []:
 
721
                                        self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
 
722
                                        return False
 
723
                                self._pending.append(_announce_refresh(self,command,peers))
 
724
                                return True
 
725
                        except ValueError:
 
726
                                pass
 
727
                        except IndexError:
 
728
                                pass
 
729
 
 
730
                if command.startswith('operational ') and (command.split() + ['safe'])[1].lower() in ('asm','adm','rpcq','rpcp','apcq','apcp','lpcq','lpcp'):
 
731
                        def _announce_operational (self,command,peers):
 
732
                                operational = self.configuration.parse_api_operational(command)
 
733
                                if not operational:
 
734
                                        self.logger.reactor("Command could not parse operational command : %s" % command)
 
735
                                        yield True
 
736
                                else:
 
737
                                        self.configuration.operational_to_peers(operational,peers)
 
738
                                        self.logger.reactor("operational message sent to %s : %s" % (', '.join(peers if peers else []) if peers is not None else 'all peers',operational.extensive()))
 
739
                                        yield False
 
740
                                        self._route_update = True
 
741
 
 
742
                        try:
 
743
                                descriptions,command = extract_neighbors(command)
 
744
                                peers = match_neighbors(descriptions,self._peers)
 
745
                                if peers == []:
 
746
                                        self.logger.reactor('no neighbor matching the command : %s' % command,'warning')
 
747
                                        return False
 
748
                                self._pending.append(_announce_operational(self,command,peers))
 
749
                                return True
 
750
                        except ValueError:
 
751
                                pass
 
752
                        except IndexError:
 
753
                                pass
 
754
 
 
755
 
 
756
                # unknown
 
757
                self.logger.reactor("Command from process not understood : %s" % command,'warning')
 
758
                return False
 
759
 
 
760
        def _answer (self,service,string):
 
761
                self.processes.write(service,string)
 
762
                self.logger.reactor('Responding to %s : %s' % (service,string))
 
763
 
 
764
 
 
765
        def route_update (self):
 
766
                """the process ran and we need to figure what routes to changes"""
 
767
                self.logger.reactor("Performing dynamic route update")
 
768
                for key in self.configuration.neighbor.keys():
 
769
                        self._peers[key].send_new()
 
770
                self.logger.reactor("Updated peers dynamic routes successfully")
 
771
 
 
772
        def route_flush (self):
 
773
                """we just want to flush any unflushed routes"""
 
774
                self.logger.reactor("Performing route flush")
 
775
                for key in self.configuration.neighbor.keys():
 
776
                        self._peers[key].send_new(update=True)
 
777
 
 
778
        def restart (self):
 
779
                """kill the BGP session and restart it"""
 
780
                self.logger.reactor("Performing restart of exabgp %s" % version)
 
781
                self.configuration.reload()
 
782
 
 
783
                for key in self._peers.keys():
 
784
                        if key not in self.configuration.neighbor.keys():
 
785
                                neighbor = self.configuration.neighbor[key]
 
786
                                self.logger.reactor("Removing Peer %s" % neighbor.name())
 
787
                                self._peers[key].stop()
 
788
                        else:
 
789
                                self._peers[key].restart()
 
790
                self.processes.terminate()
 
791
                self.processes.start()
 
792
 
 
793
        def unschedule (self,peer):
 
794
                key = peer.neighbor.name()
 
795
                if key in self._peers:
 
796
                        del self._peers[key]