5
Created by Thomas Mangin on 2009-11-05.
6
Copyright (c) 2009-2013 Exa Networks. All rights reserved.
9
from exabgp.bgp.message.direction import IN,OUT
10
from exabgp.bgp.message.update import Update
11
from exabgp.bgp.message.refresh import RouteRefresh
13
# XXX: FIXME: we would not have to use so many setdefault if we pre-filled the dicts with the families
16
# Build the store for the given address families.
# families: the families this store is restricted to; sent_changes() and
#           resend() intersect the families they are asked for with it.
# NOTE(review): the bare integers interleaved with the code appear to be
# line-number residue from extraction, and some original lines of this
# method are not visible here.
def __init__ (self,families):
17
# XXX: FIXME: we could decide not to cache the routes we have seen and let the backend do it for us, saving the memory
20
self.families = families
23
# will resend all the routes once we reconnect
25
# WARNING : this function can run while we are in the updates() loop too !
26
self._enhanced_refresh_start = []
27
self._enhanced_refresh_delay = []
28
for update in self.updates(False): pass
30
# back to square one, all the routes are removed
32
self._cache_attribute = {}
34
self._modify_nlri = {}
35
self._modify_sorted = {}
39
# Walk the changes recorded in self._seen for the requested families and
# select those whose NLRI action is OUT.announce.
# families: optional iterable of families; when falsy (None or empty) every
#           family in self.families is used, otherwise only the ones that
#           are also present in self.families.
# NOTE(review): the statement executed for each selected change is not
# visible in this extract (presumably it yields the change - confirm).
def sent_changes (self,families=None):
40
# families can be None or []
41
requested_families = self.families if not families else set(families).intersection(self.families)
43
# we use list() to make a snapshot of the data at the time we run the command
# NOTE(review): no list() call is visible below; .values() only gives a
# snapshot where it returns a list (Python 2) - confirm this is still intended
44
for family in requested_families:
45
# a family with nothing recorded is treated as an empty dict
for change in self._seen.get(family,{}).values():
46
if change.nlri.action == OUT.announce:
49
# Re-queue, through insert_announced(change,True), the changes of the
# requested families that self._seen holds with an OUT.announce action.
# families: iterable of families, or None/[] to use every family in
#           self.families; families absent from self.families are dropped.
# enhanced_refresh: NOTE(review): not referenced in the visible lines;
#           presumably it selects between the two loops below - confirm.
# NOTE(review): several original lines of this method are not visible in
# this extract, including whatever separates the two loops.
def resend (self,families,enhanced_refresh):
50
# families can be None or []
51
requested_families = self.families if not families else set(families).intersection(self.families)
53
# helper walking the changes recorded for one family and selecting the
# ones whose action is OUT.announce
# NOTE(review): the statement run for each match is not visible here
def _announced (family):
54
for change in self._seen.get(family,{}).values():
55
if change.nlri.action == OUT.announce:
57
# forget what was recorded for this family
self._seen[family] = {}
60
# first visible loop: note the family in self._enhanced_refresh_start
# (once only), then force every announced change back into the queue
for family in requested_families:
61
if family not in self._enhanced_refresh_start:
62
self._enhanced_refresh_start.append(family)
63
for change in _announced(family):
64
# force=True bypasses the enhanced-refresh delay in insert_announced()
self.insert_announced(change,True)
66
# second visible loop: same re-queueing, without touching
# self._enhanced_refresh_start
for family in requested_families:
67
for change in _announced(family):
68
self.insert_announced(change,True)
71
# # This function returns a hash and not a list as "in" tests are O(n) with lists and O(1) with hash
72
# # and with ten thousands routes this makes an enormous difference (60 seconds to 2)
74
# for family in self._seen.keys():
75
# for change in self._seen[family].values():
76
# if change.nlri.action == OUT.announce:
77
# changes[change.index()] = change
80
# Iterate over the changes currently pending in self._modify_nlri
# (the per-NLRI queue filled by insert_announced()).
# NOTE(review): the loop body is not visible in this extract; presumably
# each change is yielded - confirm.
def queued_changes (self):
81
for change in self._modify_nlri.values():
84
def update (self,changes):
    """Store the batch of changes that updates() will later insert.

    changes: the collection of change objects; it is kept by reference in
    self._changes, not copied.
    """
    self._changes = changes
