1
# Copyright 2012 VMware, Inc.
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
6
# not use this file except in compliance with the License. You may obtain
7
# a copy of the License at
9
# http://www.apache.org/licenses/LICENSE-2.0
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
# License for the specific language governing permissions and limitations
22
from oslo.config import cfg
24
from neutron.i18n import _LE, _LI, _LW
25
from neutron.openstack.common import log as logging
26
from neutron.plugins.vmware import api_client
28
LOG = logging.getLogger(__name__)
30
GENERATION_ID_TIMEOUT = -1
31
DEFAULT_CONCURRENT_CONNECTIONS = 3
32
DEFAULT_CONNECT_TIMEOUT = 5
35
@six.add_metaclass(abc.ABCMeta)
36
class ApiClientBase(object):
37
"""An abstract baseclass for all API client implementations."""
39
def _create_connection(self, host, port, is_ssl):
41
return httplib.HTTPSConnection(host, port,
42
timeout=self._connect_timeout)
43
return httplib.HTTPConnection(host, port,
44
timeout=self._connect_timeout)
47
def _conn_params(http_conn):
48
is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
49
return (http_conn.host, http_conn.port, is_ssl)
61
# If NSX_gen_timeout is not -1 then:
62
# Maintain a timestamp along with the generation ID. Hold onto the
63
# ID long enough to be useful and block on sequential requests but
64
# not long enough to persist when Onix db is cleared, which resets
65
# the generation ID, causing the DAL to block indefinitely with some
66
# number that's higher than the cluster's value.
67
if self._gen_timeout != -1:
68
ts = self._config_gen_ts
70
if (time.time() - ts) > self._gen_timeout:
72
return self._config_gen
75
def config_gen(self, value):
76
if self._config_gen != value:
77
if self._gen_timeout != -1:
78
self._config_gen_ts = time.time()
79
self._config_gen = value
81
def auth_cookie(self, conn):
83
data = self._get_provider_data(conn)
88
def set_auth_cookie(self, conn, cookie):
89
data = self._get_provider_data(conn)
91
self._set_provider_data(conn, (data[0], cookie))
93
def acquire_connection(self, auto_login=True, headers=None, rid=-1):
94
'''Check out an available HTTPConnection instance.
96
Blocks until a connection is available.
97
:auto_login: automatically logins before returning conn
98
:headers: header to pass on to login attempt
99
:param rid: request id passed in from request eventlet.
100
:returns: An available HTTPConnection instance or None if no
101
api_providers are configured.
103
if not self._api_providers:
104
LOG.warn(_LW("[%d] no API providers currently available."), rid)
106
if self._conn_pool.empty():
107
LOG.debug("[%d] Waiting to acquire API client connection.", rid)
108
priority, conn = self._conn_pool.get()
110
if getattr(conn, 'last_used', now) < now - cfg.CONF.conn_idle_timeout:
111
LOG.info(_LI("[%(rid)d] Connection %(conn)s idle for %(sec)0.2f "
112
"seconds; reconnecting."),
113
{'rid': rid, 'conn': api_client.ctrl_conn_to_str(conn),
114
'sec': now - conn.last_used})
115
conn = self._create_connection(*self._conn_params(conn))
118
conn.priority = priority # stash current priority for release
119
qsize = self._conn_pool.qsize()
120
LOG.debug("[%(rid)d] Acquired connection %(conn)s. %(qsize)d "
121
"connection(s) available.",
122
{'rid': rid, 'conn': api_client.ctrl_conn_to_str(conn),
124
if auto_login and self.auth_cookie(conn) is None:
125
self._wait_for_login(conn, headers)
128
def release_connection(self, http_conn, bad_state=False,
129
service_unavail=False, rid=-1):
130
'''Mark HTTPConnection instance as available for check-out.
132
:param http_conn: An HTTPConnection instance obtained from this
134
:param bad_state: True if http_conn is known to be in a bad state
135
(e.g. connection fault.)
136
:service_unavail: True if http_conn returned 503 response.
137
:param rid: request id passed in from request eventlet.
139
conn_params = self._conn_params(http_conn)
140
if self._conn_params(http_conn) not in self._api_providers:
141
LOG.debug("[%(rid)d] Released connection %(conn)s is not an "
142
"API provider for the cluster",
144
'conn': api_client.ctrl_conn_to_str(http_conn)})
146
elif hasattr(http_conn, "no_release"):
149
priority = http_conn.priority
151
# Reconnect to provider.
152
LOG.warn(_LW("[%(rid)d] Connection returned in bad state, "
153
"reconnecting to %(conn)s"),
155
'conn': api_client.ctrl_conn_to_str(http_conn)})
156
http_conn = self._create_connection(*self._conn_params(http_conn))
157
elif service_unavail:
158
# http_conn returned a service unaviable response, put other
159
# connections to the same controller at end of priority queue,
161
while not self._conn_pool.empty():
162
priority, conn = self._conn_pool.get()
163
if self._conn_params(conn) == conn_params:
164
priority = self._next_conn_priority
165
self._next_conn_priority += 1
166
conns.append((priority, conn))
167
for priority, conn in conns:
168
self._conn_pool.put((priority, conn))
169
# put http_conn at end of queue also
170
priority = self._next_conn_priority
171
self._next_conn_priority += 1
173
self._conn_pool.put((priority, http_conn))
174
LOG.debug("[%(rid)d] Released connection %(conn)s. %(qsize)d "
175
"connection(s) available.",
176
{'rid': rid, 'conn': api_client.ctrl_conn_to_str(http_conn),
177
'qsize': self._conn_pool.qsize()})
179
def _wait_for_login(self, conn, headers=None):
180
'''Block until a login has occurred for the current API provider.'''
182
data = self._get_provider_data(conn)
184
LOG.error(_LE("Login request for an invalid connection: '%s'"),
185
api_client.ctrl_conn_to_str(conn))
187
provider_sem = data[0]
188
if provider_sem.acquire(blocking=False):
190
cookie = self._login(conn, headers)
191
self.set_auth_cookie(conn, cookie)
193
provider_sem.release()
195
LOG.debug("Waiting for auth to complete")
196
# Wait until we can acquire then release
197
provider_sem.acquire(blocking=True)
198
provider_sem.release()
200
def _get_provider_data(self, conn_or_conn_params, default=None):
201
"""Get data for specified API provider.
204
conn_or_conn_params: either a HTTP(S)Connection object or the
205
resolved conn_params tuple returned by self._conn_params().
206
default: conn_params if ones passed aren't known
207
Returns: Data associated with specified provider
209
conn_params = self._normalize_conn_params(conn_or_conn_params)
210
return self._api_provider_data.get(conn_params, default)
212
def _set_provider_data(self, conn_or_conn_params, data):
213
"""Set data for specified API provider.
216
conn_or_conn_params: either a HTTP(S)Connection object or the
217
resolved conn_params tuple returned by self._conn_params().
218
data: data to associate with API provider
220
conn_params = self._normalize_conn_params(conn_or_conn_params)
222
del self._api_provider_data[conn_params]
224
self._api_provider_data[conn_params] = data
226
def _normalize_conn_params(self, conn_or_conn_params):
227
"""Normalize conn_param tuple.
230
conn_or_conn_params: either a HTTP(S)Connection object or the
231
resolved conn_params tuple returned by self._conn_params().
233
Returns: Normalized conn_param tuple
235
if (not isinstance(conn_or_conn_params, tuple) and
236
not isinstance(conn_or_conn_params, httplib.HTTPConnection)):
237
LOG.debug("Invalid conn_params value: '%s'",
238
str(conn_or_conn_params))
239
return conn_or_conn_params
240
if isinstance(conn_or_conn_params, httplib.HTTPConnection):
241
conn_params = self._conn_params(conn_or_conn_params)
243
conn_params = conn_or_conn_params
244
host, port, is_ssl = conn_params
246
port = 443 if is_ssl else 80
247
return (host, port, is_ssl)