~ubuntu-branches/ubuntu/wily/ryu/wily-proposed

« back to all changes in this revision

Viewing changes to ryu/tests/integrated/test_add_flow_v10.py

  • Committer: Package Import Robot
  • Author(s): Dariusz Dwornikowski
  • Date: 2014-08-18 16:58:52 UTC
  • Revision ID: package-import@ubuntu.com-20140818165852-i0qck3g5mw7rtxt0
Tags: upstream-3.12
ImportĀ upstreamĀ versionĀ 3.12

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
#    http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
12
# implied.
 
13
# See the License for the specific language governing permissions and
 
14
# limitations under the License.
 
15
 
 
16
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
17
 
 
18
import logging
 
19
 
 
20
from ryu.tests.integrated import tester
 
21
from ryu.ofproto import ofproto_v1_0
 
22
from ryu.ofproto import ether
 
23
from ryu.ofproto import nx_match
 
24
 
 
25
LOG = logging.getLogger(__name__)
 
26
 
 
27
 
 
28
class RunTest(tester.TestFlowBase):
 
29
    """ Test case for add flows of1.0
 
30
    """
 
31
    OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]
 
32
 
 
33
    def __init__(self, *args, **kwargs):
 
34
        super(RunTest, self).__init__(*args, **kwargs)
 
35
        self._verify = []
 
36
 
 
37
    def add_action(self, dp, action):
 
38
        rule = nx_match.ClsRule()
 
39
        self.send_flow_mod(
 
40
            dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None,
 
41
            0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, action)
 
42
 
 
43
    def add_rule(self, dp, rule):
 
44
        self.send_flow_mod(
 
45
            dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None,
 
46
            0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, [])
 
47
 
 
48
    def send_flow_mod(self, dp, rule, cookie, command, idle_timeout,
 
49
                      hard_timeout, priority=None, buffer_id=0xffffffff,
 
50
                      out_port=None, flags=0, actions=None):
 
51
 
 
52
        if priority is None:
 
53
            priority = dp.ofproto.OFP_DEFAULT_PRIORITY
 
54
        if out_port is None:
 
55
            out_port = dp.ofproto.OFPP_NONE
 
56
 
 
57
        match_tuple = rule.match_tuple()
 
58
        match = dp.ofproto_parser.OFPMatch(*match_tuple)
 
59
 
 
60
        m = dp.ofproto_parser.OFPFlowMod(
 
61
            dp, match, cookie, command, idle_timeout, hard_timeout,
 
62
            priority, buffer_id, out_port, flags, actions)
 
63
 
 
64
        dp.send_msg(m)
 
65
 
 
66
    def _verify_action(self, actions, type_, name, value):
 
67
        try:
 
68
            action = actions[0]
 
69
            if action.cls_action_type != type_:
 
70
                return "Action type error. send:%s, val:%s" \
 
71
                    % (type_, action.cls_action_type)
 
72
        except IndexError:
 
73
            return "Action is not setting."
 
74
 
 
75
        f_value = None
 
76
        if name:
 
77
            try:
 
78
                if isinstance(name, list):
 
79
                    f_value = [getattr(action, n) for n in name]
 
80
                else:
 
81
                    f_value = getattr(action, name)
 
82
            except AttributeError:
 
83
                pass
 
84
 
 
85
        if f_value != value:
 
86
            return "Value error. send:%s=%s val:%s" \
 
87
                % (name, value, f_value)
 
88
        return True
 
89
 
 
90
    def _verify_rule(self, rule, name, value):
 
91
        f_value = getattr(rule, name)
 
92
        if f_value != value:
 
93
            return "Value error. send:%s=%s val:%s" \
 
94
                % (name, value, f_value)
 
95
        return True
 
96
 
 
97
    def verify_default(self, dp, stats):
 
98
        verify = self._verify
 
99
        self._verify = []
 
100
        match = stats[0].match
 
101
        actions = stats[0].actions
 
102
 
 
103
        if len(verify) == 2:
 
104
            return self._verify_rule(match, *verify)
 
105
        elif len(verify) == 3:
 
106
            return self._verify_action(actions, *verify)
 
107
        else:
 
