~ubuntu-branches/ubuntu/trusty/python-keystoneclient/trusty-proposed

« back to all changes in this revision

Viewing changes to tests/v2_0/test_users.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2013-01-18 07:44:54 UTC
  • mfrom: (1.1.17)
  • Revision ID: package-import@ubuntu.com-20130118074454-g7w5blpynohn1s48
Tags: 1:0.2.2-0ubuntu1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import copy
1
2
import urlparse
2
3
import json
3
4
 
4
 
import httplib2
 
5
import requests
5
6
 
6
7
from keystoneclient.v2_0 import users
7
8
from tests import utils
58
59
                "email": "test@example.com",
59
60
            }
60
61
        }
61
 
        resp = httplib2.Response({
62
 
            "status": 200,
63
 
            "body": json.dumps(resp_body),
 
62
        resp = utils.TestResponse({
 
63
            "status_code": 200,
 
64
            "text": json.dumps(resp_body),
64
65
        })
65
66
 
66
 
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL, 'v2.0/users'),
67
 
                              'POST',
68
 
                              body=json.dumps(req_body),
69
 
                              headers=self.TEST_POST_HEADERS) \
70
 
            .AndReturn((resp, resp['body']))
 
67
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
68
        kwargs['headers'] = self.TEST_POST_HEADERS
 
69
        kwargs['data'] = json.dumps(req_body)
 
70
        requests.request(
 
71
            'POST',
 
72
            urlparse.urljoin(self.TEST_URL, 'v2.0/users'),
 
73
            **kwargs).AndReturn((resp))
71
74
        self.mox.ReplayAll()
72
75
 
