~ubuntu-branches/ubuntu/utopic/exabgp/utopic

« back to all changes in this revision

Viewing changes to lib/exabgp/rib/store.py

  • Committer: Package Import Robot
  • Author(s): Henry-Nicolas Tourneur
  • Date: 2014-03-08 19:07:00 UTC
  • mfrom: (1.1.8)
  • Revision ID: package-import@ubuntu.com-20140308190700-xjbibpg1g6001c9x
Tags: 3.3.1-1
* New upstream release
* Bump python minimal required version (2.7)
* Closes: #726066 Debian packaging improvements proposed by Vincent Bernat
* Closes: #703774 not existent rundir (/var/run/exabgp) after reboot

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# encoding: utf-8
 
2
"""
 
3
store.py
 
4
 
 
5
Created by Thomas Mangin on 2009-11-05.
 
6
Copyright (c) 2009-2013 Exa Networks. All rights reserved.
 
7
"""
 
8
 
 
9
from exabgp.bgp.message.direction import IN,OUT
 
10
from exabgp.bgp.message.update import Update
 
11
from exabgp.bgp.message.refresh import RouteRefresh
 
12
 
 
13
# XXX: FIXME: we would not have to use so many setdefault if we pre-filled the dicts with the families
 
14
 
 
15
class Store(object):
    """Book-keeping for the route changes of one peer.

    State kept on the instance (as used by the methods below):

    * ``_modify_nlri[nlri-index] = change``: changes waiting to be sent.
    * ``_modify_sorted[attr-index][nlri-index] = change``: the same pending
      changes, grouped by attribute index so one UPDATE can carry them all.
    * ``_cache_attribute[attr-index] = change``: a change holding the
      attributes for that attribute index.
    * ``_seen[family][nlri-index] = change``: what ``updates()`` has already
      generated (only maintained when ``cache`` is true).
    * ``_watchdog[name]['+' or '-'][nlri-index] = change``: changes attached
      to a watchdog, split between announced ('+') and withdrawn ('-').
    * ``_enhanced_refresh_start``: families with a pending enhanced
      route-refresh; ``_enhanced_refresh_delay``: changes put on hold while
      that list is not empty.

    The code works on both Python 2.7 and Python 3: every dictionary that
    may be modified while it is being walked is iterated through a ``list()``
    snapshot.
    """

    def __init__(self, families):
        """Create an empty store restricted to ``families``.

        ``families`` is kept as given; it is iterated and intersected with
        the families requested by ``sent_changes()`` and ``resend()``.
        """
        # XXX: FIXME: we can decide to not cache the routes we seen and let the backend do it for us and save the memory
        self._watchdog = {}
        self.cache = False
        self.families = families
        self.clear()

    # will resend all the routes once we reconnect
    def reset(self):
        """Forget any route-refresh in progress and drain ``updates()``.

        The generated messages are thrown away; the generator is only run
        for its side effects on the internal dictionaries.
        """
        # WARNING : this function can run while we are in the updates() loop too !
        self._enhanced_refresh_start = []
        self._enhanced_refresh_delay = []
        for _ in self.updates(False):
            pass

    # back to square one, all the routes are removed
    def clear(self):
        """Drop every pending, cached and seen change, then ``reset()``."""
        self._cache_attribute = {}
        self._seen = {}
        self._modify_nlri = {}
        self._modify_sorted = {}
        self._changes = None
        self.reset()

    def sent_changes(self, families=None):
        """Yield the seen changes whose action is ``OUT.announce``.

        ``families`` can be None or empty, meaning every family of the
        store; otherwise only the families also present in ``self.families``
        are considered.
        """
        # families can be None or []
        requested_families = self.families if not families else set(families).intersection(self.families)

        # we use list() to make a snapshot of the data at the time we run the command
        for family in requested_families:
            for change in list(self._seen.get(family, {}).values()):
                if change.nlri.action == OUT.announce:
                    yield change

    def resend(self, families, enhanced_refresh):
        """Queue again the announced changes already seen for ``families``.

        With ``enhanced_refresh`` set, a family is only processed if it is
        not already recorded in ``_enhanced_refresh_start``, and it is
        recorded there before its changes are queued.
        """
        # families can be None or []
        requested_families = self.families if not families else set(families).intersection(self.families)

        def _announced(family):
            # snapshot first, the seen entries of the family are emptied
            # once every announced change has been handed out
            for change in list(self._seen.get(family, {}).values()):
                if change.nlri.action == OUT.announce:
                    yield change
            self._seen[family] = {}

        for family in requested_families:
            if enhanced_refresh:
                if family in self._enhanced_refresh_start:
                    continue
                self._enhanced_refresh_start.append(family)
            for change in _announced(family):
                self.insert_announced(change, True)

    # def dump (self):
    #       # This function returns a hash and not a list as "in" tests are O(n) with lists and O(1) with hash
    #       # and with ten thousands routes this makes an enormous difference (60 seconds to 2)
    #       changes = {}
    #       for family in self._seen.keys():
    #               for change in self._seen[family].values():
    #                       if change.nlri.action == OUT.announce:
    #                               changes[change.index()] = change
    #       return changes

    def queued_changes(self):
        """Yield the changes waiting to be turned into updates."""
        for change in list(self._modify_nlri.values()):
            yield change

    def update(self, changes):
        """Record ``changes``; they are applied by the next ``updates()`` run."""
        self._changes = changes

    def insert_announced_watchdog(self, change):
        """Register ``change`` with its watchdog (if any) and queue it.

        A change carrying both a watchdog and the withdraw marker is only
        stored in the '-' side of the watchdog and is NOT queued.
        Always returns True.
        """
        watchdog = change.attributes.watchdog()
        withdraw = change.attributes.withdraw()
        if watchdog:
            if withdraw:
                self._watchdog.setdefault(watchdog, {}).setdefault('-', {})[change.nlri.index()] = change
                return True
            self._watchdog.setdefault(watchdog, {}).setdefault('+', {})[change.nlri.index()] = change
        self.insert_announced(change)
        return True

    def announce_watchdog(self, watchdog):
        """Announce every change held on the '-' side of ``watchdog``."""
        if watchdog in self._watchdog:
            # list(): entries are popped from this very dict inside the loop
            for change in list(self._watchdog[watchdog].get('-', {}).values()):
                change.nlri.action = OUT.announce
                self.insert_announced(change)
                self._watchdog[watchdog].setdefault('+', {})[change.nlri.index()] = change
                self._watchdog[watchdog]['-'].pop(change.nlri.index())

    def withdraw_watchdog(self, watchdog):
        """Withdraw every change held on the '+' side of ``watchdog``."""
        if watchdog in self._watchdog:
            # list(): entries are popped from this very dict inside the loop
            for change in list(self._watchdog[watchdog].get('+', {}).values()):
                change.nlri.action = OUT.withdraw
                self.insert_announced(change)
                self._watchdog[watchdog].setdefault('-', {})[change.nlri.index()] = change
                self._watchdog[watchdog]['+'].pop(change.nlri.index())

    def insert_received(self, change):
        """Remember (or forget) a received change when caching is enabled.

        Does nothing when ``self.cache`` is false.
        """
        # NOTE(review): here _seen is keyed directly by nlri index, while every
        # other method keys it by family first - presumably a store is used
        # either for received or for announced routes, never both; confirm.
        if not self.cache:
            return
        elif change.nlri.action == IN.announced:
            self._seen[change.nlri.index()] = change
        else:
            self._seen.pop(change.nlri.index(), None)

    def insert_announced(self, change, force=False):
        """Queue ``change`` so that ``updates()`` generates a message for it.

        Unless ``force`` is set, a change inserted while an enhanced
        route-refresh is pending is only appended to the delay list.
        A pending change for the same NLRI is replaced by the new one.
        """
        # WARNING : this function can run while we are in the updates() loop

        # self._seen[family][nlri-index] = change

        # XXX: FIXME: if we fear a conflict of nlri-index between family (very very unlikely)
        # XXX: FIXME: then we should prepend the index() with the AFI and SAFI

        # self._modify_nlri[nlri-index] = change : we are modifying this nlri
        # self._modify_sorted[attr-index][nlri-index] = change : add or remove the nlri
        # self._cache_attribute[attr-index] = change
        # and it allow to overwrite change easily :-)

        # import traceback
        # traceback.print_stack()
        # print "inserting", change.extensive()

        if not force and self._enhanced_refresh_start:
            self._enhanced_refresh_delay.append(change)
            return

        change_nlri_index = change.nlri.index()
        change_attr_index = change.attributes.index()

        dict_sorted = self._modify_sorted
        dict_nlri = self._modify_nlri
        dict_attr = self._cache_attribute

        # removing a route before we had time to announce it ?
        if change_nlri_index in dict_nlri:
            old_attr_index = dict_nlri[change_nlri_index].attributes.index()
            # pop removes the entry
            old_change = dict_nlri.pop(change_nlri_index)
            # do not delete dict_attr, other routes may use it
            del dict_sorted[old_attr_index][change_nlri_index]
            if not dict_sorted[old_attr_index]:
                del dict_sorted[old_attr_index]
            # route removed before announcement, all good
            if old_change.nlri.action == OUT.announce and change.nlri.action == OUT.withdraw:
                # if we cache sent NLRI and this NLRI was never sent before, we do not need to send a withdrawal
                if self.cache and change_nlri_index not in self._seen.get(change.nlri.family(), {}):
                    return

        # add the route to the list to be announced
        dict_sorted.setdefault(change_attr_index, {})[change_nlri_index] = change
        dict_nlri[change_nlri_index] = change
        if change_attr_index not in dict_attr:
            dict_attr[change_attr_index] = change

    def updates(self, grouped):
        """Generate the messages for everything currently queued.

        Yields, in order: a ``RouteRefresh`` start marker per family with a
        pending enhanced refresh, the ``Update`` messages for the queued
        changes (one per attribute set when ``grouped`` is true, one per
        NLRI otherwise), the matching ``RouteRefresh`` end markers, and
        finally the updates for the changes delayed during the refresh.
        """
        if self._changes:
            # changes recorded by update(): withdraw what was seen but is not
            # queued, then queue the new set
            for family in self._seen:
                for change in list(self._seen[family].values()):
                    if change.index() not in self._modify_nlri:
                        change.nlri.action = OUT.withdraw
                        self.insert_announced(change, True)

            for new in self._changes:
                self.insert_announced(new, True)
            self._changes = None
        # end of changes

        rr_announced = []

        for afi, safi in self._enhanced_refresh_start:
            rr_announced.append((afi, safi))
            yield RouteRefresh(afi, safi, RouteRefresh.start)

        dict_sorted = self._modify_sorted
        dict_nlri = self._modify_nlri
        dict_attr = self._cache_attribute

        # list(): insert_announced can add or remove entries while we are
        # suspended on a yield
        for attr_index, full_dict_change in list(dict_sorted.items()):
            if self.cache:
                dict_change = {}
                for nlri_index, change in full_dict_change.items():
                    family = change.nlri.family()
                    announced = self._seen.get(family, {})
                    if change.nlri.action == OUT.announce:
                        if nlri_index in announced:
                            old_change = announced[nlri_index]
                            # it is a duplicate route
                            if old_change.attributes.index() == change.attributes.index():
                                continue
                    elif change.nlri.action == OUT.withdraw:
                        if nlri_index not in announced:
                            if dict_nlri[nlri_index].nlri.action == OUT.announce:
                                continue
                    # NOTE(review): the entries skipped above are not removed
                    # from the pending dictionaries - confirm this is intended
                    dict_change[nlri_index] = change
            else:
                dict_change = full_dict_change

            if not dict_change:
                continue

            attributes = dict_attr[attr_index].attributes

            # we NEED the copy provided by list() here as insert_announced can be called while we iterate
            changed = list(dict_change.values())

            if grouped:
                update = Update([dict_nlri[nlri_index].nlri for nlri_index in dict_change], attributes)
                for change in changed:
                    nlri_index = change.nlri.index()
                    del dict_sorted[attr_index][nlri_index]
                    del dict_nlri[nlri_index]
                # only yield once we have a consistent state, otherwise it will go wrong
                # as we will try to modify things we are using
                yield update
            else:
                updates = []
                for change in changed:
                    updates.append(Update([change.nlri, ], attributes))
                    nlri_index = change.nlri.index()
                    del dict_sorted[attr_index][nlri_index]
                    del dict_nlri[nlri_index]
                # only yield once we have a consistent state, otherwise it will go wrong
                # as we will try to modify things we are using
                for update in updates:
                    yield update

            if self.cache:
                announced = self._seen
                for change in changed:
                    if change.nlri.action == OUT.announce:
                        announced.setdefault(change.nlri.family(), {})[change.nlri.index()] = change
                    else:
                        family = change.nlri.family()
                        if family in announced:
                            announced[family].pop(change.nlri.index(), None)

        if rr_announced:
            for afi, safi in rr_announced:
                self._enhanced_refresh_start.remove((afi, safi))
                yield RouteRefresh(afi, safi, RouteRefresh.end)

            for change in self._enhanced_refresh_delay:
                self.insert_announced(change, True)
            # the delayed changes are now queued: empty the real list so they
            # are not queued again by the next refresh
            self._enhanced_refresh_delay = []

            for update in self.updates(grouped):
                yield update
                for change in self._modify_nlri.values():
 
