20
20
# under the License.
21
21
# @author: Isaku Yamahata
28
from ryu.app import client
29
from ryu.app import conf_switch_key
27
30
from ryu.app import rest_nw_id
28
from ryu.app.client import OFPClient
29
31
from sqlalchemy.ext.sqlsoup import SqlSoup
31
33
from quantum.agent.linux import ovs_lib
32
34
from quantum.agent.linux.ovs_lib import VifPort
33
35
from quantum.common import config as logging_config
34
from quantum.common import constants
35
36
from quantum.openstack.common import cfg
37
from quantum.openstack.common.cfg import NoSuchGroupError
38
from quantum.openstack.common.cfg import NoSuchOptError
39
from quantum.openstack.common import log as LOG
36
40
from quantum.plugins.ryu.common import config
43
# This is copied of nova.flags._get_my_ip()
44
# Agent shouldn't depend on nova module
47
Returns the actual ip of the local machine.
49
This code figures out what source address would be used if some traffic
50
were to be sent out to some well known address on the Internet. In this
51
case, a Google DNS server is used, but the specific address does not
52
matter much. No traffic is actually sent.
54
csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
55
csock.connect(('8.8.8.8', 80))
56
(addr, _port) = csock.getsockname()
61
def _get_ip(cfg_ip_str, cfg_interface_str):
64
ip = getattr(cfg.CONF.OVS, cfg_ip_str)
65
except (NoSuchOptError, NoSuchGroupError):
72
iface = getattr(cfg.CONF.OVS, cfg_interface_str)
73
except (NoSuchOptError, NoSuchGroupError):
76
iface = netifaces.ifaddresses(iface)[netifaces.AF_INET][0]
83
return _get_ip('tunnel_ip', 'tunnel_interface')
87
return _get_ip('ovsdb_ip', 'ovsdb_interface')
39
90
class OVSBridge(ovs_lib.OVSBridge):
40
91
def __init__(self, br_name, root_helper):
41
92
ovs_lib.OVSBridge.__init__(self, br_name, root_helper)
42
93
self.datapath_id = None
44
95
def find_datapath_id(self):
45
# ovs-vsctl get Bridge br-int datapath_id
46
res = self.run_vsctl(["get", "Bridge", self.br_name, "datapath_id"])
48
# remove preceding/trailing double quotes
49
dp_id = res.strip().strip('"')
50
self.datapath_id = dp_id
52
def set_controller(self, target):
53
methods = ("ssl", "tcp", "unix", "pssl", "ptcp", "punix")
54
args = target.split(":")
55
if not args[0] in methods:
56
target = "tcp:" + target
57
self.run_vsctl(["set-controller", self.br_name, target])
59
def _vifport(self, name, external_ids):
60
ofport = self.db_get_val("Interface", name, "ofport")
61
return VifPort(name, ofport, external_ids["iface-id"],
62
external_ids["attached-mac"], self)
96
self.datapath_id = self.get_datapath_id()
98
def set_manager(self, target):
99
self.run_vsctl(["set-manager", target])
101
def get_ofport(self, name):
102
return self.db_get_val("Interface", name, "ofport")
64
104
def _get_ports(self, get_port):
66
106
port_names = self.get_port_name_list()
67
107
for name in port_names:
108
if self.get_ofport(name) < 0:
68
110
port = get_port(name)
70
112
ports.append(port)
74
def _get_vif_port(self, name):
75
external_ids = self.db_get_map("Interface", name, "external_ids")
76
if "iface-id" in external_ids and "attached-mac" in external_ids:
77
return self._vifport(name, external_ids)
78
elif ("xs-vif-uuid" in external_ids and
79
"attached-mac" in external_ids):
80
# if this is a xenserver and iface-id is not automatically
81
# synced to OVS from XAPI, we grab it from XAPI directly
82
ofport = self.db_get_val("Interface", name, "ofport")
83
iface_id = self.get_xapi_iface_id(external_ids["xs-vif-uuid"])
84
return VifPort(name, ofport, iface_id,
85
external_ids["attached-mac"], self)
87
def get_vif_ports(self):
88
"returns a VIF object for each VIF port"
89
return self._get_ports(self._get_vif_port)
91
116
def _get_external_port(self, name):
92
118
external_ids = self.db_get_map("Interface", name, "external_ids")
96
ofport = self.db_get_val("Interface", name, "ofport")
122
# exclude tunnel ports
123
options = self.db_get_map("Interface", name, "options")
124
if "remote_ip" in options:
127
ofport = self.get_ofport(name)
97
128
return VifPort(name, ofport, None, None, self)
99
130
def get_external_ports(self):
100
131
return self._get_ports(self._get_external_port)
103
def check_ofp_mode(db):
134
class VifPortSet(object):
135
def __init__(self, int_br, ryu_rest_client):
136
super(VifPortSet, self).__init__()
138
self.api = ryu_rest_client
141
for port in self.int_br.get_external_ports():
142
LOG.debug(_('external port %s'), port)
143
self.api.update_port(rest_nw_id.NW_ID_EXTERNAL,
144
port.switch.datapath_id, port.ofport)
147
class OVSQuantumOFPRyuAgent(object):
148
def __init__(self, integ_br, ofp_rest_api_addr,
149
tunnel_ip, ovsdb_ip, ovsdb_port,
151
super(OVSQuantumOFPRyuAgent, self).__init__()
153
self.vif_ports = None
154
self._setup_integration_br(root_helper, integ_br,
156
tunnel_ip, ovsdb_port, ovsdb_ip)
158
def _setup_integration_br(self, root_helper, integ_br,
160
tunnel_ip, ovsdb_port, ovsdb_ip):
161
self.int_br = OVSBridge(integ_br, root_helper)
162
self.int_br.find_datapath_id()
164
ryu_rest_client = client.OFPClient(ofp_rest_api_addr)
166
self.vif_ports = VifPortSet(self.int_br, ryu_rest_client)
167
self.vif_ports.setup()
169
sc_client = client.SwitchConfClient(ofp_rest_api_addr)
170
sc_client.set_key(self.int_br.datapath_id,
171
conf_switch_key.OVS_TUNNEL_ADDR, tunnel_ip)
173
# Currently Ryu supports only tcp methods. (ssl isn't supported yet)
174
self.int_br.set_manager('ptcp:%d' % ovsdb_port)
175
sc_client.set_key(self.int_br.datapath_id, conf_switch_key.OVSDB_ADDR,
176
'tcp:%s:%d' % (ovsdb_ip, ovsdb_port))
179
def check_ofp_rest_api_addr(db):
104
180
LOG.debug("checking db")
106
182
servers = db.ofp_server.all()
113
189
elif serv.host_type == "controller":
114
190
ofp_controller_addr = serv.address
116
LOG.warn("ignoring unknown server type %s", serv)
192
LOG.warn(_("ignoring unknown server type %s"), serv)
118
LOG.debug("controller %s", ofp_controller_addr)
119
194
LOG.debug("api %s", ofp_rest_api_addr)
120
if not ofp_controller_addr:
121
raise RuntimeError("OF controller isn't specified")
195
if ofp_controller_addr:
196
LOG.warn(_('OF controller parameter is stale %s'), ofp_controller_addr)
122
197
if not ofp_rest_api_addr:
123
raise RuntimeError("Ryu rest API port isn't specified")
125
LOG.debug("going to ofp controller mode %s %s",
126
ofp_controller_addr, ofp_rest_api_addr)
127
return (ofp_controller_addr, ofp_rest_api_addr)
130
class OVSQuantumOFPRyuAgent:
131
def __init__(self, integ_br, db, root_helper):
132
self.root_helper = root_helper
133
(ofp_controller_addr, ofp_rest_api_addr) = check_ofp_mode(db)
135
self.nw_id_external = rest_nw_id.NW_ID_EXTERNAL
136
self.api = OFPClient(ofp_rest_api_addr)
137
self._setup_integration_br(integ_br, ofp_controller_addr)
139
def _setup_integration_br(self, integ_br, ofp_controller_addr):
140
self.int_br = OVSBridge(integ_br, self.root_helper)
141
self.int_br.find_datapath_id()
142
self.int_br.set_controller(ofp_controller_addr)
143
for port in self.int_br.get_external_ports():
144
self._port_update(self.nw_id_external, port)
146
def _port_update(self, network_id, port):
147
self.api.update_port(network_id, port.switch.datapath_id, port.ofport)
149
def _all_bindings(self, db):
150
"""return interface id -> port which include network id bindings"""
151
return dict((port.id, port) for port in db.ports.all())
153
def _set_port_status(self, port, status):
156
def daemon_loop(self, db):
157
# on startup, register all existing ports
158
all_bindings = self._all_bindings(db)
162
for port in self.int_br.get_vif_ports():
163
vif_ports[port.vif_id] = port
164
if port.vif_id in all_bindings:
165
net_id = all_bindings[port.vif_id].network_id
166
local_bindings[port.vif_id] = net_id
167
self._port_update(net_id, port)
168
self._set_port_status(all_bindings[port.vif_id],
169
constants.PORT_STATUS_ACTIVE)
170
LOG.info("Updating binding to net-id = %s for %s",
174
old_vif_ports = vif_ports
175
old_local_bindings = local_bindings
178
all_bindings = self._all_bindings(db)
181
new_local_bindings = {}
182
for port in self.int_br.get_vif_ports():
183
new_vif_ports[port.vif_id] = port
184
if port.vif_id in all_bindings:
185
net_id = all_bindings[port.vif_id].network_id
186
new_local_bindings[port.vif_id] = net_id
188
old_b = old_local_bindings.get(port.vif_id)
189
new_b = new_local_bindings.get(port.vif_id)
194
LOG.info("Removing binding to net-id = %s for %s",
196
if port.vif_id in all_bindings:
197
self._set_port_status(all_bindings[port.vif_id],
198
constants.PORT_STATUS_DOWN)
200
if port.vif_id in all_bindings:
201
self._set_port_status(all_bindings[port.vif_id],
202
constants.PORT_STATUS_ACTIVE)
203
LOG.info("Adding binding to net-id = %s for %s",
206
for vif_id in old_vif_ports:
207
if vif_id not in new_vif_ports:
208
LOG.info("Port Disappeared: %s", vif_id)
209
if vif_id in all_bindings:
210
self._set_port_status(all_bindings[port.vif_id],
211
constants.PORT_STATUS_DOWN)
213
old_vif_ports = new_vif_ports
214
old_local_bindings = new_local_bindings
198
raise RuntimeError(_("Ryu rest API port isn't specified"))
200
LOG.debug(_("going to ofp controller mode %s"), ofp_rest_api_addr)
201
return ofp_rest_api_addr
227
212
options = {"sql_connection": cfg.CONF.DATABASE.sql_connection}
228
213
db = SqlSoup(options["sql_connection"])
230
LOG.info("Connecting to database \"%s\" on %s",
231
db.engine.url.database, db.engine.url.host)
232
plugin = OVSQuantumOFPRyuAgent(integ_br, db, root_helper)
233
plugin.daemon_loop(db)
215
LOG.info(_("Connecting to database \"%(database)s\" on %(host)s") %
216
{"database": db.engine.url.database,
217
"host": db.engine.url.host})
218
ofp_rest_api_addr = check_ofp_rest_api_addr(db)
220
tunnel_ip = _get_tunnel_ip()
221
LOG.debug(_('tunnel_ip %s'), tunnel_ip)
222
ovsdb_port = cfg.CONF.OVS.ovsdb_port
223
LOG.debug(_('ovsdb_port %s'), ovsdb_port)
224
ovsdb_ip = _get_ovsdb_ip()
225
LOG.debug(_('ovsdb_ip %s'), ovsdb_ip)
227
OVSQuantumOFPRyuAgent(integ_br, ofp_rest_api_addr,
228
tunnel_ip, ovsdb_ip, ovsdb_port, root_helper)
229
except httplib.HTTPException, e:
230
LOG.error(_("initialization failed: %s"), e)
233
LOG.info(_("Ryu initialization on the node is done."
234
" Now Ryu agent exits successfully."))