~ubuntu-branches/ubuntu/saucy/quantum/saucy

« back to all changes in this revision

Viewing changes to quantum/plugins/nicira/api_client/client.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2013-05-31 09:37:25 UTC
  • mto: This revision was merged to the branch mainline in revision 34.
  • Revision ID: package-import@ubuntu.com-20130531093725-09i9oem8a2xlw442
Tags: upstream-2013.2~b1
ImportĀ upstreamĀ versionĀ 2013.2~b1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright 2012 Nicira, Inc.
 
4
# All Rights Reserved
 
5
#
 
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
7
# not use this file except in compliance with the License. You may obtain
 
8
# a copy of the License at
 
9
#
 
10
# http://www.apache.org/licenses/LICENSE-2.0
 
11
#
 
12
# Unless required by applicable law or agreed to in writing, software
 
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
15
# License for the specific language governing permissions and limitations
 
16
# under the License.
 
17
#
 
18
# @author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc.
 
19
# @author: Aaron Rosen, Nicira Networks, Inc.
 
20
 
 
21
 
 
22
from abc import ABCMeta
 
23
import httplib
 
24
import logging
 
25
import time
 
26
 
 
27
 
 
28
from quantum.plugins.nicira.api_client.common import (
 
29
    _conn_str)
 
30
 
 
31
 
 
32
logging.basicConfig(level=logging.INFO)
 
33
LOG = logging.getLogger(__name__)
 
34
#Default parameters.
 
35
GENERATION_ID_TIMEOUT = -1
 
36
DEFAULT_CONCURRENT_CONNECTIONS = 3
 
37
DEFAULT_CONNECT_TIMEOUT = 5
 
38
 
 
39
 
 
40
class NvpApiClient(object):
 
41
    '''An abstract baseclass for all NvpApiClient implementations.
 
42
 
 
43
    This defines the interface and property structure for synchronous and
 
44
    coroutine-based classes.
 
45
    '''
 
46
 
 
47
    __metaclass__ = ABCMeta
 
48
 
 
49
    CONN_IDLE_TIMEOUT = 60 * 15
 
50
 
 
51
    def _create_connection(self, host, port, is_ssl):
 
52
        if is_ssl:
 
53
            return httplib.HTTPSConnection(host, port,
 
54
                                           timeout=self._connect_timeout)
 
55
        return httplib.HTTPConnection(host, port,
 
56
                                      timeout=self._connect_timeout)
 
57
 
 
58
    @staticmethod
 
59
    def _conn_params(http_conn):
 
60
        is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
 
61
        return (http_conn.host, http_conn.port, is_ssl)
 
62
 
 
63
    @property
 
64
    def user(self):
 
65
        return self._user
 
66
 
 
67
    @property
 
68
    def password(self):
 
69
        return self._password
 
70
 
 
71
    @property
 
72
    def nvp_config_gen(self):
 
73
        # If nvp_gen_timeout is not -1 then:
 
74
        # Maintain a timestamp along with the generation ID.  Hold onto the
 
75
        # ID long enough to be useful and block on sequential requests but
 
76
        # not long enough to persist when Onix db is cleared, which resets
 
77
        # the generation ID, causing the DAL to block indefinitely with some
 
78
        # number that's higher than the cluster's value.
 
79
        if self._nvp_gen_timeout != -1:
 
80
            ts = self._nvp_config_gen_ts
 
81
            if ts is not None:
 
82
                if (time.time() - ts) > self._nvp_gen_timeout:
 
83
                    return None
 
84
        return self._nvp_config_gen
 
85
 
 
86
    @nvp_config_gen.setter
 
87
    def nvp_config_gen(self, value):
 
88
        if self._nvp_config_gen != value:
 
89
            if self._nvp_gen_timeout != -1:
 
90
                self._nvp_config_gen_ts = time.time()
 
91
        self._nvp_config_gen = value
 
92
 
 
93
    def auth_cookie(self, conn):
 
94
        cookie = None
 
95
        data = self._get_provider_data(conn)
 
96
        if data:
 
97
            cookie = data[1]
 
98
        return cookie
 
99
 
 
100
    def set_auth_cookie(self, conn, cookie):
 
101
        data = self._get_provider_data(conn)
 
102
        if data:
 
103
            self._set_provider_data(conn, (data[0], cookie))
 