73
76
        user = self.client.users.create(req_body['user']['name'],
81
84
        self.assertEqual(user.email, "test@example.com")
82
85
 
83
86
    def test_delete(self):
84
 
        resp = httplib2.Response({
85
 
            "status": 204,
86
 
            "body": "",
 
87
        resp = utils.TestResponse({
 
88
            "status_code": 204,
 
89
            "text": "",
87
90
        })
88
 
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL, 'v2.0/users/1'),
89
 
                              'DELETE',
90
 
                              headers=self.TEST_REQUEST_HEADERS) \
91
 
            .AndReturn((resp, resp['body']))
 
91
 
 
92
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
93
        kwargs['headers'] = self.TEST_REQUEST_HEADERS
 
94
        requests.request(
 
95
            'DELETE',
 
96
            urlparse.urljoin(self.TEST_URL, 'v2.0/users/1'),
 
97
            **kwargs).AndReturn((resp))
92
98
        self.mox.ReplayAll()
93
99
 
94
100
        self.client.users.delete(1)
95
101
 
96
102
    def test_get(self):
97
 
        resp = httplib2.Response({
98
 
            "status": 200,
99
 
            "body": json.dumps({
 
103
        resp = utils.TestResponse({
 
104
            "status_code": 200,
 
105
            "text": json.dumps({
100
106
                'user': self.TEST_USERS['users']['values'][0],
101
107
            })
102
108
        })
103
 
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
104
 
                              'v2.0/users/1'),
105
 
                              'GET',
106
 
                              headers=self.TEST_REQUEST_HEADERS) \
107
 
            .AndReturn((resp, resp['body']))
 
109
 
 
110
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
111
        kwargs['headers'] = self.TEST_REQUEST_HEADERS
 
112
        requests.request(
 
113
            'GET',
 
114
            urlparse.urljoin(self.TEST_URL, 'v2.0/users/1'),
 
115
            **kwargs).AndReturn((resp))
108
116
        self.mox.ReplayAll()
109
117
 
110
118
        u = self.client.users.get(1)
113
121
        self.assertEqual(u.name, 'admin')
114
122
 
115
123
    def test_list(self):
116
 
        resp = httplib2.Response({
117
 
            "status": 200,
118
 
            "body": json.dumps(self.TEST_USERS),
 
124
        resp = utils.TestResponse({
 
125
            "status_code": 200,
 
126
            "text": json.dumps(self.TEST_USERS),
119
127
        })
120
128
 
121
 
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
122
 
                              'v2.0/users'),
123
 
                              'GET',
124
 
                              headers=self.TEST_REQUEST_HEADERS) \
125
 
            .AndReturn((resp, resp['body']))
 
129
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
130
        kwargs['headers'] = self.TEST_REQUEST_HEADERS
 
131
        requests.request(
 
132
            'GET',
 
133
            urlparse.urljoin(self.TEST_URL, 'v2.0/users'),
 
134
            **kwargs).AndReturn((resp))
126
135
        self.mox.ReplayAll()
127
136
 
128
137
        user_list = self.client.users.list()
129
138
        [self.assertTrue(isinstance(u, users.User)) for u in user_list]
130
139
 
131
140
    def test_list_limit(self):
132
 
        resp = httplib2.Response({
133
 
            "status": 200,
134
 
            "body": json.dumps(self.TEST_USERS),
 
141
        resp = utils.TestResponse({
 
142
            "status_code": 200,
 
143
            "text": json.dumps(self.TEST_USERS),
135
144
        })
136
145
 
137
 
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
138
 
                              'v2.0/users?limit=1'),
139
 
                              'GET',
140
 
                              headers=self.TEST_REQUEST_HEADERS) \
141
 
            .AndReturn((resp, resp['body']))
 
146
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
147
        kwargs['headers'] = self.TEST_REQUEST_HEADERS
 
148
        requests.request(
 
149
            'GET',
 
150
            urlparse.urljoin(self.TEST_URL, 'v2.0/users?limit=1'),
 
151
            **kwargs).AndReturn((resp))
142
152
        self.mox.ReplayAll()
143
153
 
144
154
        user_list = self.client.users.list(limit=1)
145
155
        [self.assertTrue(isinstance(u, users.User)) for u in user_list]
146
156
 
147
157
    def test_list_marker(self):
148
 
        resp = httplib2.Response({
149
 
            "status": 200,
150
 
            "body": json.dumps(self.TEST_USERS),
 
158
        resp = utils.TestResponse({
 
159
            "status_code": 200,
 
160
            "text": json.dumps(self.TEST_USERS),
151
161
        })
152
162
 
153
 
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
154
 
                              'v2.0/users?marker=1'),
155
 
                              'GET',
156
 
                              headers=self.TEST_REQUEST_HEADERS) \
157
 
            .AndReturn((resp, resp['body']))
 
163
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
164
        kwargs['headers'] = self.TEST_REQUEST_HEADERS
 
165
        requests.request(
 
166
            'GET',
 
167
            urlparse.urljoin(self.TEST_URL, 'v2.0/users?marker=1'),
 
168
            **kwargs).AndReturn((resp))
158
169
        self.mox.ReplayAll()
159
170
 
160
171
        user_list = self.client.users.list(marker=1)
161
172
        [self.assertTrue(isinstance(u, users.User)) for u in user_list]
162
173
 
163
174
    def test_list_limit_marker(self):
164
 
        resp = httplib2.Response({
165
 
            "status": 200,
166
 
            "body": json.dumps(self.TEST_USERS),
 
175
        resp = utils.TestResponse({
 
176
            "status_code": 200,
 
177
            "text": json.dumps(self.TEST_USERS),
167
178
        })
168
179
 
169
 
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
170
 
                              'v2.0/users?marker=1&limit=1'),
171
 
                              'GET',
172
 
                              headers=self.TEST_REQUEST_HEADERS) \
173
 
            .AndReturn((resp, resp['body']))
 
180
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
181
        kwargs['headers'] = self.TEST_REQUEST_HEADERS
 
182
        requests.request(
 
183
            'GET',
 
184
            urlparse.urljoin(self.TEST_URL, 'v2.0/users?marker=1&limit=1'),
 
185
            **kwargs).AndReturn((resp))
174
186
        self.mox.ReplayAll()
175
187
 
176
188
        user_list = self.client.users.list(limit=1, marker=1)
177
189
        [self.assertTrue(isinstance(u, users.User)) for u in user_list]
178
190
 
179
191
    def test_update(self):
180
 
        req_1 = {"user": {"password": "swordfish", "id": 2}}
181
 
        req_2 = {"user": {"id": 2,
182
 
                          "email": "gabriel@example.com",
183
 
                          "name": "gabriel"}}
184
 
        req_3 = {"user": {"tenantId": 1, "id": 2}}
185
 
        req_4 = {"user": {"enabled": False, "id": 2}}
 
192
        req_1 = {
 
193
            "user": {
 
194
                "id": 2,
 
195
                "email": "gabriel@example.com",
 
196
                "name": "gabriel",
 
197
            }
 
198
        }
 
199
        req_2 = {
 
200
            "user": {
 
201
                "id": 2,
 
202
                "password": "swordfish",
 
203
            }
 
204
        }
 
205
        req_3 = {
 
206
            "user": {
 
207
                "id": 2,
 
208
                "tenantId": 1,
 
209
            }
 
210
        }
 
211
        req_4 = {
 
212
            "user": {
 
213
                "id": 2,
 
214
                "enabled": False,
 
215
            }
 
216
        }
 
217
 
186
218
        # Keystone basically echoes these back... including the password :-/
187
 
        resp_1 = httplib2.Response({"status": 200, "body": json.dumps(req_1)})
188
 
        resp_2 = httplib2.Response({"status": 200, "body": json.dumps(req_2)})
189
 
        resp_3 = httplib2.Response({"status": 200, "body": json.dumps(req_3)})
190
 
        resp_4 = httplib2.Response({"status": 200, "body": json.dumps(req_3)})
 
219
        resp_1 = utils.TestResponse({
 
220
            "status_code": 200,
 
221
            "text": json.dumps(req_1)
 
222
        })
 
223
        resp_2 = utils.TestResponse({
 
224
            "status_code": 200,
 
225
            "text": json.dumps(req_2)
 
226
        })
 
227
        resp_3 = utils.TestResponse({
 
228
            "status_code": 200,
 
229
            "text": json.dumps(req_3)
 
230
        })
 
231
        resp_4 = utils.TestResponse({
 
232
            "status_code": 200,
 
233
            "text": json.dumps(req_4)
 
234
        })
191
235
 
192
 
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL, 'v2.0/users/2'),
193
 
                              'PUT',
194
 
                              body=json.dumps(req_2),
195
 
                              headers=self.TEST_POST_HEADERS) \
196
 
            .AndReturn((resp_2, resp_2['body']))
197
 
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
198
 
                              'v2.0/users/2/OS-KSADM/password'),
