~nova-coresec/nova/ppa-lucid

« back to all changes in this revision

Viewing changes to nova/tests/quota_unittest.py

  • Committer: Soren Hansen
  • Date: 2010-09-28 22:06:35 UTC
  • mfrom: (195.1.70 ubuntu-packaging)
  • Revision ID: soren.hansen@rackspace.com-20100928220635-dd3170esyd303n8o
Merge ubuntu packaging branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright 2010 United States Government as represented by the
 
4
# Administrator of the National Aeronautics and Space Administration.
 
5
# All Rights Reserved.
 
6
#
 
7
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
8
#    not use this file except in compliance with the License. You may obtain
 
9
#    a copy of the License at
 
10
#
 
11
#         http://www.apache.org/licenses/LICENSE-2.0
 
12
#
 
13
#    Unless required by applicable law or agreed to in writing, software
 
14
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
15
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
16
#    License for the specific language governing permissions and limitations
 
17
#    under the License.
 
18
 
 
19
import logging
 
20
 
 
21
from nova import db
 
22
from nova import exception
 
23
from nova import flags
 
24
from nova import quota
 
25
from nova import test
 
26
from nova import utils
 
27
from nova.auth import manager
 
28
from nova.api.ec2 import cloud
 
29
from nova.api.ec2 import context
 
30
 
 
31
 
 
32
FLAGS = flags.FLAGS
 
33
 
 
34
 
 
35
class QuotaTestCase(test.TrialTestCase):
 
36
    def setUp(self):  # pylint: disable-msg=C0103
 
37
        logging.getLogger().setLevel(logging.DEBUG)
 
38
        super(QuotaTestCase, self).setUp()
 
39
        self.flags(connection_type='fake',
 
40
                   quota_instances=2,
 
41
                   quota_cores=4,
 
42
                   quota_volumes=2,
 
43
                   quota_gigabytes=20,
 
44
                   quota_floating_ips=1)
 
45
 
 
46
        self.cloud = cloud.CloudController()
 
47
        self.manager = manager.AuthManager()
 
48
        self.user = self.manager.create_user('admin', 'admin', 'admin', True)
 
49
        self.project = self.manager.create_project('admin', 'admin', 'admin')
 
50
        self.network = utils.import_object(FLAGS.network_manager)
 
51
        self.context = context.APIRequestContext(project=self.project,
 
52
                                                 user=self.user)
 
53
 
 
54
    def tearDown(self):  # pylint: disable-msg=C0103
 
55
        manager.AuthManager().delete_project(self.project)
 
56
        manager.AuthManager().delete_user(self.user)
 
57
        super(QuotaTestCase, self).tearDown()
 
58
 
 
59
    def _create_instance(self, cores=2):
 
60
        """Create a test instance"""
 
61
        inst = {}
 
62
        inst['image_id'] = 'ami-test'
 
63
        inst['reservation_id'] = 'r-fakeres'
 
64
        inst['user_id'] = self.user.id
 
65
        inst['project_id'] = self.project.id
 
66
        inst['instance_type'] = 'm1.large'
 
67
        inst['vcpus'] = cores
 
68
        inst['mac_address'] = utils.generate_mac()
 
69
        return db.instance_create(self.context, inst)['id']
 
70
 
 
71
    def _create_volume(self, size=10):
 
72
        """Create a test volume"""
 
73
        vol = {}
 
74
        vol['user_id'] = self.user.id
 
75
        vol['project_id'] = self.project.id
 
76
        vol['size'] = size
 
77
        return db.volume_create(self.context, vol)['id']
 
78
 
 
79
    def test_quota_overrides(self):
 
80
        """Make sure overriding a projects quotas works"""
 
81
        num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
 
82
        self.assertEqual(num_instances, 2)
 
83
        db.quota_create(self.context, {'project_id': self.project.id,
 
84
                                       'instances': 10})
 
85
        num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
 
86
        self.assertEqual(num_instances, 4)
 
87
        db.quota_update(self.context, self.project.id, {'cores': 100})
 
88
        num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
 
89
        self.assertEqual(num_instances, 10)
 
90
        db.quota_destroy(self.context, self.project.id)
 
91
 
 
92
    def test_too_many_instances(self):
 
93
        instance_ids = []
 
94
        for i in range(FLAGS.quota_instances):
 
95
            instance_id = self._create_instance()
 
96
            instance_ids.append(instance_id)
 
97
        self.assertRaises(cloud.QuotaError, self.cloud.run_instances, 
 
98
                                            self.context,
 
99
                                            min_count=1,
 
100
                                            max_count=1,
 
101
                                            instance_type='m1.small')
 
102
        for instance_id in instance_ids:
 
103
            db.instance_destroy(self.context, instance_id)
 
104
 
 
105
    def test_too_many_cores(self):
 
106
        instance_ids = []
 
107
        instance_id = self._create_instance(cores=4)
 
108
        instance_ids.append(instance_id)
 
109
        self.assertRaises(cloud.QuotaError, self.cloud.run_instances, 
 
110
                                            self.context,
 
111
                                            min_count=1,
 
112
                                            max_count=1,
 
113
                                            instance_type='m1.small')
 
114
        for instance_id in instance_ids:
 
115
            db.instance_destroy(self.context, instance_id)
 
116
 
 
117
    def test_too_many_volumes(self):
 
118
        volume_ids = []
 
119
        for i in range(FLAGS.quota_volumes):
 
120
            volume_id = self._create_volume()
 
121
            volume_ids.append(volume_id)
 
122
        self.assertRaises(cloud.QuotaError, self.cloud.create_volume,
 
123
                                            self.context,
 
124
                                            size=10)
 
125
        for volume_id in volume_ids:
 
126
            db.volume_destroy(self.context, volume_id)
 
127
 
 
128
    def test_too_many_gigabytes(self):
 
129
        volume_ids = []
 
130
        volume_id = self._create_volume(size=20)
 
131
        volume_ids.append(volume_id)
 
132
        self.assertRaises(cloud.QuotaError,
 
133
                          self.cloud.create_volume,
 
134
                          self.context,
 
135
                          size=10)
 
136
        for volume_id in volume_ids:
 
137
            db.volume_destroy(self.context, volume_id)
 
138
 
 
139
    def test_too_many_addresses(self):
 
140
        address = '192.168.0.100'
 
141
        try:
 
142
            db.floating_ip_get_by_address(None, address)
 
143
        except exception.NotFound:
 
144
            db.floating_ip_create(None, {'address': address,
 
145
                                         'host': FLAGS.host})
 
146
        float_addr = self.network.allocate_floating_ip(self.context,
 
147
                                                       self.project.id)
 
148
        # NOTE(vish): This assert never fails. When cloud attempts to
 
149
        #             make an rpc.call, the test just finishes with OK. It
 
150
        #             appears to be something in the magic inline callbacks
 
151
        #             that is breaking.
 
152
        self.assertRaises(cloud.QuotaError, self.cloud.allocate_address, self.context)