4
6
from keystoneclient.v2_0 import client
5
7
from keystoneclient import exceptions
6
8
from tests import utils
9
def to_http_response(resp_dict):
11
Utility function to convert a python dictionary
12
(e.g. {'status':status, 'body': body, 'headers':headers}
13
to an httplib2 response.
15
resp = httplib2.Response(resp_dict)
16
for k, v in resp_dict['headers'].items():
21
11
class AuthenticateAgainstKeystoneTests(utils.TestCase):
23
13
super(AuthenticateAgainstKeystoneTests, self).setUp()
55
45
_cred = 'passwordCredentials'
57
47
self.TEST_REQUEST_BODY[_auth][_cred][_pass] = 'bad_key'
58
resp = httplib2.Response({
48
resp = utils.TestResponse({
62
52
"message": "Unauthorized",
68
httplib2.Http.request(self.TEST_URL + "/tokens",
70
body=json.dumps(self.TEST_REQUEST_BODY),
71
headers=self.TEST_REQUEST_HEADERS) \
72
.AndReturn((resp, resp['body']))
58
kwargs = copy.copy(self.TEST_REQUEST_BASE)
59
kwargs['headers'] = self.TEST_REQUEST_HEADERS
60
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
61
requests.request('POST',
62
self.TEST_URL + "/tokens",
63
**kwargs).AndReturn((resp))
73
64
self.mox.ReplayAll()
75
66
# Workaround for issue with assertRaises on python2.6
91
82
'location': self.TEST_ADMIN_URL + "/tokens",
99
"body": correct_response,
90
"text": correct_response,
102
responses = [(to_http_response(resp), resp['body'])
93
responses = [(utils.TestResponse(resp))
103
94
for resp in dict_responses]
105
httplib2.Http.request(self.TEST_URL + "/tokens",
107
body=json.dumps(self.TEST_REQUEST_BODY),
108
headers=self.TEST_REQUEST_HEADERS) \
109
.AndReturn(responses[0])
110
httplib2.Http.request(self.TEST_ADMIN_URL + "/tokens",
112
body=json.dumps(self.TEST_REQUEST_BODY),
113
headers=self.TEST_REQUEST_HEADERS) \
114
.AndReturn(responses[1])
96
kwargs = copy.copy(self.TEST_REQUEST_BASE)
97
kwargs['headers'] = self.TEST_REQUEST_HEADERS
98
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
99
requests.request('POST',
100
self.TEST_URL + "/tokens",
101
**kwargs).AndReturn(responses[0])
102
kwargs = copy.copy(self.TEST_REQUEST_BASE)
103
kwargs['headers'] = self.TEST_REQUEST_HEADERS
104
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
105
requests.request('POST',
106
self.TEST_ADMIN_URL + "/tokens",
107
**kwargs).AndReturn(responses[1])
115
108
self.mox.ReplayAll()
117
110
cs = client.Client(username=self.TEST_USER,
126
119
self.TEST_RESPONSE_DICT["access"]["token"]["id"])
128
121
def test_authenticate_success_password_scoped(self):
129
resp = httplib2.Response({
131
"body": json.dumps(self.TEST_RESPONSE_DICT),
122
resp = utils.TestResponse({
124
"text": json.dumps(self.TEST_RESPONSE_DICT),
134
httplib2.Http.request(self.TEST_URL + "/tokens",
136
body=json.dumps(self.TEST_REQUEST_BODY),
137
headers=self.TEST_REQUEST_HEADERS) \
138
.AndReturn((resp, resp['body']))
127
kwargs = copy.copy(self.TEST_REQUEST_BASE)
128
kwargs['headers'] = self.TEST_REQUEST_HEADERS
129
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
130
requests.request('POST',
131
self.TEST_URL + "/tokens",
132
**kwargs).AndReturn((resp))
139
133
self.mox.ReplayAll()
141
135
cs = client.Client(username=self.TEST_USER,
151
145
def test_authenticate_success_password_unscoped(self):
152
146
del self.TEST_RESPONSE_DICT['access']['serviceCatalog']
153
147
del self.TEST_REQUEST_BODY['auth']['tenantId']
154
resp = httplib2.Response({
156
"body": json.dumps(self.TEST_RESPONSE_DICT),
148
resp = utils.TestResponse({
150
"text": json.dumps(self.TEST_RESPONSE_DICT),
159
httplib2.Http.request(self.TEST_URL + "/tokens",
161
body=json.dumps(self.TEST_REQUEST_BODY),
162
headers=self.TEST_REQUEST_HEADERS) \
163
.AndReturn((resp, resp['body']))
153
kwargs = copy.copy(self.TEST_REQUEST_BASE)
154
kwargs['headers'] = self.TEST_REQUEST_HEADERS
155
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
156
requests.request('POST',
157
self.TEST_URL + "/tokens",
158
**kwargs).AndReturn((resp))
164
159
self.mox.ReplayAll()
166
161
cs = client.Client(username=self.TEST_USER,
174
169
del self.TEST_REQUEST_BODY['auth']['passwordCredentials']
175
170
self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN}
176
171
self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
177
resp = httplib2.Response({
179
"body": json.dumps(self.TEST_RESPONSE_DICT),
172
resp = utils.TestResponse({
174
"text": json.dumps(self.TEST_RESPONSE_DICT),
182
httplib2.Http.request(self.TEST_URL + "/tokens",
184
body=json.dumps(self.TEST_REQUEST_BODY),
185
headers=self.TEST_REQUEST_HEADERS) \
186
.AndReturn((resp, resp['body']))
177
kwargs = copy.copy(self.TEST_REQUEST_BASE)
178
kwargs['headers'] = self.TEST_REQUEST_HEADERS
179
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
180
requests.request('POST',
181
self.TEST_URL + "/tokens",
182
**kwargs).AndReturn((resp))
187
183
self.mox.ReplayAll()
189
185
cs = client.Client(token=self.TEST_TOKEN,
201
197
del self.TEST_RESPONSE_DICT['access']['serviceCatalog']
202
198
self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN}
203
199
self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
204
resp = httplib2.Response({
206
"body": json.dumps(self.TEST_RESPONSE_DICT),
200
resp = utils.TestResponse({
202
"text": json.dumps(self.TEST_RESPONSE_DICT),
209
httplib2.Http.request(self.TEST_URL + "/tokens",
211
body=json.dumps(self.TEST_REQUEST_BODY),
212
headers=self.TEST_REQUEST_HEADERS) \
213
.AndReturn((resp, resp['body']))
205
kwargs = copy.copy(self.TEST_REQUEST_BASE)
206
kwargs['headers'] = self.TEST_REQUEST_HEADERS
207
kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY)
208
requests.request('POST',
209
self.TEST_URL + "/tokens",
210
**kwargs).AndReturn((resp))
214
211
self.mox.ReplayAll()
216
213
cs = client.Client(token=self.TEST_TOKEN,