# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
21
# Resolve the ``keystone`` directory by walking six ``..`` components up from
# this file's absolute path, then make it importable.
# NOTE(review): ``os`` and ``sys`` must already be imported at this point; the
# import lines are not visible in this part of the file -- confirm.
# NOTE(review): presumably this has to run before the project imports that
# follow it -- keep the ordering.
app_path = os.path.abspath(os.path.join(
    os.path.abspath(__file__),
    '..', '..', '..', '..', '..', '..', 'keystone'))
# Do not grow sys.path with a duplicate entry if the module is loaded twice.
if app_path not in sys.path:
    sys.path.append(app_path)
24
import unittest2 as unittest
25
import test_common as utils
27
import keystone.logic.types.fault as fault
28
from lxml import etree
31
# Test case whose methods issue GET requests against
# ``<utils.URL_V2>tokens/<token>?belongsTo=<tenant>`` and check the response
# status and content type.
# NOTE(review): this region looks truncated -- the ``def setUp(self):`` and
# ``def tearDown(self):`` headers that presumably wrap the statements below
# are not visible, and several calls are missing their closing arguments.
# Confirm against the original file before editing.
class ValidateToken(unittest.TestCase):
34
# Tenant used as the ``belongsTo`` value by the tests.
self.tenant = utils.get_tenant()
36
# Token under validation.
# NOTE(review): call is unterminated here; trailing argument(s) not visible.
self.token = utils.get_token('joeuser', 'secrete', self.tenant,
38
# Token sent in the X-Auth-Token header by most tests.
self.auth_token = utils.get_auth_token()
39
# Token used by the "expired" tests, which expect a 403 for it.
self.exp_auth_token = utils.get_exp_auth_token()
40
# Create an 'Admin' role ref for the user.
# NOTE(review): ``self.user`` is read here but its assignment is not visible;
# the call's trailing argument(s) are not visible either.
_resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
42
# Parse the creation response as JSON to extract the new role ref's id.
obj = json.loads(content)
43
if not "roleRef" in obj:
44
raise fault.BadRequestFault("Expecting RoleRef")
45
roleRef = obj["roleRef"]
46
# Remember the role ref id for the assertions in the tests; None when the
# response carries no "id".
# NOTE(review): an ``else:`` presumably separates the two assignments below.
if not "id" in roleRef:
47
self.role_ref_id = None
49
self.role_ref_id = roleRef["id"]
52
# Cleanup: remove the role ref and the token created above.
# NOTE(review): presumably the tearDown body; trailing argument(s) of the
# delete_role_ref call are not visible.
_resp, _content = utils.delete_role_ref(self.user, self.role_ref_id,
54
utils.delete_token(self.token, self.auth_token)
56
def test_validate_token_true(self):
    """Validate ``self.token`` as JSON and check the role ref it reports.

    Sends ``GET <URL_V2>tokens/<token>?belongsTo=<tenant>`` with
    ``self.auth_token`` in ``X-Auth-Token`` and expects a 200
    ``application/json`` response whose first ``auth.user.roleRefs`` entry
    has the id stored in ``self.role_ref_id``.
    """
    client = httplib2.Http(".cache")
    # NOTE(review): belongsTo is taken to be self.tenant, as in
    # test_validate_token_true_xml -- confirm.
    url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, self.token,
                                        self.tenant)
    resp, content = client.request(url, "GET", body='', headers={
        "Content-Type": "application/json",
        "X-Auth-Token": self.auth_token})

    # Report server-side faults with a descriptive message before the
    # generic status assertion.
    status = int(resp['status'])
    if status == 500:
        self.fail('Identity Fault')
    elif status == 503:
        self.fail('Service Not Available')
    self.assertEqual(200, status)
    self.assertEqual('application/json', utils.content_type(resp))

    obj = json.loads(content)
    # self.fail() raises by itself; no surrounding ``raise`` is needed.
    if "auth" not in obj:
        self.fail("Expecting Auth")
    role_refs = obj["auth"]["user"]["roleRefs"]
    # Fail cleanly rather than with an IndexError when the list is empty.
    self.assertTrue(role_refs, "Expecting Role Refs")
    self.assertEqual(self.role_ref_id, role_refs[0]["id"])
79
def test_validate_token_true_using_service_token(self):
    """Validate ``self.token`` as JSON, authenticating with a service token.

    Same request as the plain JSON validation test, except that
    ``X-Auth-Token`` carries ``utils.get_service_token()``. Expects a 200
    ``application/json`` response whose first ``auth.user.roleRefs`` entry
    has the id stored in ``self.role_ref_id``.
    """
    client = httplib2.Http(".cache")
    # NOTE(review): belongsTo is taken to be self.tenant, as in
    # test_validate_token_true_xml -- confirm.
    url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, self.token,
                                        self.tenant)
    resp, content = client.request(url, "GET", body='', headers={
        "Content-Type": "application/json",
        "X-Auth-Token": utils.get_service_token()})

    # Report server-side faults with a descriptive message before the
    # generic status assertion.
    status = int(resp['status'])
    if status == 500:
        self.fail('Identity Fault')
    elif status == 503:
        self.fail('Service Not Available')
    self.assertEqual(200, status)
    self.assertEqual('application/json', utils.content_type(resp))

    obj = json.loads(content)
    # self.fail() raises by itself; no surrounding ``raise`` is needed.
    if "auth" not in obj:
        self.fail("Expecting Auth")
    role_refs = obj["auth"]["user"]["roleRefs"]
    # Fail cleanly rather than with an IndexError when the list is empty.
    self.assertTrue(role_refs, "Expecting Role Refs")
    self.assertEqual(self.role_ref_id, role_refs[0]["id"])