199
 
                              'PUT',
200
 
                              body=json.dumps(req_1),
201
 
                              headers=self.TEST_POST_HEADERS) \
202
 
            .AndReturn((resp_1, resp_1['body']))
203
 
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
204
 
                              'v2.0/users/2/OS-KSADM/tenant'),
205
 
                              'PUT',
206
 
                              body=json.dumps(req_3),
207
 
                              headers=self.TEST_POST_HEADERS) \
208
 
            .AndReturn((resp_3, resp_3['body']))
209
 
        httplib2.Http.request(urlparse.urljoin(self.TEST_URL,
210
 
                              'v2.0/users/2/OS-KSADM/enabled'),
211
 
                              'PUT',
212
 
                              body=json.dumps(req_4),
213
 
                              headers=self.TEST_POST_HEADERS) \
214
 
            .AndReturn((resp_4, resp_4['body']))
 
236
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
237
        kwargs['headers'] = self.TEST_POST_HEADERS
 
238
        kwargs['data'] = json.dumps(req_1)
 
239
        requests.request(
 
240
            'PUT',
 
241
            urlparse.urljoin(self.TEST_URL, 'v2.0/users/2'),
 
242
            **kwargs).AndReturn((resp_1))
 
243
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
244
        kwargs['headers'] = self.TEST_POST_HEADERS
 
245
        kwargs['data'] = json.dumps(req_2)
 
246
        requests.request(
 
247
            'PUT',
 
248
            urlparse.urljoin(self.TEST_URL, 'v2.0/users/2/OS-KSADM/password'),
 
249
            **kwargs).AndReturn((resp_2))
 
250
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
251
        kwargs['headers'] = self.TEST_POST_HEADERS
 
252
        kwargs['data'] = json.dumps(req_3)
 
253
        requests.request(
 
254
            'PUT',
 
255
            urlparse.urljoin(self.TEST_URL, 'v2.0/users/2/OS-KSADM/tenant'),
 
256
            **kwargs).AndReturn((resp_3))
 
257
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
258
        kwargs['headers'] = self.TEST_POST_HEADERS
 
259
        kwargs['data'] = json.dumps(req_4)
 
260
        requests.request(
 
261
            'PUT',
 
262
            urlparse.urljoin(self.TEST_URL, 'v2.0/users/2/OS-KSADM/enabled'),
 
263
            **kwargs).AndReturn((resp_4))
215
264
        self.mox.ReplayAll()
216
265
 
217
266
        user = self.client.users.update(2,
220
269
        user = self.client.users.update_password(2, 'swordfish')
221
270
        user = self.client.users.update_tenant(2, 1)
222
271
        user = self.client.users.update_enabled(2, False)
 
272
 
 
273
    def test_update_own_password(self):
 
274
        req_body = {
 
275
            'user': {
 
276
                'password': 'ABCD', 'original_password': 'DCBA'
 
277
            }
 
278
        }
 
279
        resp_body = {
 
280
            'access': {}
 
281
        }
 
282
        resp = utils.TestResponse({
 
283
            "status_code": 200,
 
284
            "text": json.dumps(resp_body)
 
285
        })
 
286
 
 
287
        kwargs = copy.copy(self.TEST_REQUEST_BASE)
 
288
        kwargs['headers'] = self.TEST_POST_HEADERS
 
289
        kwargs['data'] = json.dumps(req_body)
 
290
        requests.request(
 
291
            'PATCH',
 
292
            urlparse.urljoin(self.TEST_URL, 'v2.0/OS-KSCRUD/users/123'),
 
293
            **kwargs).AndReturn((resp))
 
294
 
 
295
        self.mox.ReplayAll()
 
296
 
 
297
        self.client.user_id = '123'
 
298
        self.client.users.update_own_password('DCBA', 'ABCD')