~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to nova/tests/users_unittest.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
# Copyright [2010] [Anso Labs, LLC]
 
3
#
 
4
#    Licensed under the Apache License, Version 2.0 (the "License");
 
5
#    you may not use this file except in compliance with the License.
 
6
#    You may obtain a copy of the License at
 
7
#
 
8
#        http://www.apache.org/licenses/LICENSE-2.0
 
9
#
 
10
#    Unless required by applicable law or agreed to in writing, software
 
11
#    distributed under the License is distributed on an "AS IS" BASIS,
 
12
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
#    See the License for the specific language governing permissions and
 
14
#    limitations under the License.
 
15
 
 
16
import logging
 
17
import unittest
 
18
 
 
19
from nova import vendor
 
20
from M2Crypto import BIO
 
21
from M2Crypto import RSA
 
22
from M2Crypto import X509
 
23
 
 
24
from nova import crypto
 
25
from nova import flags
 
26
from nova import test
 
27
from nova import utils
 
28
from nova.auth import users
 
29
from nova.endpoint import cloud
 
30
 
 
31
 
 
32
FLAGS = flags.FLAGS
 
33
 
 
34
 
 
35
class UserTestCase(test.BaseTestCase):
 
36
    def setUp(self):
 
37
        super(UserTestCase, self).setUp()
 
38
        self.flags(fake_libvirt=True,
 
39
                   fake_storage=True,
 
40
                   redis_db=8)
 
41
        self.users = users.UserManager.instance()
 
42
 
 
43
    def test_001_can_create_user(self):
 
44
        self.users.create_user('test1', 'access', 'secret')
 
45
 
 
46
    def test_002_can_get_user(self):
 
47
        user = self.users.get_user('test1')
 
48
 
 
49
    def test_003_can_retreive_properties(self):
 
50
        user = self.users.get_user('test1')
 
51
        self.assertEqual('test1', user.id)
 
52
        self.assertEqual('access', user.access)
 
53
        self.assertEqual('secret', user.secret)
 
54
 
 
55
    def test_004_signature_is_valid(self):
 
56
        #self.assertTrue(self.users.authenticate( **boto.generate_url ... ? ? ? ))
 
57
        pass
 
58
        #raise NotImplementedError
 
59
 
 
60
    def test_005_can_get_credentials(self):
 
61
        return
 
62
        credentials = self.users.get_user('test1').get_credentials()
 
63
        self.assertEqual(credentials,
 
64
        'export EC2_ACCESS_KEY="access"\n' +
 
65
        'export EC2_SECRET_KEY="secret"\n' +
 
66
        'export EC2_URL="http://127.0.0.1:8773/services/Cloud"\n' +
 
67
        'export S3_URL="http://127.0.0.1:3333/"\n' +
 
68
        'export EC2_USER_ID="test1"\n')
 
69
 
 
70
    def test_006_test_key_storage(self):
 
71
        user = self.users.get_user('test1')
 
72
        user.create_key_pair('public', 'key', 'fingerprint')
 
73
        key = user.get_key_pair('public')
 
74
        self.assertEqual('key', key.public_key)
 
75
        self.assertEqual('fingerprint', key.fingerprint)
 
76
 
 
77
    def test_007_test_key_generation(self):
 
78
        user = self.users.get_user('test1')
 
79
        private_key, fingerprint = user.generate_key_pair('public2')
 
80
        key = RSA.load_key_string(private_key, callback=lambda: None)
 
81
        bio = BIO.MemoryBuffer()
 
82
        public_key = user.get_key_pair('public2').public_key
 
83
        key.save_pub_key_bio(bio)
 
84
        converted = crypto.ssl_pub_to_ssh_pub(bio.read())
 
85
        # assert key fields are equal
 
86
        print converted
 
87
        self.assertEqual(public_key.split(" ")[1].strip(),
 
88
                         converted.split(" ")[1].strip())
 
89
 
 
90
    def test_008_can_list_key_pairs(self):
 
91
        keys = self.users.get_user('test1').get_key_pairs()
 
92
        self.assertTrue(filter(lambda k: k.name == 'public', keys))
 
93
        self.assertTrue(filter(lambda k: k.name == 'public2', keys))
 
94
 
 
95
    def test_009_can_delete_key_pair(self):
 
96
        self.users.get_user('test1').delete_key_pair('public')
 
97
        keys = self.users.get_user('test1').get_key_pairs()
 
98
        self.assertFalse(filter(lambda k: k.name == 'public', keys))
 
99
 
 
100
    def test_010_can_list_users(self):
 
101
        users = self.users.get_users()
 
102
        self.assertTrue(filter(lambda u: u.id == 'test1', users))
 
103
 
 
104
    def test_011_can_generate_x509(self):
 
105
        # MUST HAVE RUN CLOUD SETUP BY NOW
 
106
        self.cloud = cloud.CloudController()
 
107
        self.cloud.setup()
 
108
        private_key, signed_cert_string = self.users.get_user('test1').generate_x509_cert()
 
109
        logging.debug(signed_cert_string)
 
110
 
 
111
        # Need to verify that it's signed by the right intermediate CA
 
112
        full_chain = crypto.fetch_ca(username='test1', chain=True)
 
113
        int_cert = crypto.fetch_ca(username='test1', chain=False)
 
114
        cloud_cert = crypto.fetch_ca()
 
115
        logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain)
 
116
        signed_cert = X509.load_cert_string(signed_cert_string)
 
117
        chain_cert = X509.load_cert_string(full_chain)
 
118
        int_cert = X509.load_cert_string(int_cert)
 
119
        cloud_cert = X509.load_cert_string(cloud_cert)
 
120
        self.assertTrue(signed_cert.verify(chain_cert.get_pubkey()))
 
121
        self.assertTrue(signed_cert.verify(int_cert.get_pubkey()))
 
122
 
 
123
        if not FLAGS.use_intermediate_ca:
 
124
            self.assertTrue(signed_cert.verify(cloud_cert.get_pubkey()))
 
125
        else:
 
126
            self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey()))
 
127
 
 
128
    def test_012_can_delete_user(self):
 
129
        self.users.delete_user('test1')
 
130
        users = self.users.get_users()
 
131
        if users != None:
 
132
            self.assertFalse(filter(lambda u: u.id == 'test1', users))
 
133
 
 
134
 
 
135
if __name__ == "__main__":
 
136
    # TODO: Implement use_fake as an option
 
137
    unittest.main()