5
Created by Thomas Mangin on 2009-08-25.
6
Copyright (c) 2009-2013 Exa Networks. All rights reserved.
10
from struct import unpack
12
from exabgp.rib.table import Table
13
from exabgp.rib.delta import Delta
15
from exabgp.bgp.connection import Connection
16
from exabgp.bgp.message import Message,Failure
17
from exabgp.bgp.message.nop import NOP
18
from exabgp.bgp.message.open import Open
19
from exabgp.bgp.message.open.routerid import RouterID
20
from exabgp.bgp.message.open.capability import Capabilities
21
from exabgp.bgp.message.open.capability.negotiated import Negotiated
22
from exabgp.bgp.message.update import Update
23
from exabgp.bgp.message.update.eor import EOR
24
from exabgp.bgp.message.keepalive import KeepAlive
25
from exabgp.bgp.message.notification import Notification, Notify
26
from exabgp.bgp.message.refresh import RouteRefresh
28
from exabgp.structure.processes import ProcessError
30
from exabgp.structure.log import Logger,LazyFormat
32
# This is the number of chuncked message we are willing to buffer, not the number of routes
35
# README: Move all the old packet decoding in another file to clean up the includes here, as it is not used anyway
37
class Protocol (object):
40
def __init__ (self,peer,connection=None):
41
self.logger = Logger()
43
self.neighbor = peer.neighbor
44
self.connection = connection
45
self.negotiated = Negotiated()
47
self.delta = Delta(Table(peer))
50
# The message size is the whole BGP message _without_ headers
51
self.message_size = 4096-19
53
# XXX: we use self.peer.neighbor.peer_address when we could use self.neighbor.peer_address
55
def me (self,message):
56
return "Peer %15s ASN %-7s %s" % (self.peer.neighbor.peer_address,self.peer.neighbor.peer_as,message)
59
# allows to test the protocol code using modified StringIO with a extra 'pending' function
60
if not self.connection:
61
peer = self.neighbor.peer_address
62
local = self.neighbor.local_address
63
md5 = self.neighbor.md5
64
ttl = self.neighbor.ttl
65
self.connection = Connection(peer,local,md5,ttl)
67
if self.peer.neighbor.api.neighbor_changes:
69
self.peer.supervisor.processes.connected(self.peer.neighbor.peer_address)
71
raise Failure('Could not send connected message(s) to helper program(s)')
73
def close (self,reason='unspecified'):
75
# must be first otherwise we could have a loop caused by the raise in the below
76
self.connection.close()
77
self.connection = None
79
if self.peer.neighbor.api.neighbor_changes:
81
self.peer.supervisor.processes.down(self.peer.neighbor.peer_address,reason)
83
raise Failure('Could not send down message(s) to helper program(s)')
85
def write (self,message):
86
if self.neighbor.api.send_packets:
88
self.peer.supervisor.processes.send(self.peer.neighbor.peer_address,message[18],message[:19],message[19:])
90
raise Failure('Could not send update message(s) to helper program(s)')
91
return self.connection.write(message)
93
# Read from network .......................................................
95
def read_message (self,keepalive_comment=''):
96
# This call reset the time for the timeout in
97
if not self.connection.pending(True):
103
if self.connection.pending():
104
delta = self.connection.read(length)
108
if header[:16] != Message.MARKER:
109
# We are speaking BGP - send us a valid Marker
110
raise Notify(1,1,'The packet received does not contain a BGP marker')
112
raw_length = header[16:18]
113
msg_length = unpack('!H',raw_length)[0]
116
if (msg_length < 19 or msg_length > 4096):
121
(msg == Open.TYPE and msg_length < 29) or
122
(msg == Update.TYPE and msg_length < 23) or
123
(msg == Notification.TYPE and msg_length < 21) or
124
(msg == KeepAlive.TYPE and msg_length != 19) or
125
(msg == RouteRefresh.TYPE and msg_length != 23)
127
# MUST send the faulty msg_length back
128
raise Notify(1,2,'%d has an invalid message length of %d' %(str(msg),msg_length))
130
length = msg_length - 19
133
if self.connection.pending():
134
delta = self.connection.read(length)
138
if self.neighbor.api.receive_packets:
140
self.peer.supervisor.processes.receive(self.peer.neighbor.peer_address,msg,header,body)
142
raise Failure('Could not send update message(s) to helper program(s)')
144
if msg == KeepAlive.TYPE:
145
self.logger.message(self.me('<< KEEPALIVE%s' % keepalive_comment))
148
elif msg == Update.TYPE:
149
self.logger.message(self.me('<< UPDATE'))
151
if msg_length == 30 and body.startswith(EOR.PREFIX):
152
return EOR().factory(body)
154
if self.neighbor.api.receive_routes:
155
update = Update().factory(self.negotiated,body)
157
for route in update.routes:
158
self.logger.routes(LazyFormat(self.me(''),str,route))
161
self.peer.supervisor.processes.routes(self.neighbor.peer_address,update.routes)
163
raise Failure('Could not send routes message(s) to helper program(s)')
168
elif msg == Notification.TYPE:
169
self.logger.message(self.me('<< NOTIFICATION'))
170
raise Notification().factory(body)
172
elif msg == Open.TYPE:
173
return Open().factory(body)
176
if msg == RouteRefresh.TYPE:
177
self.logger.message(self.me('<< ROUTE-REFRESH'))
178
return RouteRefresh().factory(body)
181
self.logger.message(self.me('<< NOP'))
183
return NOP().factory(msg)
186
def read_open (self,_open,ip):
187
message = self.read_message()
189
if message.TYPE == NOP.TYPE:
192
if message.TYPE != Open.TYPE:
193
raise Notify(5,1,'The first packet recevied is not an open message (%s)' % message)
195
self.negotiated.received(message)
197
if not self.negotiated.asn4:
198
if self.neighbor.local_as.asn4():
199
raise Notify(2,0,'peer does not speak ASN4, we are stuck')
201
# we will use RFC 4893 to convey new ASN to the peer
204
if self.negotiated.peer_as != self.neighbor.peer_as:
205
raise Notify(2,2,'ASN in OPEN (%d) did not match ASN expected (%d)' % (message.asn,self.neighbor.peer_as))
207
# RFC 6286 : http://tools.ietf.org/html/rfc6286
208
#if message.router_id == RouterID('0.0.0.0'):
209
# message.router_id = RouterID(ip)
210
if message.router_id == RouterID('0.0.0.0'):
211
raise Notify(2,3,'0.0.0.0 is an invalid router_id according to RFC6286')
213
if message.router_id == self.neighbor.router_id and message.asn == self.neighbor.local_as:
214
raise Notify(2,3,'BGP Indendifier collision (%s) on IBGP according to RFC 6286' % message.router_id)
216
if message.hold_time and message.hold_time < 3:
217
raise Notify(2,6,'Hold Time is invalid (%d)' % message.hold_time)
219
if self.negotiated.multisession not in (True,False):
220
raise Notify(*self.negotiated.multisession)
222
self.logger.message(self.me('<< %s' % message))
225
def read_keepalive (self,comment=''):
226
message = self.read_message(comment)
227
if message.TYPE == NOP.TYPE:
229
if message.TYPE != KeepAlive.TYPE:
234
# Sending message to peer
237
def new_open (self,restarted):
238
sent_open = Open().new(
240
self.neighbor.local_as,
241
self.neighbor.router_id.ip,
242
Capabilities().new(self.neighbor,restarted),
243
self.neighbor.hold_time
246
self.negotiated.sent(sent_open)
248
# we do not buffer open message in purpose
249
if not self.write(sent_open.message()):
250
raise Failure('Could not send open')
251
self.logger.message(self.me('>> %s' % sent_open))
254
def new_keepalive (self,comment=''):
257
written = self.write(m)
259
self.logger.message(self.me('|| buffer not yet empty, adding KEEPALIVE to it'))
260
self._messages.append((1,'KEEPALIVE%s' % comment,m))
262
self.logger.message(self.me('>> KEEPALIVE%s' % comment))
265
def new_update (self):
266
# XXX: This should really be calculated once only
267
for number in self._announce('UPDATE',self.peer.bgp.delta.updates(self.negotiated,self.neighbor.group_updates)):
271
eor = EOR().new(self.negotiated.families)
272
for answer in self._announce(str(eor),eor.updates(self.negotiated)):
275
def new_notification (self,notification):
276
return self.write(notification.message())
279
# Sending / Buffer handling
282
def clear_buffer (self):
283
self.logger.message(self.me('clearing MESSAGE(s) buffer'))
287
return len(self._messages)
290
# performance only to remove inference
293
self._frozen = time.time()
294
if self._frozen and self._frozen + self.negotiated.holdtime < time.time():
295
raise Failure('peer %s not reading on his socket (or not fast at all) - killing session' % self.neighbor.peer_as)
296
self.logger.message(self.me("unable to send route for %d second (maximum allowed %d)" % (time.time()-self._frozen,self.negotiated.holdtime)))
297
nb_backlog = len(self._messages)
298
if nb_backlog > MAX_BACKLOG:
299
raise Failure('over %d chunked routes buffered for peer %s - killing session' % (MAX_BACKLOG,self.neighbor.peer_as))
300
self.logger.message(self.me("self._messages of %d/%d chunked routes" % (nb_backlog,MAX_BACKLOG)))
301
while self._messages:
302
number,name,update = self._messages[0]
303
if not self.write(update):
304
self.logger.message(self.me("|| failed to send %d %s(s) from buffer" % (number,name)))
306
self._messages.pop(0)
310
def _announce (self,name,generator):
311
def chunked (generator,size):
314
for data in generator:
316
raise Failure('Can not send BGP update larger than %d bytes on this connection.' % size)
317
if len(chunk) + len(data) <= size:
328
for number,update in chunked(generator,self.message_size):
329
self.logger.message(self.me('|| adding %d %s(s) to existing buffer' % (number,name)))
330
self._messages.append((number,name,update))
331
for name,number in self._backlog():
332
self.logger.message(self.me('>> %d buffered %s(s)' % (number,name)))
336
for number,update in chunked(generator,self.message_size):
338
if self.write(update):
339
self.logger.message(self.me('>> %d %s(s)' % (number,name)))
342
self.logger.message(self.me('|| could not send %d %s(s), buffering' % (number,name)))
343
self._messages.append((number,name,update))
346
self.logger.message(self.me('|| buffering the rest of the %s(s) (%d)' % (name,number)))
347
self._messages.append((number,name,update))