1
# Licensed under the Apache License, Version 2.0 (the "License"); you may
2
# not use this file except in compliance with the License. You may obtain
3
# a copy of the License at
5
# http://www.apache.org/licenses/LICENSE-2.0
7
# Unless required by applicable law or agreed to in writing, software
8
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10
# License for the specific language governing permissions and limitations
# under the License.
15
from neutron.agent.common import config as agent_config
16
from neutron.agent.l3 import router_info
17
from neutron.openstack.common import uuidutils
18
from neutron.tests import base
21
# Shorthand for uuidutils.generate_uuid; the tests in this module call it to
# produce the first positional argument passed to router_info.RouterInfo.
_uuid = uuidutils.generate_uuid
24
# Test case for router_info.RouterInfo. The fixture code below replaces
# neutron.agent.linux.ip_lib.IPWrapper with a mock so the tests can assert on
# the commands passed to netns.execute.
class TestRouterInfo(base.BaseTestCase):
26
# NOTE(review): the `def setUp(self):` header that should precede this call
# is not visible in this view, and neither is an `import mock` line --
# confirm both exist in the real file.
super(TestRouterInfo, self).setUp()
28
# Agent configuration object that is handed to RouterInfo via self.ri_kwargs.
conf = agent_config.setup_conf()
29
conf.use_namespaces = True
31
# Patch the IPWrapper class; calling the patched class returns self.mock_ip,
# which is the object _check_agent_method_called inspects.
# NOTE(review): no stop()/addCleanup for this patcher is visible here --
# presumably the base test case stops active patches; verify.
self.ip_cls_p = mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
32
ip_cls = self.ip_cls_p.start()
33
self.mock_ip = mock.MagicMock()
34
ip_cls.return_value = self.mock_ip
35
# Keyword arguments shared by every RouterInfo(...) construction in the
# tests; the interface driver is a sentinel placeholder, not a real driver.
self.ri_kwargs = {'agent_conf': conf,
36
'interface_driver': mock.sentinel.interface_driver}
38
# Helper: assert that self.mock_ip.netns.execute recorded, for every entry in
# `calls`, a call of the form execute(<entry>, check_exit_code=False).
# `calls` is an iterable of command argument lists (the tests pass lists such
# as ['ip', 'route', ...]).
def _check_agent_method_called(self, calls):
39
self.mock_ip.netns.execute.assert_has_calls(
40
# NOTE(review): the remaining argument(s) and closing parenthesis of
# assert_has_calls are not visible after the trailing comma below, so the
# ordering requirement on the calls cannot be confirmed from here.
[mock.call(call, check_exit_code=False) for call in calls],
43
# Exercises RouterInfo._update_routing_table with the 'replace' and 'delete'
# operations for two routes, checking after each call that a matching
# `ip route <operation> to <destination> ...` command reached the mocked
# netns.execute.
# NOTE(review): the continuation lines of the route dicts and of each
# `expected` list are not visible in this view; the remaining route fields
# and the tail of each expected command cannot be confirmed from here.
def test_routing_table_update(self):
44
ri = router_info.RouterInfo(_uuid(), {}, **self.ri_kwargs)
47
# Route with a /16 destination.
fake_route1 = {'destination': '135.207.0.0/16',
49
# Route with a /32 destination.
fake_route2 = {'destination': '135.207.111.111/32',
52
# 'replace' then 'delete' for the first route.
ri._update_routing_table('replace', fake_route1)
53
expected = [['ip', 'route', 'replace', 'to', '135.207.0.0/16',
55
self._check_agent_method_called(expected)
57
ri._update_routing_table('delete', fake_route1)
58
expected = [['ip', 'route', 'delete', 'to', '135.207.0.0/16',
60
self._check_agent_method_called(expected)
62
# Same 'replace' / 'delete' sequence for the second route.
ri._update_routing_table('replace', fake_route2)
63
expected = [['ip', 'route', 'replace', 'to', '135.207.111.111/32',
65
self._check_agent_method_called(expected)
67
ri._update_routing_table('delete', fake_route2)
68
expected = [['ip', 'route', 'delete', 'to', '135.207.111.111/32',
70
self._check_agent_method_called(expected)
72
# Checks the route commands issued as ri.router['routes'] changes relative to
# ri.routes: 'replace' commands for the two routes in the first new list,
# then a 'delete' for each route that is subsequently dropped.
# NOTE(review): the definition of fake_old_routes and the call(s) that make
# RouterInfo act on the changed routes are not visible in this view --
# confirm them against the full file.
def test_routes_updated(self):
73
ri = router_info.RouterInfo(_uuid(), {}, **self.ri_kwargs)
77
# Step 1: two new routes, both via 10.100.10.30.
fake_new_routes = [{'destination': "110.100.31.0/24",
78
'nexthop': "10.100.10.30"},
79
{'destination': "110.100.30.0/24",
80
'nexthop': "10.100.10.30"}]
81
ri.routes = fake_old_routes
82
ri.router['routes'] = fake_new_routes
85
# Both routes are expected to be installed with 'replace'.
expected = [['ip', 'route', 'replace', 'to', '110.100.30.0/24',
86
'via', '10.100.10.30'],
87
['ip', 'route', 'replace', 'to', '110.100.31.0/24',
88
'via', '10.100.10.30']]
90
self._check_agent_method_called(expected)
92
# Step 2: only 110.100.30.0/24 remains in the new list, so a 'delete' for
# 110.100.31.0/24 is expected.
fake_new_routes = [{'destination': "110.100.30.0/24",
93
'nexthop': "10.100.10.30"}]
94
ri.router['routes'] = fake_new_routes
96
expected = [['ip', 'route', 'delete', 'to', '110.100.31.0/24',
97
'via', '10.100.10.30']]
99
self._check_agent_method_called(expected)
101
# Step 3: a 'delete' for the remaining 110.100.30.0/24 route is expected.
# NOTE(review): presumably fake_new_routes is reassigned (e.g. emptied) on a
# line not visible here before this assignment -- verify.
ri.router['routes'] = fake_new_routes
104
expected = [['ip', 'route', 'delete', 'to', '110.100.30.0/24',
105
'via', '10.100.10.30']]
106
self._check_agent_method_called(expected)