108
            return "self._verify is invalid."
 
109
 
 
110
    # Test of Actions
 
111
    def test_action_output(self, dp):
 
112
        out_port = 2
 
113
        self._verify = [dp.ofproto.OFPAT_OUTPUT,
 
114
                        'port', out_port]
 
115
        action = dp.ofproto_parser.OFPActionOutput(out_port)
 
116
        self.add_action(dp, [action, ])
 
117
 
 
118
    def test_rule_set_in_port(self, dp):
 
119
        in_port = 32
 
120
        self._verify = ['in_port', in_port]
 
121
 
 
122
        rule = nx_match.ClsRule()
 
123
        rule.set_in_port(in_port)
 
124
        self.add_rule(dp, rule)
 
125
 
 
126
    def test_action_vlan_vid(self, dp):
 
127
        vlan_vid = 2
 
128
        self._verify = [dp.ofproto.OFPAT_SET_VLAN_VID,
 
129
                        'vlan_vid', vlan_vid]
 
130
        action = dp.ofproto_parser.OFPActionVlanVid(vlan_vid)
 
131
        self.add_action(dp, [action, ])
 
132
 
 
133
    def test_action_vlan_pcp(self, dp):
 
134
        vlan_pcp = 4
 
135
        self._verify = [dp.ofproto.OFPAT_SET_VLAN_PCP,
 
136
                        'vlan_pcp', vlan_pcp]
 
137
        action = dp.ofproto_parser.OFPActionVlanPcp(vlan_pcp)
 
138
        self.add_action(dp, [action, ])
 
139
 
 
140
    def test_action_strip_vlan(self, dp):
 
141
        vlan_pcp = 4
 
142
        self._verify = [dp.ofproto.OFPAT_STRIP_VLAN,
 
143
                        None, None]
 
144
        action = dp.ofproto_parser.OFPActionStripVlan()
 
145
        self.add_action(dp, [action, ])
 
146
 
 
147
    def test_action_set_dl_src(self, dp):
 
148
        dl_src = '56:b3:42:04:b2:7a'
 
149
        dl_src_bin = self.haddr_to_bin(dl_src)
 
150
        self._verify = [dp.ofproto.OFPAT_SET_DL_SRC,
 
151
                        'dl_addr', dl_src_bin]
 
152
        action = dp.ofproto_parser.OFPActionSetDlSrc(dl_src_bin)
 
153
        self.add_action(dp, [action, ])
 
154
 
 
155
    def test_action_set_dl_dst(self, dp):
 
156
        dl_dst = 'c2:93:a2:fb:d0:f4'
 
157
        dl_dst_bin = self.haddr_to_bin(dl_dst)
 
158
        self._verify = [dp.ofproto.OFPAT_SET_DL_DST,
 
159
                        'dl_addr', dl_dst_bin]
 
160
        action = dp.ofproto_parser.OFPActionSetDlDst(dl_dst_bin)
 
161
        self.add_action(dp, [action, ])
 
162
 
 
163
    def test_action_set_nw_src(self, dp):
 
164
        nw_src = '216.132.81.105'
 
165
        nw_src_int = self.ipv4_to_int(nw_src)
 
166
        self._verify = [dp.ofproto.OFPAT_SET_NW_SRC,
 
167
                        'nw_addr', nw_src_int]
 
168
        action = dp.ofproto_parser.OFPActionSetNwSrc(nw_src_int)
 
169
        self.add_action(dp, [action, ])
 
170
 
 
171
    def test_action_set_nw_dst(self, dp):
 
172
        nw_dst = '223.201.206.3'
 
173
        nw_dst_int = self.ipv4_to_int(nw_dst)
 
174
        self._verify = [dp.ofproto.OFPAT_SET_NW_DST,
 
175
                        'nw_addr', nw_dst_int]
 
176
        action = dp.ofproto_parser.OFPActionSetNwDst(nw_dst_int)
 
177
        self.add_action(dp, [action, ])
 
178
 
 
179
    def test_action_set_nw_tos(self, dp):
 
180
        # lowest two bits must be zero
 
181
        nw_tos = 1 << 2
 