82
                        yield change
 
83
 
 
84
        def update (self,changes):
 
85
                self._changes = changes
 
86
 
 
87
        def insert_announced_watchdog (self,change):
 
88
                watchdog = change.attributes.watchdog()
 
89
                withdraw = change.attributes.withdraw()
 
90
                if watchdog:
 
91
                        if withdraw:
 
92
                                self._watchdog.setdefault(watchdog,{}).setdefault('-',{})[change.nlri.index()] = change
 
93
                                return True
 
94
                        self._watchdog.setdefault(watchdog,{}).setdefault('+',{})[change.nlri.index()] = change
 
95
                self.insert_announced(change)
 
96
                return True
 
97
 
 
98
        def announce_watchdog (self,watchdog):
 
99
                if watchdog in self._watchdog:
 
100
                        for change in self._watchdog[watchdog].get('-',{}).values():
 
101
                                change.nlri.action = OUT.announce
 
102
                                self.insert_announced(change)
 
103
                                self._watchdog[watchdog].setdefault('+',{})[change.nlri.index()] = change
 
104
                                self._watchdog[watchdog]['-'].pop(change.nlri.index())
 
105
 
 
106
        def withdraw_watchdog (self,watchdog):
 
107
                if watchdog in self._watchdog:
 
108
                        for change in self._watchdog[watchdog].get('+',{}).values():
 
