~ubuntu-branches/ubuntu/saucy/keystone/saucy-proposed

« back to all changes in this revision

Viewing changes to keystone/test/unit/test_ec2_authn.py

  • Committer: Bazaar Package Importer
  • Author(s): Chuck Short
  • Date: 2011-08-23 10:18:22 UTC
  • Revision ID: james.westby@ubuntu.com-20110823101822-enve6zceb3lqhuvj
Tags: upstream-1.0~d4~20110823.1078
Import upstream version 1.0~d4~20110823.1078

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
# Copyright (c) 2011 OpenStack, LLC.
 
3
#
 
4
# Licensed under the Apache License, Version 2.0 (the "License");
 
5
# you may not use this file except in compliance with the License.
 
6
# You may obtain a copy of the License at
 
7
#
 
8
#    http://www.apache.org/licenses/LICENSE-2.0
 
9
#
 
10
# Unless required by applicable law or agreed to in writing, software
 
11
# distributed under the License is distributed on an "AS IS" BASIS,
 
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
13
# implied.
 
14
# See the License for the specific language governing permissions and
 
15
# limitations under the License.
 
16
 
 
17
import json
 
18
import logging
 
19
import unittest2 as unittest
 
20
 
 
21
import base
 
22
from keystone.test.unit.decorators import jsonify
 
23
from keystone.logic import signer
 
24
from keystone.logic.types import auth
 
25
 
 
26
# Module-level logger, registered under the name 'test.unit.test_ec2_authn'.
LOGGER = logging.getLogger('test.unit.test_ec2_authn')
 
27
 
 
28
 
 
29
class EC2AuthnMethods(base.ServiceAPITest):
    """Tests for EC2-style signed authentication via ``POST /ec2tokens``.

    Every test stores an EC2 credentials record through
    ``fixture_create_credentials``, posts a request signed with the same
    secret, and then checks the JSON body and status of ``self.res``.
    """

    def _post_signed_ec2_request(self, user_id, tenant_id):
        """Create EC2 credentials and POST a request signed with them.

        :param user_id: user id stored on the credentials fixture
        :param tenant_id: tenant id stored on the credentials fixture

        Returns nothing; the tests read the outcome from ``self.res`` after
        ``self.get_response()`` has been called.
        """
        access = "xpd285.access"
        secret = "345fgi.secret"
        self.fixture_create_credentials(
            user_id=user_id,
            tenant_id=tenant_id,
            type="EC2",
            key=access,
            secret=secret,
        )

        req = self.get_request('POST', "/ec2tokens")
        credentials = {
            "access": access,
            "verb": "GET",
            "params": {
                "SignatureVersion": "2",
                "one_param": "5",
                "two_params": "happy",
            },
            "host": "some.host.com:8773",
            "path": "services/Cloud",
            "signature": None,
        }
        # The signature is generated from the credentials while their
        # 'signature' entry is still None, and only then filled in.
        unsigned = auth.Ec2Credentials(**credentials)
        credentials['signature'] = signer.Signer(secret).generate(unsigned)

        req.body = json.dumps({"ec2Credentials": credentials})
        self.get_response()

    def _assert_unauthorized(self):
        """Assert the response is the 401 'Unauthorized on this tenant' body.

        The body is compared before the status code, as in each test's
        original inline assertions.
        """
        expected = {
            u'unauthorized': {
                u'code': u'401',
                u'message': u'Unauthorized on this tenant',
            }
        }
        self.assert_dict_equal(expected, json.loads(self.res.body))
        self.assertEqual(self.res.status_int, 401)

    @jsonify
    def test_authn_ec2_success_json(self):
        """
        Test that good ec2 credentials return the expected token body and
        pass the ``status_ok()`` check
        """
        self._post_signed_ec2_request(self.auth_user['id'],
                                      self.auth_user['tenant_id'])

        expected = {
            u'auth': {
                u'serviceCatalog': {},
                u'token': {
                    u'expires': self.expires.strftime("%Y-%m-%dT%H:%M:%S.%f"),
                    u'id': self.auth_token_id,
                }
            }
        }
        self.assert_dict_equal(expected, json.loads(self.res.body))
        self.status_ok()

    @jsonify
    def test_authn_ec2_success_json_bad_user(self):
        """
        Test that credentials stored with user id 'bad' return a 401
        """
        self._post_signed_ec2_request('bad', self.auth_user['tenant_id'])
        self._assert_unauthorized()

    @jsonify
    def test_authn_ec2_success_json_bad_tenant(self):
        """
        Test that credentials stored with tenant id 'bad' return a 401
        """
        self._post_signed_ec2_request(self.auth_user['id'], 'bad')
        self._assert_unauthorized()
 
177
 
 
178
 
 
179
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()