~ubuntu-branches/ubuntu/precise/keystone/precise-security

« back to all changes in this revision

Viewing changes to keystone/test/unit/test_token.py

  • Committer: Bazaar Package Importer
  • Author(s): Chuck Short
  • Date: 2011-08-23 10:18:22 UTC
  • Revision ID: james.westby@ubuntu.com-20110823101822-enve6zceb3lqhuvj
Tags: upstream-1.0~d4~20110823.1078
ImportĀ upstreamĀ versionĀ 1.0~d4~20110823.1078

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
# Copyright (c) 2010-2011 OpenStack, LLC.
 
3
#
 
4
# Licensed under the Apache License, Version 2.0 (the "License");
 
5
# you may not use this file except in compliance with the License.
 
6
# You may obtain a copy of the License at
 
7
#
 
8
#    http://www.apache.org/licenses/LICENSE-2.0
 
9
#
 
10
# Unless required by applicable law or agreed to in writing, software
 
11
# distributed under the License is distributed on an "AS IS" BASIS,
 
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
13
# implied.
 
14
# See the License for the specific language governing permissions and
 
15
# limitations under the License.
 
16
 
 
17
 
 
18
import httplib2
 
19
import os
 
20
import sys
 
21
app_path = os.path.abspath(os.path.join(os.path.abspath(__file__),
 
22
    '..', '..', '..', '..', '..', '..', 'keystone'))
 
23
sys.path.append(app_path)
 
24
import unittest2 as unittest
 
25
import test_common as utils
 
26
import json
 
27
import keystone.logic.types.fault as fault
 
28
from lxml import etree
 
29
 
 
30
 
 
31
class ValidateToken(unittest.TestCase):
 
32
 
 
33
    def setUp(self):
 
34
        self.tenant = utils.get_tenant()
 
35
        self.user = 'joeuser'
 
36
        self.token = utils.get_token('joeuser', 'secrete', self.tenant,
 
37
                                    'token')
 
38
        self.auth_token = utils.get_auth_token()
 
39
        self.exp_auth_token = utils.get_exp_auth_token()
 
40
        _resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
 
41
            str(self.auth_token))
 
42
        obj = json.loads(content)
 
43
        if not "roleRef" in obj:
 
44
            raise fault.BadRequestFault("Expecting RoleRef")
 
45
        roleRef = obj["roleRef"]
 
46
        if not "id" in roleRef:
 
47
            self.role_ref_id = None
 
48
        else:
 
49
            self.role_ref_id = roleRef["id"]
 
50
 
 
51
    def tearDown(self):
 
52
        _resp, _content = utils.delete_role_ref(self.user, self.role_ref_id,
 
53
            self.auth_token)
 
54
        utils.delete_token(self.token, self.auth_token)
 
55
 
 
56
    def test_validate_token_true(self):
 
57
        header = httplib2.Http(".cache")
 
58
 
 
59
        url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, self.token,
 
60
            self.tenant)
 
61
        resp, content = header.request(url, "GET", body='', headers={
 
62
            "Content-Type": "application/json",
 
63
            "X-Auth-Token": self.auth_token})
 
64
        if int(resp['status']) == 500:
 
65
            self.fail('Identity Fault')
 
66
        elif int(resp['status']) == 503:
 
67
            self.fail('Service Not Available')
 
68
        self.assertEqual(200, int(resp['status']))
 
69
        self.assertEqual('application/json', utils.content_type(resp))
 
70
        #verify content
 
71
        obj = json.loads(content)
 
72
        if not "auth" in obj:
 
73
            raise self.fail("Expecting Auth")
 
74
        role_refs = obj["auth"]["user"]["roleRefs"]
 
75
        role_ref = role_refs[0]
 
76
        role_ref_id = role_ref["id"]
 
77
        self.assertEqual(self.role_ref_id, role_ref_id)
 
78
 
 
79
    def test_validate_token_true_using_service_token(self):
 
80
        header = httplib2.Http(".cache")
 
81
 
 
82
        url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, self.token,
 
83
            self.tenant)
 
84
        resp, content = header.request(url, "GET", body='', headers={
 
85
            "Content-Type": "application/json",
 
86
            "X-Auth-Token": utils.get_service_token()})
 
87
        if int(resp['status']) == 500:
 
88
            self.fail('Identity Fault')
 
89
        elif int(resp['status']) == 503:
 
90
            self.fail('Service Not Available')
 
91
        self.assertEqual(200, int(resp['status']))
 
92
        self.assertEqual('application/json', utils.content_type(resp))
 
93
        #verify content
 
94
        obj = json.loads(content)
 
95
        if not "auth" in obj:
 
96
            raise self.fail("Expecting Auth")
 
97
        role_refs = obj["auth"]["user"]["roleRefs"]
 
98
        role_ref = role_refs[0]
 
99
        role_ref_id = role_ref["id"]
 
