~ubuntu-branches/ubuntu/trusty/quantum/trusty

« back to all changes in this revision

Viewing changes to quantum/plugins/ryu/agent/ryu_quantum_agent.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short
  • Date: 2012-11-23 09:43:14 UTC
  • mfrom: (2.1.16)
  • Revision ID: package-import@ubuntu.com-20121123094314-e1tqsulrwe21b9aq
Tags: 2013.1~g1-0ubuntu1
[ Adam Gandelman ]
* debian/patches/*: Refreshed for opening of Grizzly.

[ Chuck Short ]
* New upstream release.
* debian/rules: FTBFS if there are missing binaries.
* debian/quantum-server.install: Add quantum-debug.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
#    under the License.
21
21
# @author: Isaku Yamahata
22
22
 
23
 
import logging as LOG
 
23
import httplib
 
24
import socket
24
25
import sys
25
 
import time
26
26
 
 
27
import netifaces
 
28
from ryu.app import client
 
29
from ryu.app import conf_switch_key
27
30
from ryu.app import rest_nw_id
28
 
from ryu.app.client import OFPClient
29
31
from sqlalchemy.ext.sqlsoup import SqlSoup
30
32
 
31
33
from quantum.agent.linux import ovs_lib
32
34
from quantum.agent.linux.ovs_lib import VifPort
33
35
from quantum.common import config as logging_config
34
 
from quantum.common import constants
35
36
from quantum.openstack.common import cfg
 
37
from quantum.openstack.common.cfg import NoSuchGroupError
 
38
from quantum.openstack.common.cfg import NoSuchOptError
 
39
from quantum.openstack.common import log as LOG
36
40
from quantum.plugins.ryu.common import config
37
41
 
38
42
 
 
43
# This is a copy of nova.flags._get_my_ip()
 
44
# Agent shouldn't depend on nova module
 
45
def _get_my_ip():
    """
    Returns the actual ip of the local machine.

    This code figures out what source address would be used if some traffic
    were to be sent out to some well known address on the Internet. In this
    case, a Google DNS server is used, but the specific address does not
    matter much.  No traffic is actually sent.

    :returns: the local address reported by the socket's getsockname()
    :raises socket.error: propagated unchanged when connect() or
                          getsockname() fails
    """
    csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        csock.connect(('8.8.8.8', 80))
        (addr, _port) = csock.getsockname()
    finally:
        # Always release the descriptor, even when connect() or
        # getsockname() raises; previously the socket leaked on that path.
        csock.close()
    return addr
 
59
 
 
60
 
 
61
def _get_ip(cfg_ip_str, cfg_interface_str):
    """
    Pick an IP address using the [OVS] configuration group.

    The lookup order is:

    1. the option named by cfg_ip_str, when it is set to a true value;
    2. the first AF_INET address of the interface named by the option
       cfg_interface_str, when that option is set to a true value;
    3. whatever _get_my_ip() reports.

    A missing option or a missing OVS group is treated as "not set".

    :param cfg_ip_str: name of the OVS option holding an explicit address
    :param cfg_interface_str: name of the OVS option holding an interface
    :returns: the selected address
    """
    def _ovs_option(opt_name):
        # Read one option from cfg.CONF.OVS; None when it does not exist.
        try:
            return getattr(cfg.CONF.OVS, opt_name)
        except (NoSuchOptError, NoSuchGroupError):
            return None

    configured_ip = _ovs_option(cfg_ip_str)
    if configured_ip:
        return configured_ip

    iface_name = _ovs_option(cfg_interface_str)
    if not iface_name:
        return _get_my_ip()

    inet_addrs = netifaces.ifaddresses(iface_name)[netifaces.AF_INET]
    return inet_addrs[0]['addr']
 
80
 
 
81
 
 
82
def _get_tunnel_ip():
    """
    Resolve the tunnel address through _get_ip(), using the OVS options
    'tunnel_ip' and 'tunnel_interface'.
    """
    tunnel_addr = _get_ip('tunnel_ip', 'tunnel_interface')
    return tunnel_addr
 
84
 
 
85
 
 
86
def _get_ovsdb_ip():
    """
    Resolve the OVSDB address through _get_ip(), using the OVS options
    'ovsdb_ip' and 'ovsdb_interface'.
    """
    ovsdb_addr = _get_ip('ovsdb_ip', 'ovsdb_interface')
    return ovsdb_addr
 
88
 
 
89
 
39
90
class OVSBridge(ovs_lib.OVSBridge):
40
91
    def __init__(self, br_name, root_helper):
41
92
        ovs_lib.OVSBridge.__init__(self, br_name, root_helper)
42
93
        self.datapath_id = None
43
94
 
44
95
    def find_datapath_id(self):
45
 
        # ovs-vsctl get Bridge br-int datapath_id
46
 
        res = self.run_vsctl(["get", "Bridge", self.br_name, "datapath_id"])
47
 
 
48
 
        # remove preceding/trailing double quotes
49
 
        dp_id = res.strip().strip('"')
50
 
        self.datapath_id = dp_id
51
 
 
52
 
    def set_controller(self, target):
53
 
        methods = ("ssl", "tcp", "unix", "pssl", "ptcp", "punix")
54
 
        args = target.split(":")
55
 
        if not args[0] in methods:
56
 
            target = "tcp:" + target
57
 
        self.run_vsctl(["set-controller", self.br_name, target])
58
 
 
59
 
    def _vifport(self, name, external_ids):
60
 
        ofport = self.db_get_val("Interface", name, "ofport")
61
 
        return VifPort(name, ofport, external_ids["iface-id"],
62
 
                       external_ids["attached-mac"], self)
 
96
        self.datapath_id = self.get_datapath_id()
 
97
 
 
98
    def set_manager(self, target):
 
99
        self.run_vsctl(["set-manager", target])
 
100
 
 
101
    def get_ofport(self, name):
 
102
        return self.db_get_val("Interface", name, "ofport")
63
103
 
64
104
    def _get_ports(self, get_port):
65
105
        ports = []
66
106
        port_names = self.get_port_name_list()
67
107
        for name in port_names:
 
108
            if self.get_ofport(name) < 0:
 
109
                continue
68
110
            port = get_port(name)
69
111
            if port:
70
112
                ports.append(port)
71
113
 
72
114
        return ports
73
115
 
74
 
    def _get_vif_port(self, name):
75
 
        external_ids = self.db_get_map("Interface", name, "external_ids")
76
 
        if "iface-id" in external_ids and "attached-mac" in external_ids:
77
 
            return self._vifport(name, external_ids)
78
 
        elif ("xs-vif-uuid" in external_ids and
79
 
              "attached-mac" in external_ids):
80
 
            # if this is a xenserver and iface-id is not automatically
81
 
            # synced to OVS from XAPI, we grab it from XAPI directly
82
 
            ofport = self.db_get_val("Interface", name, "ofport")
83
 
            iface_id = self.get_xapi_iface_id(external_ids["xs-vif-uuid"])
84
 
            return VifPort(name, ofport, iface_id,
85
 
                           external_ids["attached-mac"], self)
86
 
 
87
 
    def get_vif_ports(self):
88
 
        "returns a VIF object for each VIF port"
89
 
        return self._get_ports(self._get_vif_port)
90
 
 
91
116
    def _get_external_port(self, name):
 
117
        # exclude vif ports
92
118
        external_ids = self.db_get_map("Interface", name, "external_ids")
93
119
        if external_ids:
94
120
            return
95
121
 
96
 
        ofport = self.db_get_val("Interface", name, "ofport")
 
122
        # exclude tunnel ports
 
123
        options = self.db_get_map("Interface", name, "options")
 
124
        if "remote_ip" in options:
 
125
            return
 
126
 
 
127
        ofport = self.get_ofport(name)
97
128
        return VifPort(name, ofport, None, None, self)
98
129
 
99
130
    def get_external_ports(self):
100
131
        return self._get_ports(self._get_external_port)
101
132
 
102
133
 
103
 
def check_ofp_mode(db):
 
134
class VifPortSet(object):
    """
    Pairs an integration bridge with a Ryu REST client so that the
    bridge's external ports can be reported to Ryu.
    """

    def __init__(self, int_br, ryu_rest_client):
        """
        :param int_br: bridge object providing get_external_ports()
        :param ryu_rest_client: client object providing update_port()
        """
        super(VifPortSet, self).__init__()
        self.api = ryu_rest_client
        self.int_br = int_br

    def setup(self):
        """
        Report every external port of the bridge to the REST client under
        the network id rest_nw_id.NW_ID_EXTERNAL.
        """
        external_nw_id = rest_nw_id.NW_ID_EXTERNAL
        for ext_port in self.int_br.get_external_ports():
            LOG.debug(_('external port %s'), ext_port)
            dpid = ext_port.switch.datapath_id
            self.api.update_port(external_nw_id, dpid, ext_port.ofport)
 
145
 
 
146
 
 
147
class OVSQuantumOFPRyuAgent(object):
    """
    Configures the integration bridge for use with Ryu.

    All of the setup work is performed while the object is constructed.
    """

    def __init__(self, integ_br, ofp_rest_api_addr,
                 tunnel_ip, ovsdb_ip, ovsdb_port,
                 root_helper):
        """
        :param integ_br: name of the integration bridge
        :param ofp_rest_api_addr: address given to the Ryu REST clients
        :param tunnel_ip: address stored under OVS_TUNNEL_ADDR
        :param ovsdb_ip: address used to build the OVSDB_ADDR value
        :param ovsdb_port: port used for the manager target and OVSDB_ADDR;
                           formatted with %d
        :param root_helper: root helper handed to the bridge object
        """
        super(OVSQuantumOFPRyuAgent, self).__init__()
        self.vif_ports = None
        self.int_br = None
        # Note the argument order expected by _setup_integration_br().
        self._setup_integration_br(root_helper, integ_br,
                                   ofp_rest_api_addr,
                                   tunnel_ip, ovsdb_port, ovsdb_ip)

    def _setup_integration_br(self, root_helper, integ_br,
                              ofp_rest_api_addr,
                              tunnel_ip, ovsdb_port, ovsdb_ip):
        """
        Create the bridge object, report its external ports and store the
        tunnel and OVSDB addresses in Ryu's switch configuration.
        """
        bridge = OVSBridge(integ_br, root_helper)
        self.int_br = bridge
        bridge.find_datapath_id()

        # Report the external ports through the OpenFlow REST client.
        rest_client = client.OFPClient(ofp_rest_api_addr)
        port_set = VifPortSet(bridge, rest_client)
        self.vif_ports = port_set
        port_set.setup()

        # Store the tunnel endpoint for this datapath.
        switch_conf = client.SwitchConfClient(ofp_rest_api_addr)
        switch_conf.set_key(bridge.datapath_id,
                            conf_switch_key.OVS_TUNNEL_ADDR, tunnel_ip)

        # Currently Ryu supports only tcp methods. (ssl isn't supported yet)
        manager_target = 'ptcp:%d' % ovsdb_port
        bridge.set_manager(manager_target)
        ovsdb_addr = 'tcp:%s:%d' % (ovsdb_ip, ovsdb_port)
        switch_conf.set_key(bridge.datapath_id, conf_switch_key.OVSDB_ADDR,
                            ovsdb_addr)
 
177
 
 
178
 
 
179
def check_ofp_rest_api_addr(db):
104
180
    LOG.debug("checking db")
105
181
 
106
182
    servers = db.ofp_server.all()
113
189
        elif serv.host_type == "controller":
114
190
            ofp_controller_addr = serv.address
115
191
        else:
116
 
            LOG.warn("ignoring unknown server type %s", serv)
 
192
            LOG.warn(_("ignoring unknown server type %s"), serv)
117
193
 
118
 
    LOG.debug("controller %s", ofp_controller_addr)
119
194
    LOG.debug("api %s", ofp_rest_api_addr)
120
 
    if not ofp_controller_addr:
121
 
        raise RuntimeError("OF controller isn't specified")
 
195
    if ofp_controller_addr:
 
196
        LOG.warn(_('OF controller parameter is stale %s'), ofp_controller_addr)
122
197
    if not ofp_rest_api_addr:
123
 
        raise RuntimeError("Ryu rest API port isn't specified")
124
 
 
125
 
    LOG.debug("going to ofp controller mode %s %s",
126
 
              ofp_controller_addr, ofp_rest_api_addr)
127
 
    return (ofp_controller_addr, ofp_rest_api_addr)
128
 
 
129
 
 
130
 
class OVSQuantumOFPRyuAgent:
131
 
    def __init__(self, integ_br, db, root_helper):
132
 
        self.root_helper = root_helper
133
 
        (ofp_controller_addr, ofp_rest_api_addr) = check_ofp_mode(db)
134
 
 
135
 
        self.nw_id_external = rest_nw_id.NW_ID_EXTERNAL
136
 
        self.api = OFPClient(ofp_rest_api_addr)
137
 
        self._setup_integration_br(integ_br, ofp_controller_addr)
138
 
 
139
 
    def _setup_integration_br(self, integ_br, ofp_controller_addr):
140
 
        self.int_br = OVSBridge(integ_br, self.root_helper)
141
 
        self.int_br.find_datapath_id()
142
 
        self.int_br.set_controller(ofp_controller_addr)
143
 
        for port in self.int_br.get_external_ports():
144
 
            self._port_update(self.nw_id_external, port)
145
 
 
146
 
    def _port_update(self, network_id, port):
147
 
        self.api.update_port(network_id, port.switch.datapath_id, port.ofport)
148
 
 
149
 
    def _all_bindings(self, db):
150
 
        """return interface id -> port which include network id bindings"""
151
 
        return dict((port.id, port) for port in db.ports.all())
152
 
 
153
 
    def _set_port_status(self, port, status):
154
 
        port.status = status
155
 
 
156
 
    def daemon_loop(self, db):
157
 
        # on startup, register all existing ports
158
 
        all_bindings = self._all_bindings(db)
159
 
 
160
 
        local_bindings = {}
161
 
        vif_ports = {}
162
 
        for port in self.int_br.get_vif_ports():
163
 
            vif_ports[port.vif_id] = port
164
 
            if port.vif_id in all_bindings:
165
 
                net_id = all_bindings[port.vif_id].network_id
166
 
                local_bindings[port.vif_id] = net_id
167
 
                self._port_update(net_id, port)
168
 
                self._set_port_status(all_bindings[port.vif_id],
169
 
                                      constants.PORT_STATUS_ACTIVE)
170
 
                LOG.info("Updating binding to net-id = %s for %s",
171
 
                         net_id, str(port))
172
 
        db.commit()
173
 
 
174
 
        old_vif_ports = vif_ports
175
 
        old_local_bindings = local_bindings
176
 
 
177
 
        while True:
178
 
            all_bindings = self._all_bindings(db)
179
 
 
180
 
            new_vif_ports = {}
181
 
            new_local_bindings = {}
182
 
            for port in self.int_br.get_vif_ports():
183
 
                new_vif_ports[port.vif_id] = port
184
 
                if port.vif_id in all_bindings:
185
 
                    net_id = all_bindings[port.vif_id].network_id
186
 
                    new_local_bindings[port.vif_id] = net_id
187
 
 
188
 
                old_b = old_local_bindings.get(port.vif_id)
189
 
                new_b = new_local_bindings.get(port.vif_id)
190
 
                if old_b == new_b:
191
 
                    continue
192
 
 
193
 
                if old_b:
194
 
                    LOG.info("Removing binding to net-id = %s for %s",
195
 
                             old_b, str(port))
196
 
                    if port.vif_id in all_bindings:
197
 
                        self._set_port_status(all_bindings[port.vif_id],
198
 
                                              constants.PORT_STATUS_DOWN)
199
 
                if new_b:
200
 
                    if port.vif_id in all_bindings:
201
 
                        self._set_port_status(all_bindings[port.vif_id],
202
 
                                              constants.PORT_STATUS_ACTIVE)
203
 
                    LOG.info("Adding binding to net-id = %s for %s",
204
 
                             new_b, str(port))
205
 
 
206
 
            for vif_id in old_vif_ports:
207
 
                if vif_id not in new_vif_ports:
208
 
                    LOG.info("Port Disappeared: %s", vif_id)
209
 
                    if vif_id in all_bindings:
210
 
                        self._set_port_status(all_bindings[port.vif_id],
211
 
                                              constants.PORT_STATUS_DOWN)
212
 
 
213
 
            old_vif_ports = new_vif_ports
214
 
            old_local_bindings = new_local_bindings
215
 
            db.commit()
216
 
            time.sleep(2)
 
198
        raise RuntimeError(_("Ryu rest API port isn't specified"))
 
199
 
 
200
    LOG.debug(_("going to ofp controller mode %s"), ofp_rest_api_addr)
 
201
    return ofp_rest_api_addr
217
202
 
218
203
 
219
204
def main():
227
212
    options = {"sql_connection": cfg.CONF.DATABASE.sql_connection}
228
213
    db = SqlSoup(options["sql_connection"])
229
214
 
230
 
    LOG.info("Connecting to database \"%s\" on %s",
231
 
             db.engine.url.database, db.engine.url.host)
232
 
    plugin = OVSQuantumOFPRyuAgent(integ_br, db, root_helper)
233
 
    plugin.daemon_loop(db)
234
 
 
 
215
    LOG.info(_("Connecting to database \"%(database)s\" on %(host)s") %
 
216
             {"database": db.engine.url.database,
 
217
              "host": db.engine.url.host})
 
218
    ofp_rest_api_addr = check_ofp_rest_api_addr(db)
 
219
 
 
220
    tunnel_ip = _get_tunnel_ip()
 
221
    LOG.debug(_('tunnel_ip %s'), tunnel_ip)
 
222
    ovsdb_port = cfg.CONF.OVS.ovsdb_port
 
223
    LOG.debug(_('ovsdb_port %s'), ovsdb_port)
 
224
    ovsdb_ip = _get_ovsdb_ip()
 
225
    LOG.debug(_('ovsdb_ip %s'), ovsdb_ip)
 
226
    try:
 
227
        OVSQuantumOFPRyuAgent(integ_br, ofp_rest_api_addr,
 
228
                              tunnel_ip, ovsdb_ip, ovsdb_port, root_helper)
 
229
    except httplib.HTTPException, e:
 
230
        LOG.error(_("initialization failed: %s"), e)
 
231
        sys.exit(1)
 
232
 
 
233
    LOG.info(_("Ryu initialization on the node is done."
 
234
               " Now Ryu agent exits successfully."))
235
235
    sys.exit(0)
236
236
 
237
237