# Copyright (c) 2015 Mirantis, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from oslo_config import cfg
17
from neutron.plugins.linuxbridge.agent import arp_protect
18
from neutron.tests.common import machine_fixtures
19
from neutron.tests.common import net_helpers
20
from neutron.tests.functional import base as functional_base
22
# Short module-level aliases for the net_helpers ARP ping assertions that the
# test methods in this module call.
no_arping = net_helpers.assert_no_arping
arping = net_helpers.assert_arping
26
class LinuxBridgeARPSpoofTestCase(functional_base.BaseSudoTestCase):
    """Functional tests for linuxbridge ARP spoofing protection.

    Every test runs against the bridge provided by
    ``net_helpers.LinuxBridgeFixture`` with three peer machines attached:
    ``source``, ``destination`` and ``observer``.  Protection rules are
    installed and removed through ``arp_protect``; the outcome is checked
    with the module-level ``arping`` / ``no_arping`` assertion aliases.
    """

    def setUp(self):
        """Enable ARP spoofing prevention and build the test topology."""
        super(LinuxBridgeARPSpoofTestCase, self).setUp()
        cfg.CONF.set_override('prevent_arp_spoofing', True, 'AGENT')
        lbfixture = self.useFixture(net_helpers.LinuxBridgeFixture())
        # Register the reset before overriding the module attribute so that
        # arp_protect.NAMESPACE goes back to None when the test finishes.
        self.addCleanup(setattr, arp_protect, 'NAMESPACE', None)
        arp_protect.NAMESPACE = lbfixture.namespace
        bridge = lbfixture.bridge
        self.source, self.destination, self.observer = self.useFixture(
            machine_fixtures.PeerMachines(bridge, amount=3)).machines

    def _add_arp_protection(self, machine, addresses, extra_port_dict=None):
        """Set up ARP spoofing protection for ``machine``'s port.

        :param machine: peer machine; the rules are applied to the device
            named by ``VethFixture.get_peer_name(machine.port.name)``.
        :param addresses: iterable of addresses placed in the port dict's
            ``fixed_ips`` list.
        :param extra_port_dict: optional dict of additional port attributes
            merged into the port dict (e.g. ``port_security_enabled``).
        """
        port_dict = {'fixed_ips': [{'ip_address': a} for a in addresses]}
        # The default is None, which dict.update() would reject.
        if extra_port_dict:
            port_dict.update(extra_port_dict)
        name = net_helpers.VethFixture.get_peer_name(machine.port.name)
        arp_protect.setup_arp_spoofing_protection(name, port_dict)
        # Remove the protection again once the test is over.
        self.addCleanup(arp_protect.delete_arp_spoofing_protection,
                        [name])

    def test_arp_no_protection(self):
        """Without any protection, ARP works in both directions."""
        arping(self.source.namespace, self.destination.ip)
        arping(self.destination.namespace, self.source.ip)

    def test_arp_correct_protection(self):
        """Protection matching each machine's own IP keeps ARP working."""
        self._add_arp_protection(self.source, [self.source.ip])
        self._add_arp_protection(self.destination, [self.destination.ip])
        arping(self.source.namespace, self.destination.ip)
        arping(self.destination.namespace, self.source.ip)

    def test_arp_fails_incorrect_protection(self):
        """Protection for addresses the machines do not own blocks ARP."""
        self._add_arp_protection(self.source, ['1.1.1.1'])
        self._add_arp_protection(self.destination, ['2.2.2.2'])
        no_arping(self.source.namespace, self.destination.ip)
        no_arping(self.destination.namespace, self.source.ip)

    def test_arp_protection_removal(self):
        """Deleting protection for one port leaves the other port's intact."""
        self._add_arp_protection(self.source, ['1.1.1.1'])
        self._add_arp_protection(self.destination, ['2.2.2.2'])
        no_arping(self.observer.namespace, self.destination.ip)
        no_arping(self.observer.namespace, self.source.ip)
        name = net_helpers.VethFixture.get_peer_name(self.source.port.name)
        arp_protect.delete_arp_spoofing_protection([name])
        # spoofing should have been removed from source, but not dest
        arping(self.observer.namespace, self.source.ip)
        no_arping(self.observer.namespace, self.destination.ip)

    def test_arp_protection_update(self):
        """Re-applying protection with new addresses updates only that port."""
        self._add_arp_protection(self.source, ['1.1.1.1'])
        self._add_arp_protection(self.destination, ['2.2.2.2'])
        no_arping(self.observer.namespace, self.destination.ip)
        no_arping(self.observer.namespace, self.source.ip)
        # NOTE(review): the broad /1 CIDR is presumably meant to cover the
        # source machine's address -- confirm against PeerMachines.
        self._add_arp_protection(self.source, ['192.0.0.0/1'])
        # spoofing should have been updated on source, but not dest
        arping(self.observer.namespace, self.source.ip)
        no_arping(self.observer.namespace, self.destination.ip)

    def test_arp_protection_port_security_disabled(self):
        """Re-applying with port security disabled lets ARP through again."""
        self._add_arp_protection(self.source, ['1.1.1.1'])
        no_arping(self.observer.namespace, self.source.ip)
        self._add_arp_protection(self.source, ['1.1.1.1'],
                                 {'port_security_enabled': False})
        arping(self.observer.namespace, self.source.ip)

    def test_arp_protection_dead_reference_removal(self):
        """Unreferenced-rule cleanup keeps only the listed port's rules."""
        self._add_arp_protection(self.source, ['1.1.1.1'])
        self._add_arp_protection(self.destination, ['2.2.2.2'])
        no_arping(self.observer.namespace, self.destination.ip)
        no_arping(self.observer.namespace, self.source.ip)
        name = net_helpers.VethFixture.get_peer_name(self.source.port.name)
        # This should remove all arp protect rules that aren't source port
        arp_protect.delete_unreferenced_arp_protection([name])
        no_arping(self.observer.namespace, self.source.ip)
        arping(self.observer.namespace, self.destination.ip)