~ubuntu-branches/ubuntu/vivid/neutron/vivid-updates

« back to all changes in this revision

Viewing changes to neutron/plugins/bigswitch/servermanager.py

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2015-03-30 11:17:19 UTC
  • mfrom: (1.1.21)
  • Revision ID: package-import@ubuntu.com-20150330111719-h0gx7233p4jkkgfh
Tags: 1:2015.1~b3-0ubuntu1
* New upstream milestone release:
  - d/control: Align version requirements with upstream.
  - d/control: Add new dependency on oslo-log.
  - d/p/*: Rebase.
  - d/control,d/neutron-plugin-hyperv*: Dropped, decomposed into
    separate project upstream.
  - d/control,d/neutron-plugin-openflow*: Dropped, decomposed into
    separate project upstream.
  - d/neutron-common.install: Add neutron-rootwrap-daemon and 
    neutron-keepalived-state-change binaries.
  - d/rules: Ignore neutron-hyperv-agent when installing; only for Windows.
  - d/neutron-plugin-cisco.install: Drop neutron-cisco-cfg-agent as
    decomposed into separate project upstream.
  - d/neutron-plugin-vmware.install: Drop neutron-check-nsx-config and
    neutron-nsx-manage as decomposed into separate project upstream.
  - d/control: Add dependency on python-neutron-fwaas to neutron-l3-agent.
* d/pydist-overrides: Add overrides for oslo packages.
* d/control: Fixup type in package description (LP: #1263539).
* d/p/fixup-driver-test-execution.patch: Cherry pick fix from upstream VCS
  to support unit test exection in out-of-tree vendor drivers.
* d/neutron-common.postinst: Allow general access to /etc/neutron but limit
  access to root/neutron to /etc/neutron/neutron.conf to support execution
  of unit tests in decomposed vendor drivers.
* d/control: Add dependency on python-neutron-fwaas to neutron-l3-agent
  package.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2014 Big Switch Networks, Inc.
2
 
# All Rights Reserved.
3
 
#
4
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
5
 
#    not use this file except in compliance with the License. You may obtain
6
 
#    a copy of the License at
7
 
#
8
 
#         http://www.apache.org/licenses/LICENSE-2.0
9
 
#
10
 
#    Unless required by applicable law or agreed to in writing, software
11
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13
 
#    License for the specific language governing permissions and limitations
14
 
#    under the License.
15
 
 
16
 
"""
17
 
This module manages the HTTP and HTTPS connections to the backend controllers.
18
 
 
19
 
The main class it provides for external use is ServerPool which manages a set
20
 
of ServerProxy objects that correspond to individual backend controllers.
21
 
 
22
 
The following functionality is handled by this module:
23
 
- Translation of rest_* function calls to HTTP/HTTPS calls to the controllers
24
 
- Automatic failover between controllers
25
 
- SSL Certificate enforcement
26
 
- HTTP Authentication
27
 
 
28
 
"""
29
 
import base64
30
 
import httplib
31
 
import os
32
 
import socket
33
 
import ssl
34
 
import time
35
 
import weakref
36
 
 
37
 
import eventlet
38
 
import eventlet.corolocal
39
 
from oslo.config import cfg
40
 
from oslo.serialization import jsonutils
41
 
from oslo.utils import excutils
42
 
 
43
 
from neutron.common import exceptions
44
 
from neutron.i18n import _LE, _LI, _LW
45
 
from neutron.openstack.common import log as logging
46
 
from neutron.plugins.bigswitch.db import consistency_db as cdb
47
 
 
48
 
LOG = logging.getLogger(__name__)
49
 
 
50
 
# The following are used to invoke the API on the external controller
51
 
CAPABILITIES_PATH = "/capabilities"
52
 
NET_RESOURCE_PATH = "/tenants/%s/networks"
53
 
PORT_RESOURCE_PATH = "/tenants/%s/networks/%s/ports"
54
 
ROUTER_RESOURCE_PATH = "/tenants/%s/routers"
55
 
ROUTER_INTF_OP_PATH = "/tenants/%s/routers/%s/interfaces"
56
 
NETWORKS_PATH = "/tenants/%s/networks/%s"
57
 
FLOATINGIPS_PATH = "/tenants/%s/floatingips/%s"
58
 
PORTS_PATH = "/tenants/%s/networks/%s/ports/%s"
59
 
ATTACHMENT_PATH = "/tenants/%s/networks/%s/ports/%s/attachment"
60
 
ROUTERS_PATH = "/tenants/%s/routers/%s"
61
 
ROUTER_INTF_PATH = "/tenants/%s/routers/%s/interfaces/%s"
62
 
TOPOLOGY_PATH = "/topology"
63
 
HEALTH_PATH = "/health"
64
 
SWITCHES_PATH = "/switches/%s"
65
 
SUCCESS_CODES = range(200, 207)
66
 
FAILURE_CODES = [0, 301, 302, 303, 400, 401, 403, 404, 500, 501, 502, 503,
67
 
                 504, 505]
68
 
BASE_URI = '/networkService/v1.1'
69
 
ORCHESTRATION_SERVICE_ID = 'Neutron v2.0'
70
 
HASH_MATCH_HEADER = 'X-BSN-BVS-HASH-MATCH'
71
 
REQ_CONTEXT_HEADER = 'X-REQ-CONTEXT'
72
 
# error messages
73
 
NXNETWORK = 'NXVNS'
74
 
HTTP_SERVICE_UNAVAILABLE_RETRY_COUNT = 3
75
 
HTTP_SERVICE_UNAVAILABLE_RETRY_INTERVAL = 3
76
 
 
77
 
 
78
 
class RemoteRestError(exceptions.NeutronException):
79
 
    message = _("Error in REST call to remote network "
80
 
                "controller: %(reason)s")
81
 
    status = None
82
 
 
83
 
    def __init__(self, **kwargs):
84
 
        self.status = kwargs.pop('status', None)
85
 
        self.reason = kwargs.get('reason')
86
 
        super(RemoteRestError, self).__init__(**kwargs)
87
 
 
88
 
 
89
 
class ServerProxy(object):
90
 
    """REST server proxy to a network controller."""
91
 
 
92
 
    def __init__(self, server, port, ssl, auth, neutron_id, timeout,
93
 
                 base_uri, name, mypool, combined_cert):
94
 
        self.server = server
95
 
        self.port = port
96
 
        self.ssl = ssl
97
 
        self.base_uri = base_uri
98
 
        self.timeout = timeout
99
 
        self.name = name
100
 
        self.success_codes = SUCCESS_CODES
101
 
        self.auth = None
102
 
        self.neutron_id = neutron_id
103
 
        self.failed = False
104
 
        self.capabilities = []
105
 
        # enable server to reference parent pool
106
 
        self.mypool = mypool
107
 
        # cache connection here to avoid a SSL handshake for every connection
108
 
        self.currentconn = None
109
 
        if auth:
110
 
            self.auth = 'Basic ' + base64.encodestring(auth).strip()
111
 
        self.combined_cert = combined_cert
112
 
 
113
 
    def get_capabilities(self):
114
 
        try:
115
 
            body = self.rest_call('GET', CAPABILITIES_PATH)[2]
116
 
            self.capabilities = jsonutils.loads(body)
117
 
        except Exception:
118
 
            LOG.exception(_LE("Couldn't retrieve capabilities. "
119
 
                              "Newer API calls won't be supported."))
120
 
        LOG.info(_LI("The following capabilities were received "
121
 
                     "for %(server)s: %(cap)s"), {'server': self.server,
122
 
                                                  'cap': self.capabilities})
123
 
        return self.capabilities
124
 
 
125
 
    def rest_call(self, action, resource, data='', headers=None,
126
 
                  timeout=False, reconnect=False, hash_handler=None):
127
 
        uri = self.base_uri + resource
128
 
        body = jsonutils.dumps(data)
129
 
        headers = headers or {}
130
 
        headers['Content-type'] = 'application/json'
131
 
        headers['Accept'] = 'application/json'
132
 
        headers['NeutronProxy-Agent'] = self.name
133
 
        headers['Instance-ID'] = self.neutron_id
134
 
        headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
135
 
        if hash_handler:
136
 
            # this will be excluded on calls that don't need hashes
137
 
            # (e.g. topology sync, capability checks)
138
 
            headers[HASH_MATCH_HEADER] = hash_handler.read_for_update()
139
 
        else:
140
 
            hash_handler = cdb.HashHandler()
141
 
        if 'keep-alive' in self.capabilities:
142
 
            headers['Connection'] = 'keep-alive'
143
 
        else:
144
 
            reconnect = True
145
 
        if self.auth:
146
 
            headers['Authorization'] = self.auth
147
 
 
148
 
        LOG.debug("ServerProxy: server=%(server)s, port=%(port)d, "
149
 
                  "ssl=%(ssl)r",
150
 
                  {'server': self.server, 'port': self.port, 'ssl': self.ssl})
151
 
        LOG.debug("ServerProxy: resource=%(resource)s, data=%(data)r, "
152
 
                  "headers=%(headers)r, action=%(action)s",
153
 
                  {'resource': resource, 'data': data, 'headers': headers,
154
 
                   'action': action})
155
 
 
156
 
        # unspecified timeout is False because a timeout can be specified as
157
 
        # None to indicate no timeout.
158
 
        if timeout is False:
159
 
            timeout = self.timeout
160
 
 
161
 
        if timeout != self.timeout:
162
 
            # need a new connection if timeout has changed
163
 
            reconnect = True
164
 
 
165
 
        if not self.currentconn or reconnect:
166
 
            if self.currentconn:
167
 
                self.currentconn.close()
168
 
            if self.ssl:
169
 
                self.currentconn = HTTPSConnectionWithValidation(
170
 
                    self.server, self.port, timeout=timeout)
171
 
                if self.currentconn is None:
172
 
                    LOG.error(_LE('ServerProxy: Could not establish HTTPS '
173
 
                                  'connection'))
174
 
                    return 0, None, None, None
175
 
                self.currentconn.combined_cert = self.combined_cert
176
 
            else:
177
 
                self.currentconn = httplib.HTTPConnection(
178
 
                    self.server, self.port, timeout=timeout)
179
 
                if self.currentconn is None:
180
 
                    LOG.error(_LE('ServerProxy: Could not establish HTTP '
181
 
                                  'connection'))
182
 
                    return 0, None, None, None
183
 
 
184
 
        try:
185
 
            self.currentconn.request(action, uri, body, headers)
186
 
            response = self.currentconn.getresponse()
187
 
            respstr = response.read()
188
 
            respdata = respstr
189
 
            if response.status in self.success_codes:
190
 
                hash_value = response.getheader(HASH_MATCH_HEADER)
191
 
                # don't clear hash from DB if a hash header wasn't present
192
 
                if hash_value is not None:
193
 
                    hash_handler.put_hash(hash_value)
194
 
                else:
195
 
                    hash_handler.clear_lock()
196
 
                try:
197
 
                    respdata = jsonutils.loads(respstr)
198
 
                except ValueError:
199
 
                    # response was not JSON, ignore the exception
200
 
                    pass
201
 
            else:
202
 
                # release lock so others don't have to wait for timeout
203
 
                hash_handler.clear_lock()
204
 
 
205
 
            ret = (response.status, response.reason, respstr, respdata)
206
 
        except httplib.HTTPException:
207
 
            # If we were using a cached connection, try again with a new one.
208
 
            with excutils.save_and_reraise_exception() as ctxt:
209
 
                self.currentconn.close()
210
 
                if reconnect:
211
 
                    # if reconnect is true, this was on a fresh connection so
212
 
                    # reraise since this server seems to be broken
213
 
                    ctxt.reraise = True
214
 
                else:
215
 
                    # if reconnect is false, it was a cached connection so
216
 
                    # try one more time before re-raising
217
 
                    ctxt.reraise = False
218
 
            return self.rest_call(action, resource, data, headers,
219
 
                                  timeout=timeout, reconnect=True)
220
 
        except (socket.timeout, socket.error) as e:
221
 
            self.currentconn.close()
222
 
            LOG.error(_LE('ServerProxy: %(action)s failure, %(e)r'),
223
 
                      {'action': action, 'e': e})
224
 
            ret = 0, None, None, None
225
 
        LOG.debug("ServerProxy: status=%(status)d, reason=%(reason)r, "
226
 
                  "ret=%(ret)s, data=%(data)r", {'status': ret[0],
227
 
                                                 'reason': ret[1],
228
 
                                                 'ret': ret[2],
229
 
                                                 'data': ret[3]})
230
 
        return ret
231
 
 
232
 
 
233
 
class ServerPool(object):
234
 
 
235
 
    _instance = None
236
 
 
237
 
    @classmethod
238
 
    def get_instance(cls):
239
 
        if cls._instance:
240
 
            return cls._instance
241
 
        cls._instance = cls()
242
 
        return cls._instance
243
 
 
244
 
    def __init__(self, timeout=False,
245
 
                 base_uri=BASE_URI, name='NeutronRestProxy'):
246
 
        LOG.debug("ServerPool: initializing")
247
 
        # 'servers' is the list of network controller REST end-points
248
 
        # (used in order specified till one succeeds, and it is sticky
249
 
        # till next failure). Use 'server_auth' to encode api-key
250
 
        servers = cfg.CONF.RESTPROXY.servers
251
 
        self.auth = cfg.CONF.RESTPROXY.server_auth
252
 
        self.ssl = cfg.CONF.RESTPROXY.server_ssl
253
 
        self.neutron_id = cfg.CONF.RESTPROXY.neutron_id
254
 
        self.base_uri = base_uri
255
 
        self.name = name
256
 
        self.contexts = {}
257
 
        self.timeout = cfg.CONF.RESTPROXY.server_timeout
258
 
        self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections
259
 
        default_port = 8000
260
 
        if timeout is not False:
261
 
            self.timeout = timeout
262
 
 
263
 
        # Function to use to retrieve topology for consistency syncs.
264
 
        # Needs to be set by module that uses the servermanager.
265
 
        self.get_topo_function = None
266
 
        self.get_topo_function_args = {}
267
 
 
268
 
        if not servers:
269
 
            raise cfg.Error(_('Servers not defined. Aborting server manager.'))
270
 
        servers = [s if len(s.rsplit(':', 1)) == 2
271
 
                   else "%s:%d" % (s, default_port)
272
 
                   for s in servers]
273
 
        if any((len(spl) != 2 or not spl[1].isdigit())
274
 
               for spl in [sp.rsplit(':', 1)
275
 
                           for sp in servers]):
276
 
            raise cfg.Error(_('Servers must be defined as <ip>:<port>. '
277
 
                              'Configuration was %s') % servers)
278
 
        self.servers = [
279
 
            self.server_proxy_for(server, int(port))
280
 
            for server, port in (s.rsplit(':', 1) for s in servers)
281
 
        ]
282
 
        eventlet.spawn(self._consistency_watchdog,
283
 
                       cfg.CONF.RESTPROXY.consistency_interval)
284
 
        ServerPool._instance = self
285
 
        LOG.debug("ServerPool: initialization done")
286
 
 
287
 
    def set_context(self, context):
288
 
        # this context needs to be local to the greenthread
289
 
        # so concurrent requests don't use the wrong context.
290
 
        # Use a weakref so the context is garbage collected
291
 
        # after the plugin is done with it.
292
 
        ref = weakref.ref(context)
293
 
        self.contexts[eventlet.corolocal.get_ident()] = ref
294
 
 
295
 
    def get_context_ref(self):
296
 
        # Try to get the context cached for this thread. If one
297
 
        # doesn't exist or if it's been garbage collected, this will
298
 
        # just return None.
299
 
        try:
300
 
            return self.contexts[eventlet.corolocal.get_ident()]()
301
 
        except KeyError:
302
 
            return None
303
 
 
304
 
    def get_capabilities(self):
305
 
        # lookup on first try
306
 
        try:
307
 
            return self.capabilities
308
 
        except AttributeError:
309
 
            # each server should return a list of capabilities it supports
310
 
            # e.g. ['floatingip']
311
 
            capabilities = [set(server.get_capabilities())
312
 
                            for server in self.servers]
313
 
            # Pool only supports what all of the servers support
314
 
            self.capabilities = set.intersection(*capabilities)
315
 
            return self.capabilities
316
 
 
317
 
    def server_proxy_for(self, server, port):
318
 
        combined_cert = self._get_combined_cert_for_server(server, port)
319
 
        return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
320
 
                           self.timeout, self.base_uri, self.name, mypool=self,
321
 
                           combined_cert=combined_cert)
322
 
 
323
 
    def _get_combined_cert_for_server(self, server, port):
324
 
        # The ssl library requires a combined file with all trusted certs
325
 
        # so we make one containing the trusted CAs and the corresponding
326
 
        # host cert for this server
327
 
        combined_cert = None
328
 
        if self.ssl and not cfg.CONF.RESTPROXY.no_ssl_validation:
329
 
            base_ssl = cfg.CONF.RESTPROXY.ssl_cert_directory
330
 
            host_dir = os.path.join(base_ssl, 'host_certs')
331
 
            ca_dir = os.path.join(base_ssl, 'ca_certs')
332
 
            combined_dir = os.path.join(base_ssl, 'combined')
333
 
            combined_cert = os.path.join(combined_dir, '%s.pem' % server)
334
 
            if not os.path.exists(base_ssl):
335
 
                raise cfg.Error(_('ssl_cert_directory [%s] does not exist. '
336
 
                                  'Create it or disable ssl.') % base_ssl)
337
 
            for automake in [combined_dir, ca_dir, host_dir]:
338
 
                if not os.path.exists(automake):
339
 
                    os.makedirs(automake)
340
 
 
341
 
            # get all CA certs
342
 
            certs = self._get_ca_cert_paths(ca_dir)
343
 
 
344
 
            # check for a host specific cert
345
 
            hcert, exists = self._get_host_cert_path(host_dir, server)
346
 
            if exists:
347
 
                certs.append(hcert)
348
 
            elif cfg.CONF.RESTPROXY.ssl_sticky:
349
 
                self._fetch_and_store_cert(server, port, hcert)
350
 
                certs.append(hcert)
351
 
            if not certs:
352
 
                raise cfg.Error(_('No certificates were found to verify '
353
 
                                  'controller %s') % (server))
354
 
            self._combine_certs_to_file(certs, combined_cert)
355
 
        return combined_cert
356
 
 
357
 
    def _combine_certs_to_file(self, certs, cfile):
358
 
        '''
359
 
        Concatenates the contents of each certificate in a list of
360
 
        certificate paths to one combined location for use with ssl
361
 
        sockets.
362
 
        '''
363
 
        with open(cfile, 'w') as combined:
364
 
            for c in certs:
365
 
                with open(c, 'r') as cert_handle:
366
 
                    combined.write(cert_handle.read())
367
 
 
368
 
    def _get_host_cert_path(self, host_dir, server):
369
 
        '''
370
 
        returns full path and boolean indicating existence
371
 
        '''
372
 
        hcert = os.path.join(host_dir, '%s.pem' % server)
373
 
        if os.path.exists(hcert):
374
 
            return hcert, True
375
 
        return hcert, False
376
 
 
377
 
    def _get_ca_cert_paths(self, ca_dir):
378
 
        certs = [os.path.join(root, name)
379
 
                 for name in [
380
 
                     name for (root, dirs, files) in os.walk(ca_dir)
381
 
                     for name in files
382
 
                 ]
383
 
                 if name.endswith('.pem')]
384
 
        return certs
385
 
 
386
 
    def _fetch_and_store_cert(self, server, port, path):
387
 
        '''
388
 
        Grabs a certificate from a server and writes it to
389
 
        a given path.
390
 
        '''
391
 
        try:
392
 
            cert = ssl.get_server_certificate((server, port),
393
 
                                              ssl_version=ssl.PROTOCOL_TLSv1)
394
 
        except Exception as e:
395
 
            raise cfg.Error(_('Could not retrieve initial '
396
 
                              'certificate from controller %(server)s. '
397
 
                              'Error details: %(error)s') %
398
 
                            {'server': server, 'error': e})
399
 
 
400
 
        LOG.warning(_LW("Storing to certificate for host %(server)s "
401
 
                        "at %(path)s"), {'server': server,
402
 
                                         'path': path})
403
 
        self._file_put_contents(path, cert)
404
 
 
405
 
        return cert
406
 
 
407
 
    def _file_put_contents(self, path, contents):
408
 
        # Simple method to write to file.
409
 
        # Created for easy Mocking
410
 
        with open(path, 'w') as handle:
411
 
            handle.write(contents)
412
 
 
413
 
    def server_failure(self, resp, ignore_codes=[]):
414
 
        """Define failure codes as required.
415
 
 
416
 
        Note: We assume 301-303 is a failure, and try the next server in
417
 
        the server pool.
418
 
        """
419
 
        return (resp[0] in FAILURE_CODES and resp[0] not in ignore_codes)
420
 
 
421
 
    def action_success(self, resp):
422
 
        """Defining success codes as required.
423
 
 
424
 
        Note: We assume any valid 2xx as being successful response.
425
 
        """
426
 
        return resp[0] in SUCCESS_CODES
427
 
 
428
 
    def rest_call(self, action, resource, data, headers, ignore_codes,
429
 
                  timeout=False):
430
 
        context = self.get_context_ref()
431
 
        if context:
432
 
            # include the requesting context information if available
433
 
            cdict = context.to_dict()
434
 
            # remove the auth token so it's not present in debug logs on the
435
 
            # backend controller
436
 
            cdict.pop('auth_token', None)
437
 
            headers[REQ_CONTEXT_HEADER] = jsonutils.dumps(cdict)
438
 
        hash_handler = cdb.HashHandler()
439
 
        good_first = sorted(self.servers, key=lambda x: x.failed)
440
 
        first_response = None
441
 
        for active_server in good_first:
442
 
            for x in range(HTTP_SERVICE_UNAVAILABLE_RETRY_COUNT + 1):
443
 
                ret = active_server.rest_call(action, resource, data, headers,
444
 
                                              timeout,
445
 
                                              reconnect=self.always_reconnect,
446
 
                                              hash_handler=hash_handler)
447
 
                if ret[0] != httplib.SERVICE_UNAVAILABLE:
448
 
                    break
449
 
                time.sleep(HTTP_SERVICE_UNAVAILABLE_RETRY_INTERVAL)
450
 
 
451
 
            # If inconsistent, do a full synchronization
452
 
            if ret[0] == httplib.CONFLICT:
453
 
                if not self.get_topo_function:
454
 
                    raise cfg.Error(_('Server requires synchronization, '
455
 
                                      'but no topology function was defined.'))
456
 
                data = self.get_topo_function(**self.get_topo_function_args)
457
 
                active_server.rest_call('PUT', TOPOLOGY_PATH, data,
458
 
                                        timeout=None)
459
 
            # Store the first response as the error to be bubbled up to the
460
 
            # user since it was a good server. Subsequent servers will most
461
 
            # likely be cluster slaves and won't have a useful error for the
462
 
            # user (e.g. 302 redirect to master)
463
 
            if not first_response:
464
 
                first_response = ret
465
 
            if not self.server_failure(ret, ignore_codes):
466
 
                active_server.failed = False
467
 
                return ret
468
 
            else:
469
 
                LOG.error(_LE('ServerProxy: %(action)s failure for servers: '
470
 
                              '%(server)r Response: %(response)s'),
471
 
                          {'action': action,
472
 
                           'server': (active_server.server,
473
 
                                      active_server.port),
474
 
                           'response': ret[3]})
475
 
                LOG.error(_LE("ServerProxy: Error details: status=%(status)d, "
476
 
                              "reason=%(reason)r, ret=%(ret)s, data=%(data)r"),
477
 
                          {'status': ret[0], 'reason': ret[1], 'ret': ret[2],
478
 
                           'data': ret[3]})
479
 
                active_server.failed = True
480
 
 
481
 
        # A failure on a delete means the object is gone from Neutron but not
482
 
        # from the controller. Set the consistency hash to a bad value to
483
 
        # trigger a sync on the next check.
484
 
        # NOTE: The hash must have a comma in it otherwise it will be ignored
485
 
        # by the backend.
486
 
        if action == 'DELETE':
487
 
            hash_handler.put_hash('INCONSISTENT,INCONSISTENT')
488
 
        # All servers failed, reset server list and try again next time
489
 
        LOG.error(_LE('ServerProxy: %(action)s failure for all servers: '
490
 
                      '%(server)r'),
491
 
                  {'action': action,
492
 
                   'server': tuple((s.server,
493
 
                                    s.port) for s in self.servers)})
494
 
        return first_response
495
 
 
496
 
    def rest_action(self, action, resource, data='', errstr='%s',
497
 
                    ignore_codes=None, headers=None, timeout=False):
498
 
        """
499
 
        Wrapper for rest_call that verifies success and raises a
500
 
        RemoteRestError on failure with a provided error string
501
 
        By default, 404 errors on DELETE calls are ignored because
502
 
        they already do not exist on the backend.
503
 
        """
504
 
        ignore_codes = ignore_codes or []
505
 
        headers = headers or {}
506
 
        if not ignore_codes and action == 'DELETE':
507
 
            ignore_codes = [404]
508
 
        resp = self.rest_call(action, resource, data, headers, ignore_codes,
509
 
                              timeout)
510
 
        if self.server_failure(resp, ignore_codes):
511
 
            LOG.error(errstr, resp[2])
512
 
            raise RemoteRestError(reason=resp[2], status=resp[0])
513
 
        if resp[0] in ignore_codes:
514
 
            LOG.info(_LI("NeutronRestProxyV2: Received and ignored error "
515
 
                         "code %(code)s on %(action)s action to resource "
516
 
                         "%(resource)s"),
517
 
                     {'code': resp[2], 'action': action,
518
 
                      'resource': resource})
519
 
        return resp
520
 
 
521
 
    def rest_create_router(self, tenant_id, router):
522
 
        resource = ROUTER_RESOURCE_PATH % tenant_id
523
 
        data = {"router": router}
524
 
        errstr = _("Unable to create remote router: %s")
525
 
        self.rest_action('POST', resource, data, errstr)
526
 
 
527
 
    def rest_update_router(self, tenant_id, router, router_id):
528
 
        resource = ROUTERS_PATH % (tenant_id, router_id)
529
 
        data = {"router": router}
530
 
        errstr = _("Unable to update remote router: %s")
531
 
        self.rest_action('PUT', resource, data, errstr)
532
 
 
533
 
    def rest_delete_router(self, tenant_id, router_id):
534
 
        resource = ROUTERS_PATH % (tenant_id, router_id)
535
 
        errstr = _("Unable to delete remote router: %s")
536
 
        self.rest_action('DELETE', resource, errstr=errstr)
537
 
 
538
 
    def rest_add_router_interface(self, tenant_id, router_id, intf_details):
539
 
        resource = ROUTER_INTF_OP_PATH % (tenant_id, router_id)
540
 
        data = {"interface": intf_details}
541
 
        errstr = _("Unable to add router interface: %s")
542
 
        self.rest_action('POST', resource, data, errstr)
543
 
 
544
 
    def rest_remove_router_interface(self, tenant_id, router_id, interface_id):
545
 
        resource = ROUTER_INTF_PATH % (tenant_id, router_id, interface_id)
546
 
        errstr = _("Unable to delete remote intf: %s")
547
 
        self.rest_action('DELETE', resource, errstr=errstr)
548
 
 
549
 
    def rest_create_network(self, tenant_id, network):
550
 
        resource = NET_RESOURCE_PATH % tenant_id
551
 
        data = {"network": network}
552
 
        errstr = _("Unable to create remote network: %s")
553
 
        self.rest_action('POST', resource, data, errstr)
554
 
 
555
 
    def rest_update_network(self, tenant_id, net_id, network):
556
 
        resource = NETWORKS_PATH % (tenant_id, net_id)
557
 
        data = {"network": network}
558
 
        errstr = _("Unable to update remote network: %s")
559
 
        self.rest_action('PUT', resource, data, errstr)
560
 
 
561
 
    def rest_delete_network(self, tenant_id, net_id):
562
 
        resource = NETWORKS_PATH % (tenant_id, net_id)
563
 
        errstr = _("Unable to update remote network: %s")
564
 
        self.rest_action('DELETE', resource, errstr=errstr)
565
 
 
566
 
    def rest_create_port(self, tenant_id, net_id, port):
567
 
        resource = ATTACHMENT_PATH % (tenant_id, net_id, port["id"])
568
 
        data = {"port": port}
569
 
        device_id = port.get("device_id")
570
 
        if not port["mac_address"] or not device_id:
571
 
            # controller only cares about ports attached to devices
572
 
            LOG.warning(_LW("No device MAC attached to port %s. "
573
 
                            "Skipping notification to controller."),
574
 
                        port["id"])
575
 
            return
576
 
        data["attachment"] = {"id": device_id,
577
 
                              "mac": port["mac_address"]}
578
 
        errstr = _("Unable to create remote port: %s")
579
 
        self.rest_action('PUT', resource, data, errstr)
580
 
 
581
 
    def rest_delete_port(self, tenant_id, network_id, port_id):
582
 
        resource = ATTACHMENT_PATH % (tenant_id, network_id, port_id)
583
 
        errstr = _("Unable to delete remote port: %s")
584
 
        self.rest_action('DELETE', resource, errstr=errstr)
585
 
 
586
 
    def rest_update_port(self, tenant_id, net_id, port):
587
 
        # Controller has no update operation for the port endpoint
588
 
        # the create PUT method will replace
589
 
        self.rest_create_port(tenant_id, net_id, port)
590
 
 
591
 
    def rest_create_floatingip(self, tenant_id, floatingip):
592
 
        resource = FLOATINGIPS_PATH % (tenant_id, floatingip['id'])
593
 
        errstr = _("Unable to create floating IP: %s")
594
 
        self.rest_action('PUT', resource, floatingip, errstr=errstr)
595
 
 
596
 
    def rest_update_floatingip(self, tenant_id, floatingip, oldid):
597
 
        resource = FLOATINGIPS_PATH % (tenant_id, oldid)
598
 
        errstr = _("Unable to update floating IP: %s")
599
 
        self.rest_action('PUT', resource, floatingip, errstr=errstr)
600
 
 
601
 
    def rest_delete_floatingip(self, tenant_id, oldid):
602
 
        resource = FLOATINGIPS_PATH % (tenant_id, oldid)
603
 
        errstr = _("Unable to delete floating IP: %s")
604
 
        self.rest_action('DELETE', resource, errstr=errstr)
605
 
 
606
 
    def rest_get_switch(self, switch_id):
607
 
        resource = SWITCHES_PATH % switch_id
608
 
        errstr = _("Unable to retrieve switch: %s")
609
 
        resp = self.rest_action('GET', resource, errstr=errstr,
610
 
                                ignore_codes=[404])
611
 
        # return None if switch not found, else return switch info
612
 
        return None if resp[0] == 404 else resp[3]
613
 
 
614
 
    def _consistency_watchdog(self, polling_interval=60):
615
 
        if 'consistency' not in self.get_capabilities():
616
 
            LOG.warning(_LW("Backend server(s) do not support automated "
617
 
                            "consitency checks."))
618
 
            return
619
 
        if not polling_interval:
620
 
            LOG.warning(_LW("Consistency watchdog disabled by polling "
621
 
                            "interval setting of %s."), polling_interval)
622
 
            return
623
 
        while True:
624
 
            # If consistency is supported, all we have to do is make any
625
 
            # rest call and the consistency header will be added. If it
626
 
            # doesn't match, the backend will return a synchronization error
627
 
            # that will be handled by the rest_action.
628
 
            eventlet.sleep(polling_interval)
629
 
            try:
630
 
                self.rest_action('GET', HEALTH_PATH)
631
 
            except Exception:
632
 
                LOG.exception(_LE("Encountered an error checking controller "
633
 
                                  "health."))
634
 
 
635
 
 
636
 
class HTTPSConnectionWithValidation(httplib.HTTPSConnection):
637
 
 
638
 
    # If combined_cert is None, the connection will continue without
639
 
    # any certificate validation.
640
 
    combined_cert = None
641
 
 
642
 
    def connect(self):
643
 
        sock = socket.create_connection((self.host, self.port),
644
 
                                        self.timeout, self.source_address)
645
 
        if self._tunnel_host:
646
 
            self.sock = sock
647
 
            self._tunnel()
648
 
 
649
 
        if self.combined_cert:
650
 
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
651
 
                                        cert_reqs=ssl.CERT_REQUIRED,
652
 
                                        ca_certs=self.combined_cert,
653
 
                                        ssl_version=ssl.PROTOCOL_TLSv1)
654
 
        else:
655
 
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
656
 
                                        cert_reqs=ssl.CERT_NONE,
657
 
                                        ssl_version=ssl.PROTOCOL_TLSv1)