104
 
 
105
    def acquire_connection(self, auto_login=True, headers=None, rid=-1):
 
106
        '''Check out an available HTTPConnection instance.
 
107
 
 
108
        Blocks until a connection is available.
 
109
        :auto_login: automatically logins before returning conn
 
110
        :headers: header to pass on to login attempt
 
111
        :param rid: request id passed in from request eventlet.
 
112
        :returns: An available HTTPConnection instance or None if no
 
113
                 api_providers are configured.
 
114
        '''
 
115
        if not self._api_providers:
 
116
            LOG.warn(_("[%d] no API providers currently available."), rid)
 
117
            return None
 
118
        if self._conn_pool.empty():
 
119
            LOG.debug(_("[%d] Waiting to acquire API client connection."), rid)
 
120
        priority, conn = self._conn_pool.get()
 
121
        now = time.time()
 
122
        if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
 
123
            LOG.info(_("[%(rid)d] Connection %(conn)s idle for %(sec)0.2f "
 
124
                       "seconds; reconnecting."),
 
125
                     {'rid': rid, 'conn': _conn_str(conn),
 
126
                      'sec': now - conn.last_used})
 
127
            conn = self._create_connection(*self._conn_params(conn))
 
128
 
 
129
        conn.last_used = now
 
130
        conn.priority = priority  # stash current priority for release
 
131
        qsize = self._conn_pool.qsize()
 
132
        LOG.debug(_("[%(rid)d] Acquired connection %(conn)s. %(qsize)d "
 
133
                    "connection(s) available."),
 
134
                  {'rid': rid, 'conn': _conn_str(conn), 'qsize': qsize})
 
135
        if auto_login and self.auth_cookie(conn) is None:
 
136
            self._wait_for_login(conn, headers)
 
137
        return conn
 
138
 
 
139
    def release_connection(self, http_conn, bad_state=False,
 
140
                           service_unavail=False, rid=-1):
 
141
        '''Mark HTTPConnection instance as available for check-out.
 
142
 
 
143
        :param http_conn: An HTTPConnection instance obtained from this
 
144
            instance.
 
145
        :param bad_state: True if http_conn is known to be in a bad state
 
146
                (e.g. connection fault.)
 
147
        :service_unavail: True if http_conn returned 503 response.
 
148
        :param rid: request id passed in from request eventlet.
 
149
        '''
 
150
        conn_params = self._conn_params(http_conn)
 
151
        if self._conn_params(http_conn) not in self._api_providers:
 
152
            LOG.debug(_("[%(rid)d] Released connection %(conn)s is not an "
 
153
                        "API provider for the cluster"),
 
154
                      {'rid': rid, 'conn': _conn_str(http_conn)})
 
155
            return
 
156
        elif hasattr(http_conn, "no_release"):
 
157
            return
 
158
 
 
159
        if bad_state:
 
160
            # Reconnect to provider.
 
161
            LOG.warn(_("[%(rid)d] Connection returned in bad state, "
 
162
                       "reconnecting to %(conn)s"),
 
163
                     {'rid': rid, 'conn': _conn_str(http_conn)})
 
164
            http_conn = self._create_connection(*self._conn_params(http_conn))
 
165
            priority = self._next_conn_priority
 
166
            self._next_conn_priority += 1
 
167
        elif service_unavail:
 
168
            # http_conn returned a service unaviable response, put other
 
169
            # connections to the same controller at end of priority queue,
 
170
            conns = []
 
171
            while not self._conn_pool.empty():
 
172
                priority, conn = self._conn_pool.get()
 
173
                if self._conn_params(conn) == conn_params:
 
174
                    priority = self._next_conn_priority
 
175
                    self._next_conn_priority += 1
 
176
                conns.append((priority, conn))
 
177
            for priority, conn in conns:
 
178
                self._conn_pool.put((priority, conn))
 
179
            # put http_conn at end of queue also
 
180
            priority = self._next_conn_priority
 
181
            self._next_conn_priority += 1
 
182
        else:
 
183
            priority = http_conn.priority
 
184
 
 
185
        self._conn_pool.put((priority, http_conn))
 
186
        LOG.debug(_("[%(rid)d] Released connection %(conn)s. %(qsize)d "
 
187
                    "connection(s) available."),
 
188
                  {'rid': rid, 'conn': _conn_str(http_conn),
 
189
                   'qsize': self._conn_pool.qsize()})
 
