~ubuntu-branches/ubuntu/utopic/exabgp/utopic

« back to all changes in this revision

Viewing changes to lib/exabgp/bgp/protocol.py

  • Committer: Package Import Robot
  • Author(s): Henry-Nicolas Tourneur
  • Date: 2014-03-08 19:07:00 UTC
  • mfrom: (1.1.8)
  • Revision ID: package-import@ubuntu.com-20140308190700-xjbibpg1g6001c9x
Tags: 3.3.1-1
* New upstream release
* Bump python minimal required version (2.7)
* Closes: #726066 Debian packaging improvements proposed by Vincent Bernat
* Closes: #703774 not existent rundir (/var/run/exabgp) after reboot

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# encoding: utf-8
2
 
"""
3
 
protocol.py
4
 
 
5
 
Created by Thomas Mangin on 2009-08-25.
6
 
Copyright (c) 2009-2013 Exa Networks. All rights reserved.
7
 
"""
8
 
 
9
 
import time
10
 
from struct import unpack
11
 
 
12
 
from exabgp.rib.table import Table
13
 
from exabgp.rib.delta import Delta
14
 
 
15
 
from exabgp.bgp.connection import Connection
16
 
from exabgp.bgp.message import Message,Failure
17
 
from exabgp.bgp.message.nop import NOP
18
 
from exabgp.bgp.message.open import Open
19
 
from exabgp.bgp.message.open.routerid import RouterID
20
 
from exabgp.bgp.message.open.capability import Capabilities
21
 
from exabgp.bgp.message.open.capability.negotiated import Negotiated
22
 
from exabgp.bgp.message.update import Update
23
 
from exabgp.bgp.message.update.eor import EOR
24
 
from exabgp.bgp.message.keepalive import KeepAlive
25
 
from exabgp.bgp.message.notification import Notification, Notify
26
 
from exabgp.bgp.message.refresh import RouteRefresh
27
 
 
28
 
from exabgp.structure.processes import ProcessError
29
 
 
30
 
from exabgp.structure.log import Logger,LazyFormat
31
 
 
32
 
# This is the number of chunked messages we are willing to buffer, not the number of routes
33
 
MAX_BACKLOG = 15000
34
 
 
35
 
# README: Move all the old packet decoding in another file to clean up the includes here, as it is not used anyway
36
 
 
37
 
class Protocol (object):
38
 
        decode = True
39
 
 
40
 
        def __init__ (self,peer,connection=None):
41
 
                self.logger = Logger()
42
 
                self.peer = peer
43
 
                self.neighbor = peer.neighbor
44
 
                self.connection = connection
45
 
                self.negotiated = Negotiated()
46
 
 
47
 
                self.delta = Delta(Table(peer))
48
 
                self._messages = []
49
 
                self._frozen = 0
50
 
                # The message size is the whole BGP message _without_ headers
51
 
                self.message_size = 4096-19
52
 
 
53
 
        # XXX: we use self.peer.neighbor.peer_address when we could use self.neighbor.peer_address
54
 
 
55
 
        def me (self,message):
56
 
                return "Peer %15s ASN %-7s %s" % (self.peer.neighbor.peer_address,self.peer.neighbor.peer_as,message)
57
 
 
58
 
        def connect (self):
59
 
                # allows to test the protocol code using modified StringIO with a extra 'pending' function
60
 
                if not self.connection:
61
 
                        peer = self.neighbor.peer_address
62
 
                        local = self.neighbor.local_address
63
 
                        md5 = self.neighbor.md5
64
 
                        ttl = self.neighbor.ttl
65
 
                        self.connection = Connection(peer,local,md5,ttl)
66
 
 
67
 
                        if self.peer.neighbor.api.neighbor_changes:
68
 
                                try:
69
 
                                        self.peer.supervisor.processes.connected(self.peer.neighbor.peer_address)
70
 
                                except ProcessError:
71
 
                                        raise Failure('Could not send connected message(s) to helper program(s)')
72
 
 
73
 
        def close (self,reason='unspecified'):
74
 
                if self.connection:
75
 
                        # must be first otherwise we could have a loop caused by the raise in the below
76
 
                        self.connection.close()
