1
# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
16
# vim: tabstop=4 shiftwidth=4 softtabstop=4
20
from ryu.tests.integrated import tester
21
from ryu.ofproto import ofproto_v1_0
22
from ryu.ofproto import ether
23
from ryu.ofproto import nx_match
25
LOG = logging.getLogger(__name__)
28
class RunTest(tester.TestFlowBase):
29
""" Test case for add flows of1.0
31
OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]
33
def __init__(self, *args, **kwargs):
34
super(RunTest, self).__init__(*args, **kwargs)
37
def add_action(self, dp, action):
38
rule = nx_match.ClsRule()
40
dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None,
41
0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, action)
43
def add_rule(self, dp, rule):
45
dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None,
46
0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, [])
48
def send_flow_mod(self, dp, rule, cookie, command, idle_timeout,
49
hard_timeout, priority=None, buffer_id=0xffffffff,
50
out_port=None, flags=0, actions=None):
53
priority = dp.ofproto.OFP_DEFAULT_PRIORITY
55
out_port = dp.ofproto.OFPP_NONE
57
match_tuple = rule.match_tuple()
58
match = dp.ofproto_parser.OFPMatch(*match_tuple)
60
m = dp.ofproto_parser.OFPFlowMod(
61
dp, match, cookie, command, idle_timeout, hard_timeout,
62
priority, buffer_id, out_port, flags, actions)
66
def _verify_action(self, actions, type_, name, value):
69
if action.cls_action_type != type_:
70
return "Action type error. send:%s, val:%s" \
71
% (type_, action.cls_action_type)
73
return "Action is not setting."
78
if isinstance(name, list):
79
f_value = [getattr(action, n) for n in name]
81
f_value = getattr(action, name)
82
except AttributeError:
86
return "Value error. send:%s=%s val:%s" \
87
% (name, value, f_value)
90
def _verify_rule(self, rule, name, value):
91
f_value = getattr(rule, name)
93
return "Value error. send:%s=%s val:%s" \
94
% (name, value, f_value)
97
def verify_default(self, dp, stats):
100
match = stats[0].match
101
actions = stats[0].actions
104
return self._verify_rule(match, *verify)
105
elif len(verify) == 3:
106
return self._verify_action(actions, *verify)
108
return "self._verify is invalid."
111
def test_action_output(self, dp):
113
self._verify = [dp.ofproto.OFPAT_OUTPUT,
115
action = dp.ofproto_parser.OFPActionOutput(out_port)
116
self.add_action(dp, [action, ])
118
def test_rule_set_in_port(self, dp):
120
self._verify = ['in_port', in_port]
122
rule = nx_match.ClsRule()
123
rule.set_in_port(in_port)
124
self.add_rule(dp, rule)
126
def test_action_vlan_vid(self, dp):
128
self._verify = [dp.ofproto.OFPAT_SET_VLAN_VID,
129
'vlan_vid', vlan_vid]
130
action = dp.ofproto_parser.OFPActionVlanVid(vlan_vid)
131
self.add_action(dp, [action, ])
133
def test_action_vlan_pcp(self, dp):
135
self._verify = [dp.ofproto.OFPAT_SET_VLAN_PCP,
136
'vlan_pcp', vlan_pcp]
137
action = dp.ofproto_parser.OFPActionVlanPcp(vlan_pcp)
138
self.add_action(dp, [action, ])
140
def test_action_strip_vlan(self, dp):
142
self._verify = [dp.ofproto.OFPAT_STRIP_VLAN,
144
action = dp.ofproto_parser.OFPActionStripVlan()
145
self.add_action(dp, [action, ])
147
def test_action_set_dl_src(self, dp):
148
dl_src = '56:b3:42:04:b2:7a'
149
dl_src_bin = self.haddr_to_bin(dl_src)
150
self._verify = [dp.ofproto.OFPAT_SET_DL_SRC,
151
'dl_addr', dl_src_bin]
152
action = dp.ofproto_parser.OFPActionSetDlSrc(dl_src_bin)
153
self.add_action(dp, [action, ])
155
def test_action_set_dl_dst(self, dp):
156
dl_dst = 'c2:93:a2:fb:d0:f4'
157
dl_dst_bin = self.haddr_to_bin(dl_dst)
158
self._verify = [dp.ofproto.OFPAT_SET_DL_DST,
159
'dl_addr', dl_dst_bin]
160
action = dp.ofproto_parser.OFPActionSetDlDst(dl_dst_bin)
161
self.add_action(dp, [action, ])
163
def test_action_set_nw_src(self, dp):
164
nw_src = '216.132.81.105'
165
nw_src_int = self.ipv4_to_int(nw_src)
166
self._verify = [dp.ofproto.OFPAT_SET_NW_SRC,
167
'nw_addr', nw_src_int]
168
action = dp.ofproto_parser.OFPActionSetNwSrc(nw_src_int)
169
self.add_action(dp, [action, ])
171
def test_action_set_nw_dst(self, dp):
172
nw_dst = '223.201.206.3'
173
nw_dst_int = self.ipv4_to_int(nw_dst)
174
self._verify = [dp.ofproto.OFPAT_SET_NW_DST,
175
'nw_addr', nw_dst_int]
176
action = dp.ofproto_parser.OFPActionSetNwDst(nw_dst_int)
177
self.add_action(dp, [action, ])
179
def test_action_set_nw_tos(self, dp):
180
# lowest two bits must be zero
182
self._verify = [dp.ofproto.OFPAT_SET_NW_TOS,
184
action = dp.ofproto_parser.OFPActionSetNwTos(nw_tos)
185
self.add_action(dp, [action, ])
187
def test_action_set_tp_src(self, dp):
189
self._verify = [dp.ofproto.OFPAT_SET_TP_SRC,
191
action = dp.ofproto_parser.OFPActionSetTpSrc(tp_src)
192
self.add_action(dp, [action, ])
194
def test_action_set_tp_dst(self, dp):
196
self._verify = [dp.ofproto.OFPAT_SET_TP_DST,
198
action = dp.ofproto_parser.OFPActionSetTpDst(tp_dst)
199
self.add_action(dp, [action, ])
201
def test_action_enqueue(self, dp):
203
queue_id = 4287508753
204
self._verify = [dp.ofproto.OFPAT_ENQUEUE,
205
['port', 'queue_id'], [port, queue_id]]
206
action = dp.ofproto_parser.OFPActionEnqueue(port, queue_id)
207
self.add_action(dp, [action, ])
210
def test_rule_set_in_port(self, dp):
212
self._verify = ['in_port', in_port]
214
rule = nx_match.ClsRule()
215
rule.set_in_port(in_port)
216
self.add_rule(dp, rule)
218
def test_rule_set_dl_src(self, dp):
219
dl_src = 'b8:a1:94:51:78:83'
220
dl_src_bin = self.haddr_to_bin(dl_src)
221
self._verify = ['dl_src', dl_src_bin]
223
rule = nx_match.ClsRule()
224
rule.set_dl_src(dl_src_bin)
225
self.add_rule(dp, rule)
227
def test_rule_set_dl_type_ip(self, dp):
228
dl_type = ether.ETH_TYPE_IP
229
self._verify = ['dl_type', dl_type]
231
rule = nx_match.ClsRule()
232
rule.set_dl_type(dl_type)
233
self.add_rule(dp, rule)
235
def test_rule_set_dl_type_arp(self, dp):
236
dl_type = ether.ETH_TYPE_ARP
237
self._verify = ['dl_type', dl_type]
239
rule = nx_match.ClsRule()
240
rule.set_dl_type(dl_type)
241
self.add_rule(dp, rule)
243
def test_rule_set_dl_type_vlan(self, dp):
244
dl_type = ether.ETH_TYPE_8021Q
245
self._verify = ['dl_type', dl_type]
247
rule = nx_match.ClsRule()
248
rule.set_dl_type(dl_type)
249
self.add_rule(dp, rule)
251
def test_rule_set_dl_type_ipv6(self, dp):
252
dl_type = ether.ETH_TYPE_IPV6
253
self._verify = ['dl_type', dl_type]
255
rule = nx_match.ClsRule()
256
rule.set_dl_type(dl_type)
257
self.add_rule(dp, rule)
259
def test_rule_set_dl_type_lacp(self, dp):
260
dl_type = ether.ETH_TYPE_SLOW
261
self._verify = ['dl_type', dl_type]
263
rule = nx_match.ClsRule()
264
rule.set_dl_type(dl_type)
265
self.add_rule(dp, rule)