87
# Register a change under its watchdog name in self._watchdog and, on one
# of the two paths, queue it with insert_announced().
# change: object exposing attributes.watchdog(), attributes.withdraw() and
#         nlri.index(), as used below.
# NOTE(review): the conditional lines choosing between the '-' and the '+'
# bucket are not visible in this extract; presumably `withdraw` selects
# '-' and the other branch stores under '+' and then queues - confirm.
def insert_announced_watchdog (self,change):
88
watchdog = change.attributes.watchdog()
89
withdraw = change.attributes.withdraw()
92
# self._watchdog[watchdog]['-'][nlri-index] = change, creating levels on demand
self._watchdog.setdefault(watchdog,{}).setdefault('-',{})[change.nlri.index()] = change
94
# self._watchdog[watchdog]['+'][nlri-index] = change, creating levels on demand
self._watchdog.setdefault(watchdog,{}).setdefault('+',{})[change.nlri.index()] = change
95
self.insert_announced(change)
98
def announce_watchdog (self,watchdog):
    """Announce every route parked in the '-' bucket of a watchdog.

    Each change found under self._watchdog[watchdog]['-'] has its NLRI
    action set to OUT.announce, is queued through insert_announced() and
    is then moved to the '+' bucket of the same watchdog.  A watchdog name
    that is not registered is ignored.

    watchdog: key identifying the group in self._watchdog.
    """
    if watchdog not in self._watchdog:
        return
    buckets = self._watchdog[watchdog]
    # Snapshot with list(): entries are popped from the '-' bucket inside
    # the loop, and mutating a dict while iterating a live view of it
    # raises RuntimeError.
    for change in list(buckets.get('-',{}).values()):
        change.nlri.action = OUT.announce
        self.insert_announced(change)
        index = change.nlri.index()
        buckets.setdefault('+',{})[index] = change
        buckets['-'].pop(index)
106
def withdraw_watchdog (self,watchdog):
    """Withdraw every route held in the '+' bucket of a watchdog.

    Each change found under self._watchdog[watchdog]['+'] has its NLRI
    action set to OUT.withdraw, is queued through insert_announced() and
    is then moved to the '-' bucket of the same watchdog.  A watchdog name
    that is not registered is ignored.

    watchdog: key identifying the group in self._watchdog.
    """
    if watchdog not in self._watchdog:
        return
    buckets = self._watchdog[watchdog]
    # Snapshot with list(): entries are popped from the '+' bucket inside
    # the loop, and mutating a dict while iterating a live view of it
    # raises RuntimeError.
    for change in list(buckets.get('+',{}).values()):
        change.nlri.action = OUT.withdraw
        self.insert_announced(change)
        index = change.nlri.index()
        buckets.setdefault('-',{})[index] = change
        buckets['+'].pop(index)