102
def test_validate_token_true_xml(self):
    """Validate ``self.token`` as XML and check the role ref it reports.

    Sends ``GET <URL_V2>tokens/<token>?belongsTo=<tenant>`` asking for
    ``application/xml`` and expects a 200 XML response in which the
    ``roleRef`` element's ``id`` attribute equals
    ``str(self.role_ref_id)``.
    """
    ns = "{http://docs.openstack.org/identity/api/v2.0}"
    client = httplib2.Http(".cache")
    url = '%stokens/%s?belongsTo=%s' % (
        utils.URL_V2, self.token, self.tenant)
    resp, content = client.request(url, "GET", body='', headers={
        "Content-Type": "application/xml",
        "X-Auth-Token": self.auth_token,
        "ACCEPT": "application/xml"})

    # Report server-side faults with a descriptive message before the
    # generic status assertion.
    status = int(resp['status'])
    if status == 500:
        self.fail('Identity Fault')
    elif status == 503:
        self.fail('Service Not Available')
    self.assertEqual(200, status)
    self.assertEqual('application/xml', utils.content_type(resp))

    # Wrap the parsed document in a synthetic root so the top-level element
    # can be located with find().
    dom = etree.Element("root")
    dom.append(etree.fromstring(content))

    # NOTE(review): the element names "auth", "user" and "roleRefs" are
    # inferred from the failure messages and from the JSON tests' path
    # auth -> user -> roleRefs -- confirm against the API schema.
    # ``is None`` is used because find() returns None when nothing matches.
    auth = dom.find(ns + "auth")
    if auth is None:
        self.fail("Expecting Auth")

    user = auth.find(ns + "user")
    if user is None:
        self.fail("Expecting User")

    role_refs = user.find(ns + "roleRefs")
    if role_refs is None:
        self.fail("Expecting Role Refs")

    role_ref = role_refs.find(ns + "roleRef")
    if role_ref is None:
        self.fail("Expecting Role Refs")
    self.assertEqual(str(self.role_ref_id), role_ref.get("id"))