109
                                change.nlri.action = OUT.withdraw
 
110
                                self.insert_announced(change)
 
111
                                self._watchdog[watchdog].setdefault('-',{})[change.nlri.index()] = change
 
112
                                self._watchdog[watchdog]['+'].pop(change.nlri.index())
 
113
 
 
114
        def insert_received (self,change):
 
115
                if not self.cache:
 
116
                        return
 
117
                elif change.nlri.action == IN.announced:
 
118
                        self._seen[change.nlri.index()] = change
 
119
                else:
 
120
                        self._seen.pop(change.nlri.index(),None)
 
121
 
 
122
        def insert_announced (self,change,force=False):
 
123
                # WARNING : this function can run while we are in the updates() loop
 
124
 
 
125
                # self._seen[family][nlri-index] = change
 
126
 
 
127
                # XXX: FIXME: if we fear a conflict of nlri-index between family (very very unlikely)
 
128
                # XXX: FIXME: then we should preprend the index() with the AFI and SAFI
 
129
 
 
130
                # self._modify_nlri[nlri-index] = change : we are modifying this nlri
 
131
                # self._modify_sorted[attr-index][nlri-index] = change : add or remove the nlri
 
132
                # self._cache_attribute[attr-index] = change
 
133
                # and it allow to overwrite change easily :-)
 