100
        self.assertEqual(self.role_ref_id, role_ref_id)
 
101
 
 
102
    def test_validate_token_true_xml(self):
 
103
        header = httplib2.Http(".cache")
 
104
        url = '%stokens/%s?belongsTo=%s' % (
 
105
            utils.URL_V2, self.token, self.tenant)
 
106
        resp, content = header.request(url, "GET", body='', headers={
 
107
            "Content-Type": "application/xml",
 
108
            "X-Auth-Token": self.auth_token,
 
109
            "ACCEPT": "application/xml"})
 
110
        if int(resp['status']) == 500:
 
111
            self.fail('Identity Fault')
 
112
        elif int(resp['status']) == 503:
 
113
            self.fail('Service Not Available')
 
114
        self.assertEqual(200, int(resp['status']))
 
115
        self.assertEqual('application/xml', utils.content_type(resp))
 
116
        #verify content
 
117
        dom = etree.Element("root")
 
118
        dom.append(etree.fromstring(content))
 
119
        auth = dom.find("{http://docs.openstack.org/identity/api/v2.0}" \
 
120
            "auth")
 
121
        if auth == None:
 
122
            self.fail("Expecting Auth")
 
123
 
 
124
        user = auth.find("{http://docs.openstack.org/identity/api/v2.0}" \
 
125
            "user")
 
126
        if user == None:
 
127
            self.fail("Expecting User")
 
128
        roleRefs = user.find("{http://docs.openstack.org/identity/api/v2.0}" \
 
129
               "roleRefs")
 
130
        if roleRefs == None:
 
131
            self.fail("Expecting Role Refs")
 
132
        roleRef = roleRefs.find(
 
133
            "{http://docs.openstack.org/identity/api/v2.0}roleRef")
 
134
        if roleRef == None:
 
135
            self.fail("Expecting Role Refs")
 
136
        self.assertEqual(str(self.role_ref_id), roleRef.get("id"))
 
137
 
 
138
    def test_validate_token_expired(self):
 
139
        header = httplib2.Http(".cache")
 
140
        url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, self.exp_auth_token,
 
141
                                           self.tenant)
 
142
        resp, _content = header.request(url, "GET", body='',
 
143
                                  headers={"Content-Type": "application/json",
 
144
                                         "X-Auth-Token": self.exp_auth_token})
 
145
        if int(resp['status']) == 500:
 
146
            self.fail('Identity Fault')
 
147
        elif int(resp['status']) == 503:
 
148
            self.fail('Service Not Available')
 
149
        self.assertEqual(403, int(resp['status']))
 
150
        self.assertEqual('application/json', utils.content_type(resp))
 
151
 
 
152
    def test_validate_token_expired_xml(self):
 
153
        header = httplib2.Http(".cache")
 
154
 
 
155
        url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, self.exp_auth_token,
 
156
                                           self.tenant)
 
157
        resp, _content = header.request(url, "GET", body='',
 
158
                                  headers={"Content-Type": "application/xml",
 
159
                                           "X-Auth-Token": self.exp_auth_token,
 
160
                                           "ACCEPT": "application/xml"})
 
161
        if int(resp['status']) == 500:
 
162
            self.fail('Identity Fault')
 
163
        elif int(resp['status']) == 503:
 
164
            self.fail('Service Not Available')
 
165
        self.assertEqual(403, int(resp['status']))
 
166
        self.assertEqual('application/xml', utils.content_type(resp))
 
167
 
 
168
    def test_validate_token_invalid(self):
 
169
        header = httplib2.Http(".cache")
 
170
        url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, 'NonExistingToken',
 
171
                                           self.tenant)
 
172
        resp, _content = header.request(url, "GET", body='',
 
173
                                  headers={"Content-Type": "application/json",
 
174
                                           "X-Auth-Token": self.auth_token})
 
175
 
 
176
        if int(resp['status']) == 500:
 
177
            self.fail('Identity Fault')
 
178
        elif int(resp['status']) == 503:
 
179
            self.fail('Service Not Available')
 
180
        self.assertEqual(401, int(resp['status']))
 
181
        self.assertEqual('application/json', utils.content_type(resp))
 
182
 
 
183
    def test_validate_token_invalid_xml(self):
 
184
        header = httplib2.Http(".cache")
 
185
        url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, 'NonExistingToken',
 
186
                                           self.tenant)
 
187
        resp, _content = header.request(url, "GET", body='',
 
188
                                  headers={"Content-Type": "application/json",
 
189
                                           "X-Auth-Token": self.auth_token})
 
190
        if int(resp['status']) == 500:
 
191
            self.fail('Identity Fault')
 
192
        elif int(resp['status']) == 503:
 
193
            self.fail('Service Not Available')
 
194
        self.assertEqual(401, int(resp['status']))
 
195
        self.assertEqual('application/json', utils.content_type(resp))
 
196
 
 
197
if __name__ == '__main__':
 
198
    unittest.main()