77
 
                        self.connection = None
78
 
 
79
 
                        if self.peer.neighbor.api.neighbor_changes:
80
 
                                try:
81
 
                                        self.peer.supervisor.processes.down(self.peer.neighbor.peer_address,reason)
82
 
                                except ProcessError:
83
 
                                        raise Failure('Could not send down message(s) to helper program(s)')
84
 
 
85
 
        def write (self,message):
86
 
                if self.neighbor.api.send_packets:
87
 
                        try:
88
 
                                self.peer.supervisor.processes.send(self.peer.neighbor.peer_address,message[18],message[:19],message[19:])
89
 
                        except ProcessError:
90
 
                                raise Failure('Could not send update message(s) to helper program(s)')
91
 
                return self.connection.write(message)
92
 
 
93
 
        # Read from network .......................................................
94
 
 
95
 
        def read_message (self,keepalive_comment=''):
96
 
                # This call reset the time for the timeout in
97
 
                if not self.connection.pending(True):
98
 
                        return NOP()
99
 
 
100
 
                length = 19
101
 
                header = ''
102
 
                while length:
103
 
                        if self.connection.pending():
104
 
                                delta = self.connection.read(length)
105
 
                                header += delta
106
 
                                length -= len(delta)
107
 
 
108
 
                if header[:16] != Message.MARKER:
109
 
                        # We are speaking BGP - send us a valid Marker
110
 
                        raise Notify(1,1,'The packet received does not contain a BGP marker')
111
 
 
112
 
                raw_length = header[16:18]
113
 
                msg_length = unpack('!H',raw_length)[0]
114
 
                msg = header[18]
115
 
 
116
 
                if (msg_length < 19 or msg_length > 4096):
117
 
                        # BAD Message Length
118
 
                        raise Notify(1,2)
119
 
 
120
 
                if (
121
 
                        (msg == Open.TYPE and msg_length < 29) or
122
 
                        (msg == Update.TYPE and msg_length < 23) or
123
 
                        (msg == Notification.TYPE and msg_length < 21) or
124
 
                        (msg == KeepAlive.TYPE and msg_length != 19) or
125
 
                        (msg == RouteRefresh.TYPE and msg_length != 23)
126
 
                ):
127
 
                        # MUST send the faulty msg_length back
128
 
                        raise Notify(1,2,'%d has an invalid message length of %d' %(str(msg),msg_length))
129
 
 
130
 
                length = msg_length - 19
131
 
                body = ''
132
 
                while length:
133
 
                        if self.connection.pending():
134
 
                                delta = self.connection.read(length)
135
 
                                body += delta
136
 
                                length -= len(delta)
137
 
 
138
 
                if self.neighbor.api.receive_packets:
139
 
                        try:
140
 
                                self.peer.supervisor.processes.receive(self.peer.neighbor.peer_address,msg,header,body)
141
 
                        except ProcessError:
142
 
                                raise Failure('Could not send update message(s) to helper program(s)')
143
 
 
144
 
                if msg == KeepAlive.TYPE:
145
 
                        self.logger.message(self.me('<< KEEPALIVE%s' % keepalive_comment))
146
 
                        return KeepAlive()
147
 
 
148
 
                elif msg == Update.TYPE:
149
 
                        self.logger.message(self.me('<< UPDATE'))
150
 
 
151
 
                        if msg_length == 30 and body.startswith(EOR.PREFIX):
152
 
                                return EOR().factory(body)
153
 
 
154
 
                        if self.neighbor.api.receive_routes:
155
 
                                update = Update().factory(self.negotiated,body)
156
 
 
157
 
                                for route in update.routes:
158
 
                                        self.logger.routes(LazyFormat(self.me(''),str,route))
159
 
 
160
 
                                try:
161
 
                                        self.peer.supervisor.processes.routes(self.neighbor.peer_address,update.routes)
162
 
                                except ProcessError:
163
 
                                        raise Failure('Could not send routes message(s) to helper program(s)')
164
 
                                return update
165
 
                        else:
166
 
                                return NOP()
167
 
 
168
 
                elif msg == Notification.TYPE:
169
 
                        self.logger.message(self.me('<< NOTIFICATION'))