190
 
 
191
    def _wait_for_login(self, conn, headers=None):
 
192
        '''Block until a login has occurred for the current API provider.'''
 
193
 
 
194
        data = self._get_provider_data(conn)
 
195
        if data is None:
 
196
            LOG.error(_("Login request for an invalid connection: '%s'"),
 
197
                      _conn_str(conn))
 
198
            return
 
199
        provider_sem = data[0]
 
200
        if provider_sem.acquire(blocking=False):
 
201
            try:
 
202
                cookie = self._login(conn, headers)
 
203
                self.set_auth_cookie(conn, cookie)
 
204
            finally:
 
205
                provider_sem.release()
 
206
        else:
 
207
            LOG.debug(_("Waiting for auth to complete"))
 
208
            # Wait until we can aquire then release
 
209
            provider_sem.acquire(blocking=True)
 
210
            provider_sem.release()
 
211
 
 
212
    def _get_provider_data(self, conn_or_conn_params, default=None):
 
213
        """Get data for specified API provider.
 
214
 
 
215
        Args:
 
216
            conn_or_conn_params: either a HTTP(S)Connection object or the
 
217
                resolved conn_params tuple returned by self._conn_params().
 
218
            default: conn_params if ones passed aren't known
 
219
        Returns: Data associated with specified provider
 
220
        """
 
221
        conn_params = self._normalize_conn_params(conn_or_conn_params)
 
222
        return self._api_provider_data.get(conn_params, default)
 
223
 
 
224
    def _set_provider_data(self, conn_or_conn_params, data):
 
225
        """Set data for specified API provider.
 
226
 
 
227
        Args:
 
228
            conn_or_conn_params: either a HTTP(S)Connection object or the
 
229
                resolved conn_params tuple returned by self._conn_params().
 
230
            data: data to associate with API provider
 
231
        """
 
232
        conn_params = self._normalize_conn_params(conn_or_conn_params)
 
233
        if data is None:
 
234
            del self._api_provider_data[conn_params]
 
235
        else:
 
236
            self._api_provider_data[conn_params] = data
 
237
 
 
238
    def _normalize_conn_params(self, conn_or_conn_params):
 
239
        """Normalize conn_param tuple.
 
240
 
 
241
        Args:
 
242
            conn_or_conn_params: either a HTTP(S)Connection object or the
 
243
                resolved conn_params tuple returned by self._conn_params().
 
244
 
 
245
        Returns: Normalized conn_param tuple
 
246
        """
 
247
        if (not isinstance(conn_or_conn_params, tuple) and
 
248
            not isinstance(conn_or_conn_params, httplib.HTTPConnection)):
 
249
            LOG.debug(_("Invalid conn_params value: '%s'"),
 
250
                      str(conn_or_conn_params))
 
251
            return conn_or_conn_params
 
252
        if isinstance(conn_or_conn_params, httplib.HTTPConnection):
 
253
            conn_params = self._conn_params(conn_or_conn_params)
 
254
        else:
 
255
            conn_params = conn_or_conn_params
 
256
        host, port, is_ssl = conn_params
 
257
        if port is None:
 
258
            port = 443 if is_ssl else 80
 
259
        return (host, port, is_ssl)
 
260
 
 
261
    def update_providers(self, api_providers):
 
262
        new_providers = set([tuple(p) for p in api_providers])
 
263
        if new_providers != self._api_providers:
 
264
            new_conns = []
 
265
            while not self._conn_pool.empty():
 
266
                priority, conn = self._conn_pool.get_nowait()
 
267
                if self._conn_params(conn) in new_providers:
 
268
                    new_conns.append((priority, conn))
 
269
 
 
270
            to_subtract = self._api_providers - new_providers
 
271
            for p in to_subtract:
 
272
                self._set_provider_data(p, None)
 
273
            to_add = new_providers - self._api_providers
 
274
            for unused_i in range(self._concurrent_connections):
 
275
                for host, port, is_ssl in to_add:
 
276
                    conn = self._create_connection(host, port, is_ssl)
 
277
                    new_conns.append((self._next_conn_priority, conn))
 
278
                    self._next_conn_priority += 1
 
279
 
 
280
            for priority, conn in new_conns:
 
281
                self._conn_pool.put((priority, conn))
 
282
            self._api_providers = new_providers