1
# Copyright 2015 NEC Corporation. All rights reserved.
3
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4
# not use this file except in compliance with the License. You may obtain
5
# a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12
# License for the specific language governing permissions and limitations
# under the License.
16
from tempest_lib.common import rest_client
17
from tempest_lib import exceptions as lib_exc
19
from neutron.tests.tempest.common import service_client
20
from neutron.tests.tempest import exceptions
23
class TokenClientJSON(rest_client.RestClient):
25
def __init__(self, auth_url, disable_ssl_certificate_validation=None,
             ca_certs=None, trace_requests=None):
    """Create a token client that talks to ``auth_url``.

    :param auth_url: identity endpoint. When the string 'tokens' does not
        occur anywhere in it, trailing slashes are stripped and '/tokens'
        is appended before it is stored on ``self.auth_url``.
    :param disable_ssl_certificate_validation: forwarded unchanged to the
        parent initialiser.
    :param ca_certs: forwarded unchanged to the parent initialiser.
    :param trace_requests: forwarded unchanged to the parent initialiser.
    """
    # The parent initialiser receives None for its three leading
    # positional arguments; only the transport options are passed on.
    parent_options = {
        'disable_ssl_certificate_validation':
            disable_ssl_certificate_validation,
        'ca_certs': ca_certs,
        'trace_requests': trace_requests,
    }
    super(TokenClientJSON, self).__init__(None, None, None,
                                          **parent_options)

    # Plain substring test: a URL that already mentions 'tokens' is kept
    # verbatim, anything else gets the '/tokens' suffix.
    if 'tokens' in auth_url:
        self.auth_url = auth_url
    else:
        self.auth_url = auth_url.rstrip('/') + '/tokens'
38
# Request a token using a user name and password, optionally for a tenant.
#
# The `creds` payload is serialised with json.dumps and POSTed to
# self.auth_url; the 'access' element of the decoded response body is
# returned wrapped in service_client.ResponseBody together with the response.
#
# NOTE(review): the statements that build `creds` (apart from the
# 'passwordCredentials' key below) and whatever condition guards the
# tenantName assignment are not visible in this excerpt -- confirm against
# the complete file before changing this method.
def auth(self, user, password, tenant=None):
41
'passwordCredentials': {
49
# Stores the tenant name under the 'auth' element of the payload.
creds['auth']['tenantName'] = tenant
51
body = json.dumps(creds)
52
# `body` is rebound here: request payload in, response body out.
resp, body = self.post(self.auth_url, body=body)
53
# Presumably raises when the status is not 200 -- helper is defined outside
# this file; verify.
self.expected_success(200, resp.status)
55
return service_client.ResponseBody(resp, body['access'])
57
# Request a token using an existing token id, optionally for a tenant.
#
# Mirrors auth(): the `creds` payload is serialised with json.dumps, POSTed
# to self.auth_url, and the 'access' element of the decoded response body is
# returned wrapped in service_client.ResponseBody together with the response.
#
# NOTE(review): the construction of `creds` from `token_id` and the condition
# guarding the tenantName assignment are not visible in this excerpt --
# confirm against the complete file.
def auth_token(self, token_id, tenant=None):
67
# Stores the tenant name under the 'auth' element of the payload.
creds['auth']['tenantName'] = tenant
69
body = json.dumps(creds)
70
# `body` is rebound here: request payload in, response body out.
resp, body = self.post(self.auth_url, body=body)
71
# Presumably raises when the status is not 200 -- helper is defined outside
# this file; verify.
self.expected_success(200, resp.status)
73
return service_client.ResponseBody(resp, body['access'])
75
# Send one HTTP request through self.raw_request and return
# (resp, resp_body), with a str response body decoded from JSON.
#
# Raises lib_exc.Unauthorized for status 401/403 (message taken from
# resp_body['error']['message']) and exceptions.IdentityError for any other
# status that is not 200 or 201.
#
# NOTE(review): the end of the signature (a `body` argument is used below),
# the conditions choosing between the three header assignments, and the
# `try:` matching the `except` are not visible in this excerpt -- confirm
# against the complete file.
def request(self, method, url, extra_headers=False, headers=None,
77
"""A simple HTTP request interface."""
79
headers = self.get_headers(accept_type="json")
82
# Merges the JSON headers into the caller-supplied mapping in place.
headers.update(self.get_headers(accept_type="json"))
83
# Falls back to the JSON headers alone when update() rejects the
# caller-supplied value.
except (ValueError, TypeError):
84
headers = self.get_headers(accept_type="json")
86
# Note the argument order: raw_request takes the URL before the method.
resp, resp_body = self.raw_request(url, method,
87
headers=headers, body=body)
88
self._log_request(method, url, resp)
90
# Authentication/authorisation failures: the error body is parsed as JSON so
# its message can be surfaced in the exception.
if resp.status in [401, 403]:
91
resp_body = json.loads(resp_body)
92
raise lib_exc.Unauthorized(resp_body['error']['message'])
93
# Only 200 and 201 count as success; everything else is an identity error.
elif resp.status not in [200, 201]:
94
raise exceptions.IdentityError(
95
'Unexpected status code {0}'.format(resp.status))
97
# Only str bodies are decoded; any other type is returned untouched.
if isinstance(resp_body, str):
98
resp_body = json.loads(resp_body)
99
return resp, resp_body
101
# Obtain a token for the supplied credentials by delegating to self.auth().
#
# Two return shapes are visible below: the tuple (token id, full auth body)
# and the bare token id, both read from body['token']['id'].
#
# NOTE(review): the docstring delimiters and the branch selecting between the
# two returns are not visible in this excerpt; presumably `auth_data` picks
# the tuple form -- confirm against the complete file.
def get_token(self, user, password, tenant, auth_data=False):
103
Returns (token id, token data) for supplied credentials
105
body = self.auth(user, password, tenant)
108
return body['token']['id'], body
110
return body['token']['id']