134
 
 
135
                # import traceback
 
136
                # traceback.print_stack()
 
137
                # print "inserting", change.extensive()
 
138
 
 
139
                if not force and self._enhanced_refresh_start:
 
140
                        self._enhanced_refresh_delay.append(change)
 
141
                        return
 
142
 
 
143
                change_nlri_index = change.nlri.index()
 
144
                change_attr_index = change.attributes.index()
 
145
 
 
146
                dict_sorted = self._modify_sorted
 
147
                dict_nlri = self._modify_nlri
 
148
                dict_attr = self._cache_attribute
 
149
 
 
150
                # removing a route befone we had time to announe it ?
 
151
                if change_nlri_index in dict_nlri:
 
152
                        old_attr_index = dict_nlri[change_nlri_index].attributes.index()
 
153
                        # pop removes the entry
 
154
                        old_change = dict_nlri.pop(change_nlri_index)
 
155
                        # do not delete dict_attr, other routes may use it
 
156
                        del dict_sorted[old_attr_index][change_nlri_index]
 
157
                        if not dict_sorted[old_attr_index]:
 
158
                                del dict_sorted[old_attr_index]
 
159
                        # route removed before announcement, all goo
 
160
                        if old_change.nlri.action == OUT.announce and change.nlri.action == OUT.withdraw:
 
161
                                # if we cache sent NLRI and this NLRI was never sent before, we do not need to send a withdrawal
 
162
                                if self.cache and change_nlri_index not in self._seen.get(change.nlri.family(),{}):
 
163
                                        return
 
164
 
 
165
                # add the route to the list to be announced
 
166
                dict_sorted.setdefault(change_attr_index,{})[change_nlri_index] = change
 