138
def test_validate_token_expired(self):
    """Validating with ``self.exp_auth_token`` must yield 403 as JSON.

    ``self.exp_auth_token`` is used both as the token in the URL and as the
    ``X-Auth-Token`` header value.
    """
    client = httplib2.Http(".cache")
    # NOTE(review): belongsTo is taken to be self.tenant, as in
    # test_validate_token_true_xml -- confirm.
    url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, self.exp_auth_token,
                                        self.tenant)
    request_headers = {"Content-Type": "application/json",
                       "X-Auth-Token": self.exp_auth_token}
    resp, _content = client.request(url, "GET", body='',
                                    headers=request_headers)

    # Report server-side faults with a descriptive message before the
    # generic status assertion.
    status = int(resp['status'])
    if status == 500:
        self.fail('Identity Fault')
    elif status == 503:
        self.fail('Service Not Available')
    self.assertEqual(403, status)
    self.assertEqual('application/json', utils.content_type(resp))
152
def test_validate_token_expired_xml(self):
    """Validating with ``self.exp_auth_token`` must yield 403 as XML.

    ``self.exp_auth_token`` is used both as the token in the URL and as the
    ``X-Auth-Token`` header value; the request asks for
    ``application/xml``.
    """
    client = httplib2.Http(".cache")
    # NOTE(review): belongsTo is taken to be self.tenant, as in
    # test_validate_token_true_xml -- confirm.
    url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, self.exp_auth_token,
                                        self.tenant)
    request_headers = {"Content-Type": "application/xml",
                       "X-Auth-Token": self.exp_auth_token,
                       "ACCEPT": "application/xml"}
    resp, _content = client.request(url, "GET", body='',
                                    headers=request_headers)

    # Report server-side faults with a descriptive message before the
    # generic status assertion.
    status = int(resp['status'])
    if status == 500:
        self.fail('Identity Fault')
    elif status == 503:
        self.fail('Service Not Available')
    self.assertEqual(403, status)
    self.assertEqual('application/xml', utils.content_type(resp))
168
def test_validate_token_invalid(self):
    """Validating the token id 'NonExistingToken' must yield 401 as JSON.

    The request itself is authenticated with ``self.auth_token``.
    """
    client = httplib2.Http(".cache")
    # NOTE(review): belongsTo is taken to be self.tenant, as in
    # test_validate_token_true_xml -- confirm.
    url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, 'NonExistingToken',
                                        self.tenant)
    request_headers = {"Content-Type": "application/json",
                       "X-Auth-Token": self.auth_token}
    resp, _content = client.request(url, "GET", body='',
                                    headers=request_headers)

    # Report server-side faults with a descriptive message before the
    # generic status assertion.
    status = int(resp['status'])
    if status == 500:
        self.fail('Identity Fault')
    elif status == 503:
        self.fail('Service Not Available')
    self.assertEqual(401, status)
    self.assertEqual('application/json', utils.content_type(resp))
183
def test_validate_token_invalid_xml(self):
    """Validating the token id 'NonExistingToken' must yield 401 as XML.

    The request is authenticated with ``self.auth_token`` and asks for
    ``application/xml``, using the same headers as the expired-token XML
    test, so that this test exercises the XML path instead of repeating
    the JSON one.
    """
    client = httplib2.Http(".cache")
    # NOTE(review): belongsTo is taken to be self.tenant, as in
    # test_validate_token_true_xml -- confirm.
    url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, 'NonExistingToken',
                                        self.tenant)
    request_headers = {"Content-Type": "application/xml",
                       "X-Auth-Token": self.auth_token,
                       "ACCEPT": "application/xml"}
    resp, _content = client.request(url, "GET", body='',
                                    headers=request_headers)

    # Report server-side faults with a descriptive message before the
    # generic status assertion.
    status = int(resp['status'])
    if status == 500:
        self.fail('Identity Fault')
    elif status == 503:
        self.fail('Service Not Available')
    self.assertEqual(401, status)
    # NOTE(review): assumes the service renders 401 faults as XML when
    # asked, as the expired-token XML test expects for 403 -- confirm.
    self.assertEqual('application/xml', utils.content_type(resp))
197
if __name__ == '__main__':