114
# Record or forget a received change in self._seen depending on its NLRI
# action.
# NOTE(review): the leading `if` branch and the line introducing the final
# pop (presumably an `else`) are not visible in this extract.
# NOTE(review): here self._seen is keyed directly by the NLRI index, while
# sent_changes(), resend() and updates() use self._seen[family][nlri-index];
# confirm which layout is intended.
def insert_received (self,change):
117
elif change.nlri.action == IN.announced:
118
# remember the announced route under its NLRI index
self._seen[change.nlri.index()] = change
120
# drop the route; the None default makes an unknown index a no-op
self._seen.pop(change.nlri.index(),None)
122
# Queue a change for the next updates() run, replacing any change already
# pending for the same NLRI.
# change: object exposing nlri.index(), nlri.action, nlri.family() and
#         attributes.index(), as used below.
# force:  when false and self._enhanced_refresh_start is non-empty, the
#         change is appended to self._enhanced_refresh_delay instead.
# NOTE(review): some original lines are not visible in this extract, among
# them whatever follows the delay append (presumably a return) and the body
# of the innermost `if self.cache ...` test - confirm against the full file.
def insert_announced (self,change,force=False):
123
# WARNING : this function can run while we are in the updates() loop
125
# self._seen[family][nlri-index] = change
127
# XXX: FIXME: if we fear a conflict of nlri-index between families (very very unlikely)
128
# XXX: FIXME: then we should prepend the index() with the AFI and SAFI
130
# self._modify_nlri[nlri-index] = change : we are modifying this nlri
131
# self._modify_sorted[attr-index][nlri-index] = change : add or remove the nlri
132
# self._cache_attribute[attr-index] = change
133
# and it allows us to overwrite a change easily :-)
136
# traceback.print_stack()
137
# print "inserting", change.extensive()
139
# while an enhanced refresh is pending, non-forced changes are parked
if not force and self._enhanced_refresh_start:
140
self._enhanced_refresh_delay.append(change)
143
change_nlri_index = change.nlri.index()
144
change_attr_index = change.attributes.index()
146
# local aliases for the three queues described above
dict_sorted = self._modify_sorted
147
dict_nlri = self._modify_nlri
148
dict_attr = self._cache_attribute
150
# removing a route before we had time to announce it ?
151
if change_nlri_index in dict_nlri:
152
old_attr_index = dict_nlri[change_nlri_index].attributes.index()
153
# pop removes the entry
154
old_change = dict_nlri.pop(change_nlri_index)
155
# do not delete dict_attr, other routes may use it
156
del dict_sorted[old_attr_index][change_nlri_index]
157
# drop the per-attribute bucket once its last NLRI is gone
if not dict_sorted[old_attr_index]:
158
del dict_sorted[old_attr_index]
159
# route removed before announcement, all good
160
if old_change.nlri.action == OUT.announce and change.nlri.action == OUT.withdraw:
161
# if we cache sent NLRI and this NLRI was never sent before, we do not need to send a withdrawal
162
if self.cache and change_nlri_index not in self._seen.get(change.nlri.family(),{}):
165
# add the route to the list to be announced
166
dict_sorted.setdefault(change_attr_index,{})[change_nlri_index] = change
167
dict_nlri[change_nlri_index] = change
168
# keep the first change seen for each attribute index as its representative
if change_attr_index not in dict_attr:
169
dict_attr[change_attr_index] = change
172
def updates (self,grouped):
174
dict_nlri = self._modify_nlri
176
for family in self._seen:
177
for change in self._seen[family].itervalues():
178
if change.index() not in self._modify_nlri:
179
change.nlri.action = OUT.withdraw
180
self.insert_announced(change,True)
182
for new in self._changes:
183
self.insert_announced(new,True)
189
for afi,safi in self._enhanced_refresh_start:
190
rr_announced.append((afi,safi))
191
yield RouteRefresh(afi,safi,RouteRefresh.start)
193
dict_sorted = self._modify_sorted
194
dict_nlri = self._modify_nlri
195
dict_attr = self._cache_attribute
197
for attr_index,full_dict_change in dict_sorted.items():
200
for nlri_index,change in full_dict_change.iteritems():
201
family = change.nlri.family()
202
announced = self._seen.get(family,{})
203
if change.nlri.action == OUT.announce:
204
if nlri_index in announced:
205
old_change = announced[nlri_index]
206
# it is a duplicate route
207
if old_change.attributes.index() == change.attributes.index():
209
elif change.nlri.action == OUT.withdraw:
210
if nlri_index not in announced:
211
if dict_nlri[nlri_index].nlri.action == OUT.announce:
213
dict_change[nlri_index] = change
215
dict_change = full_dict_change
220
attributes = dict_attr[attr_index].attributes
222
# we NEED the copy provided by list() here as insert_announced can be called while we iterate
223
changed = list(dict_change.itervalues())
226
update = Update([dict_nlri[nlri_index].nlri for nlri_index in dict_change],attributes)
227
for change in changed:
228
nlri_index = change.nlri.index()
229
del dict_sorted[attr_index][nlri_index]
230
del dict_nlri[nlri_index]
231
# only yield once we have a consistent state, otherwise it will go wrong
232
# as we will try to modify things we are using
236
for change in changed:
237
updates.append(Update([change.nlri,],attributes))
238
nlri_index = change.nlri.index()
239
del dict_sorted[attr_index][nlri_index]
240
del dict_nlri[nlri_index]
241
# only yield once we have a consistent state, otherwise it will go wrong
242
# as we will try to modify things we are using
243
for update in updates:
247
announced = self._seen
248
for change in changed:
249
if change.nlri.action == OUT.announce:
250
announced.setdefault(change.nlri.family(),{})[change.nlri.index()] = change
252
family = change.nlri.family()
253
if family in announced:
254
announced[family].pop(change.nlri.index(),None)
257
for afi,safi in rr_announced:
258
self._enhanced_refresh_start.remove((afi,safi))
259
yield RouteRefresh(afi,safi,RouteRefresh.end)
261
for change in self._enhanced_refresh_delay:
262
self.insert_announced(change,True)
263
self.enhanced_refresh_delay = []
265
for update in self.updates(grouped):