1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
# Copyright [2010] [Anso Labs, LLC]
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
19
from nova import vendor
20
from M2Crypto import BIO
21
from M2Crypto import RSA
22
from M2Crypto import X509
24
from nova import crypto
25
from nova import flags
27
from nova import utils
28
from nova.auth import users
29
from nova.endpoint import cloud
35
class UserTestCase(test.BaseTestCase):
37
super(UserTestCase, self).setUp()
38
self.flags(fake_libvirt=True,
41
self.users = users.UserManager.instance()
43
def test_001_can_create_user(self):
    """Create the 'test1' user that the later numbered tests look up."""
    user_id, access_key, secret_key = 'test1', 'access', 'secret'
    self.users.create_user(user_id, access_key, secret_key)
46
def test_002_can_get_user(self):
    """Fetch user 'test1'; passes as long as the lookup does not raise."""
    # The returned object is not inspected here, so it is not bound.
    self.users.get_user('test1')
49
def test_003_can_retreive_properties(self):
    """Check the id, access key and secret stored for user 'test1'."""
    fetched = self.users.get_user('test1')
    expected_fields = (
        ('id', 'test1'),
        ('access', 'access'),
        ('secret', 'secret'),
    )
    # Same three comparisons, in the same order, driven from a table.
    for attr_name, expected in expected_fields:
        self.assertEqual(expected, getattr(fetched, attr_name))
55
def test_004_signature_is_valid(self):
    """Placeholder for a signature check; currently asserts nothing."""
    # TODO: implement. Earlier notes for this test were:
    #   self.assertTrue(self.users.authenticate(**boto.generate_url ...))
    #   raise NotImplementedError
    # An explicit statement keeps the method body valid until then.
    pass
60
def test_005_can_get_credentials(self):
    """Compare the credentials text for 'test1' with the expected exports."""
    expected_lines = [
        'export EC2_ACCESS_KEY="access"\n',
        'export EC2_SECRET_KEY="secret"\n',
        'export EC2_URL="http://127.0.0.1:8773/services/Cloud"\n',
        'export S3_URL="http://127.0.0.1:3333/"\n',
        'export EC2_USER_ID="test1"\n',
    ]
    test_user = self.users.get_user('test1')
    actual = test_user.get_credentials()
    self.assertEqual(actual, ''.join(expected_lines))
70
def test_006_test_key_storage(self):
    """Store a key pair on 'test1' and read it back by name."""
    owner = self.users.get_user('test1')
    owner.create_key_pair('public', 'key', 'fingerprint')
    stored = owner.get_key_pair('public')
    expected = (('public_key', 'key'), ('fingerprint', 'fingerprint'))
    for attr_name, value in expected:
        self.assertEqual(value, getattr(stored, attr_name))
77
def test_007_test_key_generation(self):
    """Generate a key pair and compare stored vs. derived public key data."""

    def key_field(key_text):
        # Second space-separated field of the key text, whitespace-trimmed.
        return key_text.split(" ")[1].strip()

    owner = self.users.get_user('test1')
    private_pem, _fingerprint = owner.generate_key_pair('public2')
    rsa_key = RSA.load_key_string(private_pem, callback=lambda: None)
    buf = BIO.MemoryBuffer()
    stored_public = owner.get_key_pair('public2').public_key
    rsa_key.save_pub_key_bio(buf)
    derived_public = crypto.ssl_pub_to_ssh_pub(buf.read())

    # assert key fields are equal
    self.assertEqual(key_field(stored_public), key_field(derived_public))
90
def test_008_can_list_key_pairs(self):
    """Check that key pairs 'public' and 'public2' are listed for 'test1'."""
    # Collect the names once and test membership explicitly. Asserting on a
    # bare filter() result is vacuous where filter() returns a lazy iterator,
    # because the iterator object itself is always truthy.
    key_pairs = self.users.get_user('test1').get_key_pairs()
    names = [k.name for k in key_pairs]
    self.assertTrue('public' in names)
    self.assertTrue('public2' in names)
95
def test_009_can_delete_key_pair(self):
    """Delete key pair 'public' from 'test1' and check it is gone."""
    self.users.get_user('test1').delete_key_pair('public')
    # The user is looked up again before listing, as in the original flow.
    key_pairs = self.users.get_user('test1').get_key_pairs()
    remaining = [k.name for k in key_pairs]
    # Explicit membership test: assertFalse() on a bare filter() result
    # always fails where filter() returns a (truthy) lazy iterator.
    self.assertFalse('public' in remaining)
100
def test_010_can_list_users(self):
    """Check that 'test1' appears in the manager's user listing."""
    # The local name avoids shadowing the imported ``users`` module, and the
    # explicit membership test replaces a filter() truthiness check that is
    # always true where filter() returns a lazy iterator.
    user_ids = [u.id for u in self.users.get_users()]
    self.assertTrue('test1' in user_ids)
104
def test_011_can_generate_x509(self):
    """Generate an x509 cert for 'test1' and verify who signed it.

    The signed certificate must verify against the certificate loaded from
    the user's CA chain and against the user's own CA certificate. Whether
    it must also verify against the cloud CA certificate depends on
    ``FLAGS.use_intermediate_ca``: without an intermediate CA it must,
    with one it must not.
    """
    # MUST HAVE RUN CLOUD SETUP BY NOW
    self.cloud = cloud.CloudController()

    user = self.users.get_user('test1')
    _private_key, signed_cert_string = user.generate_x509_cert()
    logging.debug(signed_cert_string)

    # Need to verify that it's signed by the right intermediate CA
    full_chain = crypto.fetch_ca(username='test1', chain=True)
    int_cert_string = crypto.fetch_ca(username='test1', chain=False)
    cloud_cert_string = crypto.fetch_ca()
    logging.debug("CA chain:\n\n =====\n%s\n\n=====", full_chain)

    # Distinct names for PEM text and parsed certificates, so no variable
    # changes type half-way through the test.
    signed_cert = X509.load_cert_string(signed_cert_string)
    chain_cert = X509.load_cert_string(full_chain)
    int_cert = X509.load_cert_string(int_cert_string)
    cloud_cert = X509.load_cert_string(cloud_cert_string)

    self.assertTrue(signed_cert.verify(chain_cert.get_pubkey()))
    self.assertTrue(signed_cert.verify(int_cert.get_pubkey()))

    # The two expectations are mutually exclusive; exactly one applies
    # depending on whether an intermediate CA is in use.
    if not FLAGS.use_intermediate_ca:
        self.assertTrue(signed_cert.verify(cloud_cert.get_pubkey()))
    else:
        self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey()))
128
def test_012_can_delete_user(self):
    """Delete user 'test1' and check it is no longer listed."""
    self.users.delete_user('test1')
    # The local name avoids shadowing the imported ``users`` module, and the
    # explicit membership test replaces assertFalse() on a bare filter()
    # result, which always fails where filter() returns a lazy iterator.
    remaining_ids = [u.id for u in self.users.get_users()]
    self.assertFalse('test1' in remaining_ids)
135
if __name__ == "__main__":
136
# TODO: Implement use_fake as an option