1
# Copyright 2015 NEC Corporation. All rights reserved.
3
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4
# not use this file except in compliance with the License. You may obtain
5
# a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12
# License for the specific language governing permissions and limitations
16
from tempest_lib.common import rest_client
17
from tempest_lib import exceptions as lib_exc
19
from neutron.tests.tempest.common import service_client
20
from neutron.tests.tempest import exceptions
23
class V3TokenClientJSON(rest_client.RestClient):
25
def __init__(self, auth_url, disable_ssl_certificate_validation=None,
26
ca_certs=None, trace_requests=None):
27
dscv = disable_ssl_certificate_validation
28
super(V3TokenClientJSON, self).__init__(
29
None, None, None, disable_ssl_certificate_validation=dscv,
30
ca_certs=ca_certs, trace_requests=trace_requests)
32
raise exceptions.InvalidConfiguration('you must specify a v3 uri '
33
'if using the v3 identity '
35
if 'auth/tokens' not in auth_url:
36
auth_url = auth_url.rstrip('/') + '/auth/tokens'
38
self.auth_url = auth_url
40
def auth(self, user_id=None, username=None, password=None, project_id=None,
41
project_name=None, user_domain_id=None, user_domain_name=None,
42
project_domain_id=None, project_domain_name=None, domain_id=None,
43
domain_name=None, token=None):
45
:param user_id: user id
46
:param username: user name
47
:param user_domain_id: the user domain id
48
:param user_domain_name: the user domain name
49
:param project_domain_id: the project domain id
50
:param project_domain_name: the project domain name
51
:param domain_id: a domain id to scope to
52
:param domain_name: a domain name to scope to
53
:param project_id: a project id to scope to
54
:param project_name: a project name to scope to
55
:param token: a token to re-scope.
57
Accepts different combinations of credentials.
58
Sample sample valid combinations:
60
- token, project_name, project_domain_id
62
- username, password, user_domain_id
63
- username, password, project_name, user_domain_id, project_domain_id
64
Validation is left to the server side.
73
id_obj = creds['auth']['identity']
75
id_obj['methods'].append('token')
80
if (user_id or username) and password:
81
id_obj['methods'].append('password')
82
id_obj['password'] = {
88
id_obj['password']['user']['id'] = user_id
90
id_obj['password']['user']['name'] = username
93
if user_domain_id is not None:
94
_domain = dict(id=user_domain_id)
95
elif user_domain_name is not None:
96
_domain = dict(name=user_domain_name)
98
id_obj['password']['user']['domain'] = _domain
100
if (project_id or project_name):
104
_project['id'] = project_id
106
_project['name'] = project_name
108
if project_domain_id is not None:
109
_project['domain'] = {'id': project_domain_id}
110
elif project_domain_name is not None:
111
_project['domain'] = {'name': project_domain_name}
113
creds['auth']['scope'] = dict(project=_project)
115
creds['auth']['scope'] = dict(domain={'id': domain_id})
117
creds['auth']['scope'] = dict(domain={'name': domain_name})
119
body = json.dumps(creds)
120
resp, body = self.post(self.auth_url, body=body)
121
self.expected_success(201, resp.status)
122
return service_client.ResponseBody(resp, body)
124
def request(self, method, url, extra_headers=False, headers=None,
126
"""A simple HTTP request interface."""
128
# Always accept 'json', for xml token client too.
129
# Because XML response is not easily
130
# converted to the corresponding JSON one
131
headers = self.get_headers(accept_type="json")
134
headers.update(self.get_headers(accept_type="json"))
135
except (ValueError, TypeError):
136
headers = self.get_headers(accept_type="json")
138
resp, resp_body = self.raw_request(url, method,
139
headers=headers, body=body)
140
self._log_request(method, url, resp)
142
if resp.status in [401, 403]:
143
resp_body = json.loads(resp_body)
144
raise lib_exc.Unauthorized(resp_body['error']['message'])
145
elif resp.status not in [200, 201, 204]:
146
raise exceptions.IdentityError(
147
'Unexpected status code {0}'.format(resp.status))
149
return resp, json.loads(resp_body)
151
def get_token(self, **kwargs):
153
Returns (token id, token data) for supplied credentials
156
auth_data = kwargs.pop('auth_data', False)
158
if not (kwargs.get('user_domain_id') or
159
kwargs.get('user_domain_name')):
160
kwargs['user_domain_name'] = 'Default'
162
if not (kwargs.get('project_domain_id') or
163
kwargs.get('project_domain_name')):
164
kwargs['project_domain_name'] = 'Default'
166
body = self.auth(**kwargs)
168
token = body.response.get('x-subject-token')
170
return token, body['token']