182
        self._verify = [dp.ofproto.OFPAT_SET_NW_TOS,
 
183
                        'tos', nw_tos]
 
184
        action = dp.ofproto_parser.OFPActionSetNwTos(nw_tos)
 
185
        self.add_action(dp, [action, ])
 
186
 
 
187
    def test_action_set_tp_src(self, dp):
 
188
        tp_src = 55420
 
189
        self._verify = [dp.ofproto.OFPAT_SET_TP_SRC,
 
190
                        'tp', tp_src]
 
191
        action = dp.ofproto_parser.OFPActionSetTpSrc(tp_src)
 
192
        self.add_action(dp, [action, ])
 
193
 
 
194
    def test_action_set_tp_dst(self, dp):
 
195
        tp_dst = 15430
 
196
        self._verify = [dp.ofproto.OFPAT_SET_TP_DST,
 
197
                        'tp', tp_dst]
 
198
        action = dp.ofproto_parser.OFPActionSetTpDst(tp_dst)
 
199
        self.add_action(dp, [action, ])
 
200
 
 
201
    def test_action_enqueue(self, dp):
 
202
        port = 207
 
203
        queue_id = 4287508753
 
204
        self._verify = [dp.ofproto.OFPAT_ENQUEUE,
 
205
                        ['port', 'queue_id'], [port, queue_id]]
 
206
        action = dp.ofproto_parser.OFPActionEnqueue(port, queue_id)
 
207
        self.add_action(dp, [action, ])
 
208
 
 
209
    # Test of Rules
 
210
    def test_rule_set_in_port(self, dp):
 
211
        in_port = 32
 
212
        self._verify = ['in_port', in_port]
 
213
 
 
214
        rule = nx_match.ClsRule()
 
215
        rule.set_in_port(in_port)
 
216
        self.add_rule(dp, rule)
 
217
 
 
218
    def test_rule_set_dl_src(self, dp):
 
219
        dl_src = 'b8:a1:94:51:78:83'
 
220
        dl_src_bin = self.haddr_to_bin(dl_src)
 
221
        self._verify = ['dl_src', dl_src_bin]
 
222
 
 
223
        rule = nx_match.ClsRule()
 
224
        rule.set_dl_src(dl_src_bin)
 
225
        self.add_rule(dp, rule)
 
226
 
 
227
    def test_rule_set_dl_type_ip(self, dp):
 
228
        dl_type = ether.ETH_TYPE_IP
 
229
        self._verify = ['dl_type', dl_type]
 
230
 
 
231
        rule = nx_match.ClsRule()
 
232
        rule.set_dl_type(dl_type)
 
233
        self.add_rule(dp, rule)
 
234
 
 
235
    def test_rule_set_dl_type_arp(self, dp):
 
236
        dl_type = ether.ETH_TYPE_ARP
 
237
        self._verify = ['dl_type', dl_type]
 
238
 
 
239
        rule = nx_match.ClsRule()
 
240
        rule.set_dl_type(dl_type)
 
241
        self.add_rule(dp, rule)
 
242
 
 
243
    def test_rule_set_dl_type_vlan(self, dp):
 
244
        dl_type = ether.ETH_TYPE_8021Q
 
245
        self._verify = ['dl_type', dl_type]
 
246
 
 
247
        rule = nx_match.ClsRule()
 
248
        rule.set_dl_type(dl_type)
 
249
        self.add_rule(dp, rule)
 
250
 
 
251
    def test_rule_set_dl_type_ipv6(self, dp):
 
252
        dl_type = ether.ETH_TYPE_IPV6
 
253
        self._verify = ['dl_type', dl_type]
 
254
 
 
255
        rule = nx_match.ClsRule()
 
256
        rule.set_dl_type(dl_type)
 
257
        self.add_rule(dp, rule)
 
258
 
 
259
    def test_rule_set_dl_type_lacp(self, dp):
 
260
        dl_type = ether.ETH_TYPE_SLOW
 
261
        self._verify = ['dl_type', dl_type]
 
262
 
 
263
        rule = nx_match.ClsRule()
 
264
        rule.set_dl_type(dl_type)
 
265
        self.add_rule(dp, rule)