170
 
                        raise Notification().factory(body)
171
 
 
172
 
                elif msg == Open.TYPE:
173
 
                        return Open().factory(body)
174
 
 
175
 
 
176
 
                if msg == RouteRefresh.TYPE:
177
 
                        self.logger.message(self.me('<< ROUTE-REFRESH'))
178
 
                        return RouteRefresh().factory(body)
179
 
 
180
 
                else:
181
 
                        self.logger.message(self.me('<< NOP'))
182
 
 
183
 
                return NOP().factory(msg)
184
 
 
185
 
 
186
 
        def read_open (self,_open,ip):
187
 
                message = self.read_message()
188
 
 
189
 
                if message.TYPE == NOP.TYPE:
190
 
                        return message
191
 
 
192
 
                if message.TYPE != Open.TYPE:
193
 
                        raise Notify(5,1,'The first packet recevied is not an open message (%s)' % message)
194
 
 
195
 
                self.negotiated.received(message)
196
 
 
197
 
                if not self.negotiated.asn4:
198
 
                        if self.neighbor.local_as.asn4():
199
 
                                raise Notify(2,0,'peer does not speak ASN4, we are stuck')
200
 
                        else:
201
 
                                # we will use RFC 4893 to convey new ASN to the peer
202
 
                                self.negotiated.asn4
203
 
 
204
 
                if self.negotiated.peer_as != self.neighbor.peer_as:
205
 
                        raise Notify(2,2,'ASN in OPEN (%d) did not match ASN expected (%d)' % (message.asn,self.neighbor.peer_as))
206
 
 
207
 
                # RFC 6286 : http://tools.ietf.org/html/rfc6286
208
 
                #if message.router_id == RouterID('0.0.0.0'):
209
 
                #       message.router_id = RouterID(ip)
210
 
                if message.router_id == RouterID('0.0.0.0'):
211
 
                        raise Notify(2,3,'0.0.0.0 is an invalid router_id according to RFC6286')
212
 
 
213
 
                if message.router_id == self.neighbor.router_id and message.asn == self.neighbor.local_as:
214
 
                        raise Notify(2,3,'BGP Indendifier collision (%s) on IBGP according to RFC 6286' % message.router_id)
215
 
 
216
 
                if message.hold_time and message.hold_time < 3:
217
 
                        raise Notify(2,6,'Hold Time is invalid (%d)' % message.hold_time)
218
 
 
219
 
                if self.negotiated.multisession not in (True,False):
220
 
                        raise Notify(*self.negotiated.multisession)
221
 
 
222
 
                self.logger.message(self.me('<< %s' % message))
223
 
                return message
224
 
 
225
 
        def read_keepalive (self,comment=''):
226
 
                message = self.read_message(comment)
227
 
                if message.TYPE == NOP.TYPE:
228
 
                        return message
229
 
                if message.TYPE != KeepAlive.TYPE:
230
 
                        raise Notify(5,2)
231
 
                return message
232
 
 
233
 
        #
234
 
        # Sending message to peer
235
 
        #
236
 
 
237
 
        def new_open (self,restarted):
238
 
                sent_open = Open().new(
239
 
                        4,
240
 
                        self.neighbor.local_as,
241
 
                        self.neighbor.router_id.ip,
242
 
                        Capabilities().new(self.neighbor,restarted),
243
 
                        self.neighbor.hold_time
244
 
                )
245
 
 
246
 
                self.negotiated.sent(sent_open)
247
 
 
248
 
                # we do not buffer open message in purpose
249
 
                if not self.write(sent_open.message()):
250
 
                        raise Failure('Could not send open')
251
 
                self.logger.message(self.me('>> %s' % sent_open))
252
 
                return sent_open
253
 
 
254
 
        def new_keepalive (self,comment=''):
255
 
                k = KeepAlive()
256
 
                m = k.message()
257
 
                written = self.write(m)
258
 
                if not written:
259
 
                        self.logger.message(self.me('|| buffer not yet empty, adding KEEPALIVE to it'))
260
 
                        self._messages.append((1,'KEEPALIVE%s' % comment,m))
261
 
                else:
262
 
                        self.logger.message(self.me('>> KEEPALIVE%s' % comment))
263
 
                return k
264
 
 
265
 
        def new_update (self):
266
 
                # XXX: This should really be calculated once only
267
 
                for number in self._announce('UPDATE',self.peer.bgp.delta.updates(self.negotiated,self.neighbor.group_updates)):
268
 
                        yield number
269
 
 
270
 
        def new_eors (self):
271
 
                eor = EOR().new(self.negotiated.families)
272
 
                for answer in self._announce(str(eor),eor.updates(self.negotiated)):
273
 
                                pass
274
 
 
275
 
        def new_notification (self,notification):
276
 
                return self.write(notification.message())
277
 
 
278
 
        #
279
 
        # Sending / Buffer handling
280
 
        #
281
 
 
282
 
        def clear_buffer (self):
283
 
                self.logger.message(self.me('clearing MESSAGE(s) buffer'))
284
 
                self._messages = []
285
 
 
286
 
        def buffered (self):
287
 
                return len(self._messages)
288
 
 
289
 
        def _backlog (self):
290
 
                # performance only to remove inference
291
 
                if self._messages:
292
 
                        if not self._frozen:
293
 
                                self._frozen = time.time()
294
 
                        if self._frozen and self._frozen + self.negotiated.holdtime < time.time():
295
 
                                raise Failure('peer %s not reading on his socket (or not fast at all) - killing session' % self.neighbor.peer_as)
296
 
                        self.logger.message(self.me("unable to send route for %d second (maximum allowed %d)" % (time.time()-self._frozen,self.negotiated.holdtime)))
297
 
                        nb_backlog = len(self._messages)
298
 
                        if nb_backlog > MAX_BACKLOG:
299
 
                                raise Failure('over %d chunked routes buffered for peer %s - killing session' % (MAX_BACKLOG,self.neighbor.peer_as))
300
 
                        self.logger.message(self.me("self._messages of %d/%d chunked routes" % (nb_backlog,MAX_BACKLOG)))
301
 
                while self._messages:
302
 
                        number,name,update = self._messages[0]
303
 
                        if not self.write(update):
304
 
                                self.logger.message(self.me("|| failed to send %d %s(s) from buffer" % (number,name)))
305
 
                                break
306
 
                        self._messages.pop(0)
307
 
                        self._frozen = 0
308
 
                        yield name,number
309
 
 
310
 
        def _announce (self,name,generator):
311
 
                def chunked (generator,size):
312
 
                        chunk = ''
313
 
                        number = 0
314
 
                        for data in generator:
315
 
                                if len(data) > size:
316
 
                                        raise Failure('Can not send BGP update larger than %d bytes on this connection.' % size)
317
 
                                if len(chunk) + len(data) <= size:
318
 
                                        chunk += data
319
 
                                        number += 1
320
 
                                        continue
321
 
                                yield number,chunk
322
 
                                chunk = data
323
 
                                number = 1
324
 
                        if chunk:
325
 
                                yield number,chunk
326
 
 
327
 
                if self._messages:
328
 
                        for number,update in chunked(generator,self.message_size):
329
 
                                        self.logger.message(self.me('|| adding %d  %s(s) to existing buffer' % (number,name)))
330
 
                                        self._messages.append((number,name,update))
331
 
                        for name,number in self._backlog():
332
 
                                self.logger.message(self.me('>> %d buffered %s(s)' % (number,name)))
333
 
                                yield number
334
 
                else:
335
 
                        sending = True
336
 
                        for number,update in chunked(generator,self.message_size):
337
 
                                if sending:
338
 
                                        if self.write(update):
339
 
                                                self.logger.message(self.me('>> %d %s(s)' % (number,name)))
340
 
                                                yield number
341
 
                                        else:
342
 
                                                self.logger.message(self.me('|| could not send %d %s(s), buffering' % (number,name)))
343
 
                                                self._messages.append((number,name,update))
344
 
                                                sending = False
345
 
                                else:
346
 
                                        self.logger.message(self.me('|| buffering the rest of the %s(s) (%d)' % (name,number)))
347
 
                                        self._messages.append((number,name,update))
348
 
                                        yield 0