# Copyright 2014 Big Switch Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
This module manages the HTTP and HTTPS connections to the backend controllers.

The main class it provides for external use is ServerPool which manages a set
of ServerProxy objects that correspond to individual backend controllers.

The following functionality is handled by this module:
- Translation of rest_* function calls to HTTP/HTTPS calls to the controllers
- Automatic failover between controllers
- SSL Certificate enforcement
"""
import base64
import httplib
import os
import socket
import ssl
import time
import weakref

import eventlet
import eventlet.corolocal
from oslo.config import cfg
from oslo.serialization import jsonutils
from oslo.utils import excutils

from neutron.common import exceptions
from neutron.i18n import _LE, _LI, _LW
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch.db import consistency_db as cdb
LOG = logging.getLogger(__name__)

# The following are used to invoke the API on the external controller
CAPABILITIES_PATH = "/capabilities"
NET_RESOURCE_PATH = "/tenants/%s/networks"
PORT_RESOURCE_PATH = "/tenants/%s/networks/%s/ports"
ROUTER_RESOURCE_PATH = "/tenants/%s/routers"
ROUTER_INTF_OP_PATH = "/tenants/%s/routers/%s/interfaces"
NETWORKS_PATH = "/tenants/%s/networks/%s"
FLOATINGIPS_PATH = "/tenants/%s/floatingips/%s"
PORTS_PATH = "/tenants/%s/networks/%s/ports/%s"
ATTACHMENT_PATH = "/tenants/%s/networks/%s/ports/%s/attachment"
ROUTERS_PATH = "/tenants/%s/routers/%s"
ROUTER_INTF_PATH = "/tenants/%s/routers/%s/interfaces/%s"
TOPOLOGY_PATH = "/topology"
HEALTH_PATH = "/health"
SWITCHES_PATH = "/switches/%s"

# 2xx responses count as success; a status in FAILURE_CODES triggers
# failover to the next server in the pool (see ServerPool.server_failure).
SUCCESS_CODES = range(200, 207)
# NOTE(review): this list literal was truncated in the garbled copy (it
# ended with a trailing comma); 504 restored to close it -- confirm
# against the canonical source.
FAILURE_CODES = [0, 301, 302, 303, 400, 401, 403, 404, 500, 501, 502, 503,
                 504]

BASE_URI = '/networkService/v1.1'
ORCHESTRATION_SERVICE_ID = 'Neutron v2.0'
# Header carrying the consistency hash exchanged with the backend.
HASH_MATCH_HEADER = 'X-BSN-BVS-HASH-MATCH'
# Header carrying the serialized request context.
REQ_CONTEXT_HEADER = 'X-REQ-CONTEXT'

# Retry policy applied when the backend answers 503 Service Unavailable.
HTTP_SERVICE_UNAVAILABLE_RETRY_COUNT = 3
HTTP_SERVICE_UNAVAILABLE_RETRY_INTERVAL = 3
78
class RemoteRestError(exceptions.NeutronException):
    """Raised when a REST call to a backend network controller fails."""

    message = _("Error in REST call to remote network "
                "controller: %(reason)s")

    def __init__(self, **kwargs):
        # Pop 'status' so the NeutronException base class only receives
        # the kwargs used for message interpolation.
        self.status = kwargs.pop('status', None)
        self.reason = kwargs.get('reason')
        super(RemoteRestError, self).__init__(**kwargs)
# NOTE(review): this region is a garbled extract -- indentation is lost,
# stray line-number artifacts are interleaved, and several statements are
# missing (e.g. server/port/ssl assignments in __init__ and the
# try/except around the rest_call in get_capabilities). Preserved
# verbatim; restore from the canonical source before editing.
class ServerProxy(object):
90
    """REST server proxy to a network controller."""
92
def __init__(self, server, port, ssl, auth, neutron_id, timeout,
93
base_uri, name, mypool, combined_cert):
97
self.base_uri = base_uri
98
self.timeout = timeout
100
self.success_codes = SUCCESS_CODES
102
self.neutron_id = neutron_id
104
self.capabilities = []
105
# enable server to reference parent pool
107
# cache connection here to avoid a SSL handshake for every connection
108
self.currentconn = None
110
self.auth = 'Basic ' + base64.encodestring(auth).strip()
111
self.combined_cert = combined_cert
113
def get_capabilities(self):
115
body = self.rest_call('GET', CAPABILITIES_PATH)[2]
116
self.capabilities = jsonutils.loads(body)
118
LOG.exception(_LE("Couldn't retrieve capabilities. "
119
"Newer API calls won't be supported."))
120
LOG.info(_LI("The following capabilities were received "
121
"for %(server)s: %(cap)s"), {'server': self.server,
122
'cap': self.capabilities})
123
return self.capabilities
125
# NOTE(review): garbled extract of ServerProxy.rest_call -- indentation is
# lost, stray artifacts are interleaved, and control-flow lines are
# missing (the 'try:' opening the request block, several 'else:'/'if'
# lines around connection setup, and the final return). Preserved
# verbatim; restore from the canonical source before editing.
def rest_call(self, action, resource, data='', headers=None,
126
timeout=False, reconnect=False, hash_handler=None):
127
uri = self.base_uri + resource
128
body = jsonutils.dumps(data)
129
headers = headers or {}
130
headers['Content-type'] = 'application/json'
131
headers['Accept'] = 'application/json'
132
headers['NeutronProxy-Agent'] = self.name
133
headers['Instance-ID'] = self.neutron_id
134
headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
136
# this will be excluded on calls that don't need hashes
137
# (e.g. topology sync, capability checks)
138
headers[HASH_MATCH_HEADER] = hash_handler.read_for_update()
140
hash_handler = cdb.HashHandler()
141
if 'keep-alive' in self.capabilities:
142
headers['Connection'] = 'keep-alive'
146
headers['Authorization'] = self.auth
148
LOG.debug("ServerProxy: server=%(server)s, port=%(port)d, "
150
{'server': self.server, 'port': self.port, 'ssl': self.ssl})
151
LOG.debug("ServerProxy: resource=%(resource)s, data=%(data)r, "
152
"headers=%(headers)r, action=%(action)s",
153
{'resource': resource, 'data': data, 'headers': headers,
156
# unspecified timeout is False because a timeout can be specified as
157
# None to indicate no timeout.
159
timeout = self.timeout
161
if timeout != self.timeout:
162
# need a new connection if timeout has changed
165
if not self.currentconn or reconnect:
167
self.currentconn.close()
169
self.currentconn = HTTPSConnectionWithValidation(
170
self.server, self.port, timeout=timeout)
171
if self.currentconn is None:
172
LOG.error(_LE('ServerProxy: Could not establish HTTPS '
174
return 0, None, None, None
175
self.currentconn.combined_cert = self.combined_cert
177
self.currentconn = httplib.HTTPConnection(
178
self.server, self.port, timeout=timeout)
179
if self.currentconn is None:
180
LOG.error(_LE('ServerProxy: Could not establish HTTP '
182
return 0, None, None, None
185
self.currentconn.request(action, uri, body, headers)
186
response = self.currentconn.getresponse()
187
respstr = response.read()
189
if response.status in self.success_codes:
190
hash_value = response.getheader(HASH_MATCH_HEADER)
191
# don't clear hash from DB if a hash header wasn't present
192
if hash_value is not None:
193
hash_handler.put_hash(hash_value)
195
hash_handler.clear_lock()
197
respdata = jsonutils.loads(respstr)
199
# response was not JSON, ignore the exception
202
# release lock so others don't have to wait for timeout
203
hash_handler.clear_lock()
205
ret = (response.status, response.reason, respstr, respdata)
206
except httplib.HTTPException:
207
# If we were using a cached connection, try again with a new one.
208
with excutils.save_and_reraise_exception() as ctxt:
209
self.currentconn.close()
211
# if reconnect is true, this was on a fresh connection so
212
# reraise since this server seems to be broken
215
# if reconnect is false, it was a cached connection so
216
# try one more time before re-raising
218
return self.rest_call(action, resource, data, headers,
219
timeout=timeout, reconnect=True)
220
except (socket.timeout, socket.error) as e:
221
self.currentconn.close()
222
LOG.error(_LE('ServerProxy: %(action)s failure, %(e)r'),
223
{'action': action, 'e': e})
224
ret = 0, None, None, None
225
LOG.debug("ServerProxy: status=%(status)d, reason=%(reason)r, "
226
"ret=%(ret)s, data=%(data)r", {'status': ret[0],
233
# NOTE(review): garbled extract of the ServerPool class head, the
# get_instance classmethod and __init__ -- indentation is lost, stray
# artifacts are interleaved, and statements are missing (the @classmethod
# decorator and early-return in get_instance, the 'if not servers:'
# guard, the servers-list comprehension 'for' line, and the
# self.servers assignment). Preserved verbatim; restore from the
# canonical source before editing.
class ServerPool(object):
238
def get_instance(cls):
241
cls._instance = cls()
244
def __init__(self, timeout=False,
245
base_uri=BASE_URI, name='NeutronRestProxy'):
246
LOG.debug("ServerPool: initializing")
247
# 'servers' is the list of network controller REST end-points
248
# (used in order specified till one succeeds, and it is sticky
249
# till next failure). Use 'server_auth' to encode api-key
250
servers = cfg.CONF.RESTPROXY.servers
251
self.auth = cfg.CONF.RESTPROXY.server_auth
252
self.ssl = cfg.CONF.RESTPROXY.server_ssl
253
self.neutron_id = cfg.CONF.RESTPROXY.neutron_id
254
self.base_uri = base_uri
257
self.timeout = cfg.CONF.RESTPROXY.server_timeout
258
self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections
260
if timeout is not False:
261
self.timeout = timeout
263
# Function to use to retrieve topology for consistency syncs.
264
# Needs to be set by module that uses the servermanager.
265
self.get_topo_function = None
266
self.get_topo_function_args = {}
269
raise cfg.Error(_('Servers not defined. Aborting server manager.'))
270
servers = [s if len(s.rsplit(':', 1)) == 2
271
else "%s:%d" % (s, default_port)
273
if any((len(spl) != 2 or not spl[1].isdigit())
274
for spl in [sp.rsplit(':', 1)
276
raise cfg.Error(_('Servers must be defined as <ip>:<port>. '
277
'Configuration was %s') % servers)
279
self.server_proxy_for(server, int(port))
280
for server, port in (s.rsplit(':', 1) for s in servers)
282
eventlet.spawn(self._consistency_watchdog,
283
cfg.CONF.RESTPROXY.consistency_interval)
284
ServerPool._instance = self
285
LOG.debug("ServerPool: initialization done")
287
def set_context(self, context):
    """Cache *context* for the current greenthread.

    The context must be greenthread-local so concurrent requests do not
    pick up each other's context.  A weakref is stored so the context is
    garbage collected once the plugin is done with it.
    """
    self.contexts[eventlet.corolocal.get_ident()] = weakref.ref(context)
# NOTE(review): garbled extract of get_context_ref and the pool-level
# get_capabilities -- indentation is lost, stray artifacts are
# interleaved, and the 'try:' lines that pair with the visible
# 'except' clauses are missing. Preserved verbatim; restore from the
# canonical source before editing.
def get_context_ref(self):
296
# Try to get the context cached for this thread. If one
297
# doesn't exist or if it's been garbage collected, this will
300
return self.contexts[eventlet.corolocal.get_ident()]()
304
def get_capabilities(self):
305
# lookup on first try
307
return self.capabilities
308
except AttributeError:
309
# each server should return a list of capabilities it supports
310
# e.g. ['floatingip']
311
capabilities = [set(server.get_capabilities())
312
for server in self.servers]
313
# Pool only supports what all of the servers support
314
self.capabilities = set.intersection(*capabilities)
315
return self.capabilities
317
def server_proxy_for(self, server, port):
    """Build a ServerProxy for *server*:*port* with this pool's settings.

    The combined certificate file for the server is prepared first so
    the proxy can validate SSL connections.
    """
    combined_cert = self._get_combined_cert_for_server(server, port)
    return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
                       self.timeout, self.base_uri, self.name, mypool=self,
                       combined_cert=combined_cert)
# NOTE(review): garbled extract -- indentation is lost, stray artifacts
# are interleaved, and branches are missing (the combined_cert
# initialization, the 'if exists:' branch pairing with the visible
# 'elif', the 'else:' before the no-certificates error, and the final
# return). Preserved verbatim; restore from the canonical source
# before editing.
def _get_combined_cert_for_server(self, server, port):
324
# The ssl library requires a combined file with all trusted certs
325
# so we make one containing the trusted CAs and the corresponding
326
# host cert for this server
328
if self.ssl and not cfg.CONF.RESTPROXY.no_ssl_validation:
329
base_ssl = cfg.CONF.RESTPROXY.ssl_cert_directory
330
host_dir = os.path.join(base_ssl, 'host_certs')
331
ca_dir = os.path.join(base_ssl, 'ca_certs')
332
combined_dir = os.path.join(base_ssl, 'combined')
333
combined_cert = os.path.join(combined_dir, '%s.pem' % server)
334
if not os.path.exists(base_ssl):
335
raise cfg.Error(_('ssl_cert_directory [%s] does not exist. '
336
'Create it or disable ssl.') % base_ssl)
337
for automake in [combined_dir, ca_dir, host_dir]:
338
if not os.path.exists(automake):
339
os.makedirs(automake)
342
certs = self._get_ca_cert_paths(ca_dir)
344
# check for a host specific cert
345
hcert, exists = self._get_host_cert_path(host_dir, server)
348
elif cfg.CONF.RESTPROXY.ssl_sticky:
349
self._fetch_and_store_cert(server, port, hcert)
352
raise cfg.Error(_('No certificates were found to verify '
353
'controller %s') % (server))
354
self._combine_certs_to_file(certs, combined_cert)
357
def _combine_certs_to_file(self, certs, cfile):
359
Concatenates the contents of each certificate in a list of
360
certificate paths to one combined location for use with ssl
363
with open(cfile, 'w') as combined:
365
with open(c, 'r') as cert_handle:
366
combined.write(cert_handle.read())
368
# NOTE(review): garbled extract of _get_host_cert_path and
# _get_ca_cert_paths -- indentation is lost, stray artifacts are
# interleaved, docstring delimiters are gone, and the return
# statements plus part of the list comprehension are missing.
# Preserved verbatim; restore from the canonical source before editing.
def _get_host_cert_path(self, host_dir, server):
370
returns full path and boolean indicating existence
372
hcert = os.path.join(host_dir, '%s.pem' % server)
373
if os.path.exists(hcert):
377
def _get_ca_cert_paths(self, ca_dir):
378
certs = [os.path.join(root, name)
380
name for (root, dirs, files) in os.walk(ca_dir)
383
if name.endswith('.pem')]
386
# NOTE(review): garbled extract -- indentation is lost, stray artifacts
# are interleaved, and statements are missing (the docstring
# delimiters, the 'try:' pairing with the visible 'except', part of
# the LOG.warning argument dict, and the final return). Preserved
# verbatim; restore from the canonical source before editing.
def _fetch_and_store_cert(self, server, port, path):
388
Grabs a certificate from a server and writes it to
392
cert = ssl.get_server_certificate((server, port),
393
ssl_version=ssl.PROTOCOL_TLSv1)
394
except Exception as e:
395
raise cfg.Error(_('Could not retrieve initial '
396
'certificate from controller %(server)s. '
397
'Error details: %(error)s') %
398
{'server': server, 'error': e})
400
LOG.warning(_LW("Storing to certificate for host %(server)s "
401
"at %(path)s"), {'server': server,
403
self._file_put_contents(path, cert)
407
def _file_put_contents(self, path, contents):
408
# Simple method to write to file.
409
# Created for easy Mocking
410
with open(path, 'w') as handle:
411
handle.write(contents)
413
def server_failure(self, resp, ignore_codes=None):
    """Return True when *resp* represents a failed backend call.

    Note: We assume 301-303 is a failure, and try the next server in
    the pool.

    :param resp: (status, reason, body, data) tuple from rest_call
    :param ignore_codes: iterable of status codes that should not be
        treated as failures
    """
    # None instead of the original mutable [] default; behavior for
    # callers is unchanged.
    ignore_codes = ignore_codes or []
    return (resp[0] in FAILURE_CODES and resp[0] not in ignore_codes)
def action_success(self, resp):
    """Return True when *resp* carries a successful status code.

    Note: We assume any valid 2xx as being successful response.
    """
    return resp[0] in SUCCESS_CODES
# NOTE(review): garbled extract of ServerPool.rest_call and rest_action
# -- indentation is lost, stray artifacts are interleaved, and many
# statements are missing (the 'if context:' guard, loop break/continue
# lines, parts of the LOG.error argument dicts, the 404 append for
# DELETE, the timeout argument and the final 'return resp').
# Preserved verbatim; restore from the canonical source before editing.
def rest_call(self, action, resource, data, headers, ignore_codes,
430
context = self.get_context_ref()
432
# include the requesting context information if available
433
cdict = context.to_dict()
434
# remove the auth token so it's not present in debug logs on the
436
cdict.pop('auth_token', None)
437
headers[REQ_CONTEXT_HEADER] = jsonutils.dumps(cdict)
438
hash_handler = cdb.HashHandler()
439
good_first = sorted(self.servers, key=lambda x: x.failed)
440
first_response = None
441
for active_server in good_first:
442
for x in range(HTTP_SERVICE_UNAVAILABLE_RETRY_COUNT + 1):
443
ret = active_server.rest_call(action, resource, data, headers,
445
reconnect=self.always_reconnect,
446
hash_handler=hash_handler)
447
if ret[0] != httplib.SERVICE_UNAVAILABLE:
449
time.sleep(HTTP_SERVICE_UNAVAILABLE_RETRY_INTERVAL)
451
# If inconsistent, do a full synchronization
452
if ret[0] == httplib.CONFLICT:
453
if not self.get_topo_function:
454
raise cfg.Error(_('Server requires synchronization, '
455
'but no topology function was defined.'))
456
data = self.get_topo_function(**self.get_topo_function_args)
457
active_server.rest_call('PUT', TOPOLOGY_PATH, data,
459
# Store the first response as the error to be bubbled up to the
460
# user since it was a good server. Subsequent servers will most
461
# likely be cluster slaves and won't have a useful error for the
462
# user (e.g. 302 redirect to master)
463
if not first_response:
465
if not self.server_failure(ret, ignore_codes):
466
active_server.failed = False
469
LOG.error(_LE('ServerProxy: %(action)s failure for servers: '
470
'%(server)r Response: %(response)s'),
472
'server': (active_server.server,
475
LOG.error(_LE("ServerProxy: Error details: status=%(status)d, "
476
"reason=%(reason)r, ret=%(ret)s, data=%(data)r"),
477
{'status': ret[0], 'reason': ret[1], 'ret': ret[2],
479
active_server.failed = True
481
# A failure on a delete means the object is gone from Neutron but not
482
# from the controller. Set the consistency hash to a bad value to
483
# trigger a sync on the next check.
484
# NOTE: The hash must have a comma in it otherwise it will be ignored
486
if action == 'DELETE':
487
hash_handler.put_hash('INCONSISTENT,INCONSISTENT')
488
# All servers failed, reset server list and try again next time
489
LOG.error(_LE('ServerProxy: %(action)s failure for all servers: '
492
'server': tuple((s.server,
493
s.port) for s in self.servers)})
494
return first_response
496
def rest_action(self, action, resource, data='', errstr='%s',
497
ignore_codes=None, headers=None, timeout=False):
499
Wrapper for rest_call that verifies success and raises a
500
RemoteRestError on failure with a provided error string
501
By default, 404 errors on DELETE calls are ignored because
502
they already do not exist on the backend.
504
ignore_codes = ignore_codes or []
505
headers = headers or {}
506
if not ignore_codes and action == 'DELETE':
508
resp = self.rest_call(action, resource, data, headers, ignore_codes,
510
if self.server_failure(resp, ignore_codes):
511
LOG.error(errstr, resp[2])
512
raise RemoteRestError(reason=resp[2], status=resp[0])
513
if resp[0] in ignore_codes:
514
LOG.info(_LI("NeutronRestProxyV2: Received and ignored error "
515
"code %(code)s on %(action)s action to resource "
517
{'code': resp[2], 'action': action,
518
'resource': resource})
521
def rest_create_router(self, tenant_id, router):
    """POST a new *router* for *tenant_id* to the backend controller."""
    resource = ROUTER_RESOURCE_PATH % tenant_id
    data = {"router": router}
    errstr = _("Unable to create remote router: %s")
    self.rest_action('POST', resource, data, errstr)
def rest_update_router(self, tenant_id, router, router_id):
    """PUT updated *router* data for *router_id* to the backend."""
    resource = ROUTERS_PATH % (tenant_id, router_id)
    data = {"router": router}
    errstr = _("Unable to update remote router: %s")
    self.rest_action('PUT', resource, data, errstr)
def rest_delete_router(self, tenant_id, router_id):
    """DELETE router *router_id* for *tenant_id* from the backend."""
    resource = ROUTERS_PATH % (tenant_id, router_id)
    errstr = _("Unable to delete remote router: %s")
    self.rest_action('DELETE', resource, errstr=errstr)
def rest_add_router_interface(self, tenant_id, router_id, intf_details):
    """POST an interface described by *intf_details* onto a router."""
    resource = ROUTER_INTF_OP_PATH % (tenant_id, router_id)
    data = {"interface": intf_details}
    errstr = _("Unable to add router interface: %s")
    self.rest_action('POST', resource, data, errstr)
def rest_remove_router_interface(self, tenant_id, router_id, interface_id):
    """DELETE interface *interface_id* from a router on the backend."""
    resource = ROUTER_INTF_PATH % (tenant_id, router_id, interface_id)
    errstr = _("Unable to delete remote intf: %s")
    self.rest_action('DELETE', resource, errstr=errstr)
def rest_create_network(self, tenant_id, network):
    """POST a new *network* for *tenant_id* to the backend controller."""
    resource = NET_RESOURCE_PATH % tenant_id
    data = {"network": network}
    errstr = _("Unable to create remote network: %s")
    self.rest_action('POST', resource, data, errstr)
def rest_update_network(self, tenant_id, net_id, network):
    """PUT updated *network* data for *net_id* to the backend."""
    resource = NETWORKS_PATH % (tenant_id, net_id)
    data = {"network": network}
    errstr = _("Unable to update remote network: %s")
    self.rest_action('PUT', resource, data, errstr)
def rest_delete_network(self, tenant_id, net_id):
    """DELETE network *net_id* for *tenant_id* from the backend."""
    resource = NETWORKS_PATH % (tenant_id, net_id)
    # The original message said "update" -- a copy-paste error from
    # rest_update_network; this is a delete call.
    errstr = _("Unable to delete remote network: %s")
    self.rest_action('DELETE', resource, errstr=errstr)
# NOTE(review): garbled extract -- indentation is lost, stray artifacts
# are interleaved, and statements are missing (the LOG.warning argument
# and the early return from the no-device branch). Preserved verbatim;
# restore from the canonical source before editing.
def rest_create_port(self, tenant_id, net_id, port):
567
resource = ATTACHMENT_PATH % (tenant_id, net_id, port["id"])
568
data = {"port": port}
569
device_id = port.get("device_id")
570
if not port["mac_address"] or not device_id:
571
# controller only cares about ports attached to devices
572
LOG.warning(_LW("No device MAC attached to port %s. "
573
"Skipping notification to controller."),
576
data["attachment"] = {"id": device_id,
577
"mac": port["mac_address"]}
578
errstr = _("Unable to create remote port: %s")
579
self.rest_action('PUT', resource, data, errstr)
581
def rest_delete_port(self, tenant_id, network_id, port_id):
    """DELETE the attachment for *port_id* from the backend."""
    resource = ATTACHMENT_PATH % (tenant_id, network_id, port_id)
    errstr = _("Unable to delete remote port: %s")
    self.rest_action('DELETE', resource, errstr=errstr)
def rest_update_port(self, tenant_id, net_id, port):
    """Update a port on the backend controller.

    The controller has no update operation for the port endpoint;
    the create PUT method will replace the existing attachment.
    """
    self.rest_create_port(tenant_id, net_id, port)
def rest_create_floatingip(self, tenant_id, floatingip):
    """PUT a new floating IP for *tenant_id* to the backend."""
    resource = FLOATINGIPS_PATH % (tenant_id, floatingip['id'])
    errstr = _("Unable to create floating IP: %s")
    self.rest_action('PUT', resource, floatingip, errstr=errstr)
def rest_update_floatingip(self, tenant_id, floatingip, oldid):
    """PUT updated floating IP data over entry *oldid* on the backend."""
    resource = FLOATINGIPS_PATH % (tenant_id, oldid)
    errstr = _("Unable to update floating IP: %s")
    self.rest_action('PUT', resource, floatingip, errstr=errstr)
def rest_delete_floatingip(self, tenant_id, oldid):
    """DELETE floating IP *oldid* for *tenant_id* from the backend."""
    resource = FLOATINGIPS_PATH % (tenant_id, oldid)
    errstr = _("Unable to delete floating IP: %s")
    self.rest_action('DELETE', resource, errstr=errstr)
# NOTE(review): garbled extract -- indentation is lost, stray artifacts
# are interleaved, and the continuation of the rest_action call (the
# ignore_codes argument) is missing. Preserved verbatim; restore from
# the canonical source before editing.
def rest_get_switch(self, switch_id):
607
resource = SWITCHES_PATH % switch_id
608
errstr = _("Unable to retrieve switch: %s")
609
resp = self.rest_action('GET', resource, errstr=errstr,
611
# return None if switch not found, else return switch info
612
return None if resp[0] == 404 else resp[3]
614
# NOTE(review): garbled extract -- indentation is lost, stray artifacts
# are interleaved, and statements are missing (the returns after the
# warnings, the polling 'while True:' loop, the 'try:' pairing with the
# visible LOG.exception, and the rest of its message). The log string
# typo "consitency" is runtime text and is left untouched here.
# Preserved verbatim; restore from the canonical source before editing.
def _consistency_watchdog(self, polling_interval=60):
615
if 'consistency' not in self.get_capabilities():
616
LOG.warning(_LW("Backend server(s) do not support automated "
617
"consitency checks."))
619
if not polling_interval:
620
LOG.warning(_LW("Consistency watchdog disabled by polling "
621
"interval setting of %s."), polling_interval)
624
# If consistency is supported, all we have to do is make any
625
# rest call and the consistency header will be added. If it
626
# doesn't match, the backend will return a synchronization error
627
# that will be handled by the rest_action.
628
eventlet.sleep(polling_interval)
630
self.rest_action('GET', HEALTH_PATH)
632
LOG.exception(_LE("Encountered an error checking controller "
636
# NOTE(review): garbled extract -- indentation is lost, stray artifacts
# are interleaved, and statements are missing (the combined_cert class
# attribute, the 'def connect(self):' line, the self._tunnel() call
# under the tunnel branch, and the 'else:' before the non-validating
# wrap_socket call). Preserved verbatim; restore from the canonical
# source before editing.
class HTTPSConnectionWithValidation(httplib.HTTPSConnection):
638
# If combined_cert is None, the connection will continue without
639
# any certificate validation.
643
sock = socket.create_connection((self.host, self.port),
644
self.timeout, self.source_address)
645
if self._tunnel_host:
649
if self.combined_cert:
650
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
651
cert_reqs=ssl.CERT_REQUIRED,
652
ca_certs=self.combined_cert,
653
ssl_version=ssl.PROTOCOL_TLSv1)
655
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
656
cert_reqs=ssl.CERT_NONE,
657
ssl_version=ssl.PROTOCOL_TLSv1)