167
                dict_nlri[change_nlri_index] = change
 
168
                if change_attr_index not in dict_attr:
 
169
                        dict_attr[change_attr_index] = change
 
170
 
 
171
 
 
172
        def updates (self,grouped):
 
173
                if self._changes:
 
174
                        dict_nlri = self._modify_nlri
 
175
 
 
176
                        for family in self._seen:
 
177
                                for change in self._seen[family].itervalues():
 
178
                                        if change.index() not in self._modify_nlri:
 
179
                                                change.nlri.action = OUT.withdraw
 
180
                                                self.insert_announced(change,True)
 
181
 
 
182
                        for new in self._changes:
 
183
                                self.insert_announced(new,True)
 
184
                        self._changes = None
 
185
                # end of changes
 
186
 
 
187
                rr_announced = []
 
188
 
 
189
                for afi,safi in self._enhanced_refresh_start:
 
190
                        rr_announced.append((afi,safi))
 
191
                        yield RouteRefresh(afi,safi,RouteRefresh.start)
 
192
 
 
193
                dict_sorted = self._modify_sorted
 
194
                dict_nlri = self._modify_nlri
 
195
                dict_attr = self._cache_attribute
 
196
 
 
197
                for attr_index,full_dict_change in dict_sorted.items():
 
198
                        if self.cache:
 
199
                                dict_change = {}
 
200
                                for nlri_index,change in full_dict_change.iteritems():
 
201
                                        family = change.nlri.family()
 
202
                                        announced = self._seen.get(family,{})
 
203
                                        if change.nlri.action == OUT.announce:
 
204
                                                if nlri_index in announced:
 
205
                                                        old_change = announced[nlri_index]
 
206
                                                        # it is a duplicate route
 
207
                                                        if old_change.attributes.index() == change.attributes.index():
 
208
                                                                continue
 
209
                                        elif change.nlri.action == OUT.withdraw:
 
210
                                                if nlri_index not in announced:
 
211
                                                        if dict_nlri[nlri_index].nlri.action == OUT.announce:
 
212
                                                                continue
 
213
                                        dict_change[nlri_index] = change
 
214
                        else:
 
215
                                dict_change = full_dict_change
 
216
 
 
217
                        if not dict_change:
 
218
                                continue
 
219
 
 
220
                        attributes = dict_attr[attr_index].attributes
 
221
 
 
222
                        # we NEED the copy provided by list() here as insert_announced can be called while we iterate
 
223
                        changed = list(dict_change.itervalues())
 
224
 
 
225
                        if grouped:
 
226
                                update = Update([dict_nlri[nlri_index].nlri for nlri_index in dict_change],attributes)
 
227
                                for change in changed:
 
228
                                        nlri_index = change.nlri.index()
 
229
                                        del dict_sorted[attr_index][nlri_index]
 
230
                                        del dict_nlri[nlri_index]
 
231
                                # only yield once we have a consistent state, otherwise it will go wrong
 
232
                                # as we will try to modify things we are using
 
233
                                yield update
 
234
                        else:
 
235
                                updates = []
 
236
                                for change in changed:
 
237
                                        updates.append(Update([change.nlri,],attributes))
 
238
                                        nlri_index = change.nlri.index()
 
239
                                        del dict_sorted[attr_index][nlri_index]
 
240
                                        del dict_nlri[nlri_index]
 
241
                                # only yield once we have a consistent state, otherwise it will go wrong
 
242
                                # as we will try to modify things we are using
 
243
                                for update in updates:
 
244
                                        yield update
 
245
 
 
246
                        if self.cache:
 
247
                                announced = self._seen
 
248
                                for change in changed:
 
249
                                        if change.nlri.action == OUT.announce:
 
250
                                                announced.setdefault(change.nlri.family(),{})[change.nlri.index()] = change
 
251
                                        else:
 
252
                                                family = change.nlri.family()
 
253
                                                if family in announced:
 
254
                                                        announced[family].pop(change.nlri.index(),None)
 
255
 
 
256
                if rr_announced:
 
257
                        for afi,safi in rr_announced:
 
258
                                self._enhanced_refresh_start.remove((afi,safi))
 
259
                                yield RouteRefresh(afi,safi,RouteRefresh.end)
 
260
 
 
261
                        for change in self._enhanced_refresh_delay:
 
262
                                self.insert_announced(change,True)
 
263
                        self.enhanced_refresh_delay = []
 
264
 
 
265
                        for update in self.updates(grouped):
 
266
                                yield update