1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2012 Nicira, Inc.
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
7
# not use this file except in compliance with the License. You may obtain
8
# a copy of the License at
10
# http://www.apache.org/licenses/LICENSE-2.0
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
# License for the specific language governing permissions and limitations
18
# @author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc.
19
# @author: Aaron Rosen, Nicira Networks, Inc.
22
from abc import ABCMeta
28
from quantum.plugins.nicira.api_client.common import (
32
logging.basicConfig(level=logging.INFO)
33
LOG = logging.getLogger(__name__)
35
GENERATION_ID_TIMEOUT = -1
36
DEFAULT_CONCURRENT_CONNECTIONS = 3
37
DEFAULT_CONNECT_TIMEOUT = 5
40
class NvpApiClient(object):
41
'''An abstract baseclass for all NvpApiClient implementations.
43
This defines the interface and property structure for synchronous and
44
coroutine-based classes.
47
__metaclass__ = ABCMeta
49
CONN_IDLE_TIMEOUT = 60 * 15
51
def _create_connection(self, host, port, is_ssl):
53
return httplib.HTTPSConnection(host, port,
54
timeout=self._connect_timeout)
55
return httplib.HTTPConnection(host, port,
56
timeout=self._connect_timeout)
59
def _conn_params(http_conn):
60
is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
61
return (http_conn.host, http_conn.port, is_ssl)
72
def nvp_config_gen(self):
    """Return the cached NVP config generation ID, or None once stale.

    :returns: self._nvp_config_gen, unless generation-ID timeouts are
        enabled (self._nvp_gen_timeout != -1) and the ID was recorded
        more than self._nvp_gen_timeout seconds ago, in which case None.
    """
    # If nvp_gen_timeout is not -1 then:
    # Maintain a timestamp along with the generation ID.  Hold onto the
    # ID long enough to be useful and block on sequential requests but
    # not long enough to persist when Onix db is cleared, which resets
    # the generation ID, causing the DAL to block indefinitely with some
    # number that's higher than the cluster's value.
    if self._nvp_gen_timeout != -1:
        ts = self._nvp_config_gen_ts
        # Tolerate a timestamp that has not been recorded yet (None)
        # instead of failing on the subtraction.
        if ts is not None and (time.time() - ts) > self._nvp_gen_timeout:
            return None
    return self._nvp_config_gen
86
@nvp_config_gen.setter
def nvp_config_gen(self, value):
    """Store a new config generation ID.

    When the ID actually changes and generation-ID timeouts are enabled
    (self._nvp_gen_timeout != -1), the time of the change is recorded in
    self._nvp_config_gen_ts.

    :param value: the generation ID to remember.
    """
    id_changed = self._nvp_config_gen != value
    if id_changed and self._nvp_gen_timeout != -1:
        # Restart the staleness clock for the new ID.
        self._nvp_config_gen_ts = time.time()
    self._nvp_config_gen = value
93
def auth_cookie(self, conn):
    """Return the auth cookie stored for the provider behind conn.

    Provider data is a pair whose second element is the cookie (see
    set_auth_cookie, which stores ``(data[0], cookie)``).

    :param conn: connection (or conn_params tuple) identifying the
        API provider.
    :returns: the stored cookie, or None when the provider is unknown
        or no cookie has been stored.
    """
    data = self._get_provider_data(conn)
    if not data:
        return None
    return data[1]
100
def set_auth_cookie(self, conn, cookie):
    """Store an auth cookie for the provider behind conn.

    The first element of the provider's data pair is kept as is; only
    the cookie slot is replaced.

    :param conn: connection (or conn_params tuple) identifying the
        API provider.
    :param cookie: cookie value to remember for that provider.
    """
    data = self._get_provider_data(conn)
    if not data:
        # No data is held for this provider (_get_provider_data returns
        # None for unknown providers), so there is nothing to update.
        return
    self._set_provider_data(conn, (data[0], cookie))
105
def acquire_connection(self, auto_login=True, headers=None, rid=-1):
    """Check out an available HTTPConnection instance.

    Blocks until a connection is available.

    :param auto_login: automatically log in before returning conn.
    :param headers: headers to pass on to the login attempt.
    :param rid: request id passed in from request eventlet.
    :returns: An available HTTPConnection instance or None if no
        api_providers are configured.
    """
    if not self._api_providers:
        LOG.warn(_("[%d] no API providers currently available."), rid)
        return None

    if self._conn_pool.empty():
        LOG.debug(_("[%d] Waiting to acquire API client connection."), rid)
    priority, conn = self._conn_pool.get()

    now = time.time()
    # A connection that was never stamped defaults to "now" and is
    # therefore treated as fresh; one idle for longer than
    # CONN_IDLE_TIMEOUT is replaced by a new connection to the same
    # provider.
    if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
        LOG.info(_("[%(rid)d] Connection %(conn)s idle for %(sec)0.2f "
                   "seconds; reconnecting."),
                 {'rid': rid, 'conn': _conn_str(conn),
                  'sec': now - conn.last_used})
        conn = self._create_connection(*self._conn_params(conn))

    # Stamp the check-out time so the idle test above has data next time.
    conn.last_used = now
    conn.priority = priority  # stash current priority for release
    qsize = self._conn_pool.qsize()
    LOG.debug(_("[%(rid)d] Acquired connection %(conn)s. %(qsize)d "
                "connection(s) available."),
              {'rid': rid, 'conn': _conn_str(conn), 'qsize': qsize})
    if auto_login and self.auth_cookie(conn) is None:
        self._wait_for_login(conn, headers)
    return conn
139
def release_connection(self, http_conn, bad_state=False,
                       service_unavail=False, rid=-1):
    """Mark HTTPConnection instance as available for check-out.

    :param http_conn: An HTTPConnection instance obtained from this
        instance.
    :param bad_state: True if http_conn is known to be in a bad state
        (e.g. connection fault.)
    :param service_unavail: True if http_conn returned 503 response.
    :param rid: request id passed in from request eventlet.
    """
    conn_params = self._conn_params(http_conn)
    if conn_params not in self._api_providers:
        # The provider is no longer part of the cluster: do not put the
        # connection back into the pool.
        LOG.debug(_("[%(rid)d] Released connection %(conn)s is not an "
                    "API provider for the cluster"),
                  {'rid': rid, 'conn': _conn_str(http_conn)})
        return
    if hasattr(http_conn, "no_release"):
        # NOTE(review): a connection tagged "no_release" is assumed to
        # be kept out of the pool on purpose -- confirm against the code
        # that sets the attribute.
        return

    if bad_state:
        # Reconnect to provider.
        LOG.warn(_("[%(rid)d] Connection returned in bad state, "
                   "reconnecting to %(conn)s"),
                 {'rid': rid, 'conn': _conn_str(http_conn)})
        http_conn = self._create_connection(*conn_params)
        priority = self._next_conn_priority
        self._next_conn_priority += 1
    elif service_unavail:
        # http_conn returned a service unavailable response: move the
        # other pooled connections to the same controller to the end of
        # the priority queue, leaving all other connections in place.
        conns = []
        while not self._conn_pool.empty():
            priority, conn = self._conn_pool.get()
            if self._conn_params(conn) == conn_params:
                priority = self._next_conn_priority
                self._next_conn_priority += 1
            conns.append((priority, conn))
        for priority, conn in conns:
            self._conn_pool.put((priority, conn))
        # put http_conn at end of queue also
        priority = self._next_conn_priority
        self._next_conn_priority += 1
    else:
        # Healthy connection: reuse the priority stashed at acquire time.
        priority = http_conn.priority

    self._conn_pool.put((priority, http_conn))
    LOG.debug(_("[%(rid)d] Released connection %(conn)s. %(qsize)d "
                "connection(s) available."),
              {'rid': rid, 'conn': _conn_str(http_conn),
               'qsize': self._conn_pool.qsize()})
191
def _wait_for_login(self, conn, headers=None):
192
'''Block until a login has occurred for the current API provider.'''
194
data = self._get_provider_data(conn)
196
LOG.error(_("Login request for an invalid connection: '%s'"),
199
provider_sem = data[0]
200
if provider_sem.acquire(blocking=False):
202
cookie = self._login(conn, headers)
203
self.set_auth_cookie(conn, cookie)
205
provider_sem.release()
207
LOG.debug(_("Waiting for auth to complete"))
208
# Wait until we can aquire then release
209
provider_sem.acquire(blocking=True)
210
provider_sem.release()
212
def _get_provider_data(self, conn_or_conn_params, default=None):
213
"""Get data for specified API provider.
216
conn_or_conn_params: either a HTTP(S)Connection object or the
217
resolved conn_params tuple returned by self._conn_params().
218
default: conn_params if ones passed aren't known
219
Returns: Data associated with specified provider
221
conn_params = self._normalize_conn_params(conn_or_conn_params)
222
return self._api_provider_data.get(conn_params, default)
224
def _set_provider_data(self, conn_or_conn_params, data):
225
"""Set data for specified API provider.
228
conn_or_conn_params: either a HTTP(S)Connection object or the
229
resolved conn_params tuple returned by self._conn_params().
230
data: data to associate with API provider
232
conn_params = self._normalize_conn_params(conn_or_conn_params)
234
del self._api_provider_data[conn_params]
236
self._api_provider_data[conn_params] = data
238
def _normalize_conn_params(self, conn_or_conn_params):
239
"""Normalize conn_param tuple.
242
conn_or_conn_params: either a HTTP(S)Connection object or the
243
resolved conn_params tuple returned by self._conn_params().
245
Returns: Normalized conn_param tuple
247
if (not isinstance(conn_or_conn_params, tuple) and
248
not isinstance(conn_or_conn_params, httplib.HTTPConnection)):
249
LOG.debug(_("Invalid conn_params value: '%s'"),
250
str(conn_or_conn_params))
251
return conn_or_conn_params
252
if isinstance(conn_or_conn_params, httplib.HTTPConnection):
253
conn_params = self._conn_params(conn_or_conn_params)
255
conn_params = conn_or_conn_params
256
host, port, is_ssl = conn_params
258
port = 443 if is_ssl else 80
259
return (host, port, is_ssl)
261
def update_providers(self, api_providers):
    """Replace the set of API providers and rebuild the connection pool.

    Pooled connections to providers that remain are kept with their
    priority; pooled connections to removed providers are dropped and
    their provider data is cleared; self._concurrent_connections new
    connections are created for every added provider. Nothing happens
    when the provider set is unchanged.

    :param api_providers: iterable of (host, port, is_ssl) sequences.
    """
    new_providers = set(tuple(p) for p in api_providers)
    if new_providers == self._api_providers:
        return

    # Drain the pool, keeping only connections to surviving providers.
    new_conns = []
    while not self._conn_pool.empty():
        priority, conn = self._conn_pool.get_nowait()
        if self._conn_params(conn) in new_providers:
            new_conns.append((priority, conn))

    # Forget the data held for providers that went away.
    for removed in self._api_providers - new_providers:
        self._set_provider_data(removed, None)

    # Interleave the new connections across providers (one per provider
    # per round) so consecutive priorities alternate between providers.
    to_add = new_providers - self._api_providers
    for unused_i in range(self._concurrent_connections):
        for host, port, is_ssl in to_add:
            conn = self._create_connection(host, port, is_ssl)
            new_conns.append((self._next_conn_priority, conn))
            self._next_conn_priority += 1

    for priority, conn in new_conns:
        self._conn_pool.put((priority, conn))
    self._api_providers = new_providers