~ubuntu-branches/ubuntu/quantal/nova/quantal-proposed

« back to all changes in this revision

Viewing changes to nova/tests/test_volume.py

  • Committer: Bazaar Package Importer
  • Author(s): Chuck Short
  • Date: 2011-01-21 11:48:06 UTC
  • mto: This revision was merged to the branch mainline in revision 9.
  • Revision ID: james.westby@ubuntu.com-20110121114806-v8fvnnl6az4m4ohv
Tags: upstream-2011.1~bzr597
ImportĀ upstreamĀ versionĀ 2011.1~bzr597

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright 2010 United States Government as represented by the
 
4
# Administrator of the National Aeronautics and Space Administration.
 
5
# All Rights Reserved.
 
6
#
 
7
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
8
#    not use this file except in compliance with the License. You may obtain
 
9
#    a copy of the License at
 
10
#
 
11
#         http://www.apache.org/licenses/LICENSE-2.0
 
12
#
 
13
#    Unless required by applicable law or agreed to in writing, software
 
14
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
15
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
16
#    License for the specific language governing permissions and limitations
 
17
#    under the License.
 
18
"""
 
19
Tests for Volume Code.
 
20
 
 
21
"""
 
22
 
 
23
from nova import context
 
24
from nova import exception
 
25
from nova import db
 
26
from nova import flags
 
27
from nova import log as logging
 
28
from nova import test
 
29
from nova import utils
 
30
 
 
31
FLAGS = flags.FLAGS
 
32
LOG = logging.getLogger('nova.tests.volume')
 
33
 
 
34
 
 
35
class VolumeTestCase(test.TestCase):
 
36
    """Test Case for volumes."""
 
37
 
 
38
    def setUp(self):
 
39
        super(VolumeTestCase, self).setUp()
 
40
        self.compute = utils.import_object(FLAGS.compute_manager)
 
41
        self.flags(connection_type='fake')
 
42
        self.volume = utils.import_object(FLAGS.volume_manager)
 
43
        self.context = context.get_admin_context()
 
44
 
 
45
    @staticmethod
 
46
    def _create_volume(size='0'):
 
47
        """Create a volume object."""
 
48
        vol = {}
 
49
        vol['size'] = size
 
50
        vol['user_id'] = 'fake'
 
51
        vol['project_id'] = 'fake'
 
52
        vol['availability_zone'] = FLAGS.storage_availability_zone
 
53
        vol['status'] = "creating"
 
54
        vol['attach_status'] = "detached"
 
55
        return db.volume_create(context.get_admin_context(), vol)['id']
 
56
 
 
57
    def test_create_delete_volume(self):
 
58
        """Test volume can be created and deleted."""
 
59
        volume_id = self._create_volume()
 
60
        self.volume.create_volume(self.context, volume_id)
 
61
        self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
 
62
                         volume_id).id)
 
63
 
 
64
        self.volume.delete_volume(self.context, volume_id)
 
65
        self.assertRaises(exception.NotFound,
 
66
                          db.volume_get,
 
67
                          self.context,
 
68
                          volume_id)
 
69
 
 
70
    def test_too_big_volume(self):
 
71
        """Ensure failure if a too large of a volume is requested."""
 
72
        # FIXME(vish): validation needs to move into the data layer in
 
73
        #              volume_create
 
74
        return True
 
75
        try:
 
76
            volume_id = self._create_volume('1001')
 
77
            self.volume.create_volume(self.context, volume_id)
 
78
            self.fail("Should have thrown TypeError")
 
79
        except TypeError:
 
80
            pass
 
81
 
 
82
    def test_too_many_volumes(self):
 
83
        """Ensure that NoMoreTargets is raised when we run out of volumes."""
 
84
        vols = []
 
85
        total_slots = FLAGS.iscsi_num_targets
 
86
        for _index in xrange(total_slots):
 
87
            volume_id = self._create_volume()
 
88
            self.volume.create_volume(self.context, volume_id)
 
89
            vols.append(volume_id)
 
90
        volume_id = self._create_volume()
 
91
        self.assertRaises(db.NoMoreTargets,
 
92
                          self.volume.create_volume,
 
93
                          self.context,
 
94
                          volume_id)
 
95
        db.volume_destroy(context.get_admin_context(), volume_id)
 
96
        for volume_id in vols:
 
97
            self.volume.delete_volume(self.context, volume_id)
 
98
 
 
99
    def test_run_attach_detach_volume(self):
 
100
        """Make sure volume can be attached and detached from instance."""
 
101
        inst = {}
 
102
        inst['image_id'] = 'ami-test'
 
103
        inst['reservation_id'] = 'r-fakeres'
 
104
        inst['launch_time'] = '10'
 
105
        inst['user_id'] = 'fake'
 
106
        inst['project_id'] = 'fake'
 
107
        inst['instance_type'] = 'm1.tiny'
 
108
        inst['mac_address'] = utils.generate_mac()
 
109
        inst['ami_launch_index'] = 0
 
110
        instance_id = db.instance_create(self.context, inst)['id']
 
111
        mountpoint = "/dev/sdf"
 
112
        volume_id = self._create_volume()
 
113
        self.volume.create_volume(self.context, volume_id)
 
114
        if FLAGS.fake_tests:
 
115
            db.volume_attached(self.context, volume_id, instance_id,
 
116
                               mountpoint)
 
117
        else:
 
118
            self.compute.attach_volume(self.context,
 
119
                                       instance_id,
 
120
                                       volume_id,
 
121
                                       mountpoint)
 
122
        vol = db.volume_get(context.get_admin_context(), volume_id)
 
123
        self.assertEqual(vol['status'], "in-use")
 
124
        self.assertEqual(vol['attach_status'], "attached")
 
125
        self.assertEqual(vol['mountpoint'], mountpoint)
 
126
        instance_ref = db.volume_get_instance(self.context, volume_id)
 
127
        self.assertEqual(instance_ref['id'], instance_id)
 
128
 
 
129
        self.assertRaises(exception.Error,
 
130
                          self.volume.delete_volume,
 
131
                          self.context,
 
132
                          volume_id)
 
133
        if FLAGS.fake_tests:
 
134
            db.volume_detached(self.context, volume_id)
 
135
        else:
 
136
            self.compute.detach_volume(self.context,
 
137
                                       instance_id,
 
138
                                       volume_id)
 
139
        vol = db.volume_get(self.context, volume_id)
 
140
        self.assertEqual(vol['status'], "available")
 
141
 
 
142
        self.volume.delete_volume(self.context, volume_id)
 
143
        self.assertRaises(exception.Error,
 
144
                          db.volume_get,
 
145
                          self.context,
 
146
                          volume_id)
 
147
        db.instance_destroy(self.context, instance_id)
 
148
 
 
149
    def test_concurrent_volumes_get_different_targets(self):
 
150
        """Ensure multiple concurrent volumes get different targets."""
 
151
        volume_ids = []
 
152
        targets = []
 
153
 
 
154
        def _check(volume_id):
 
155
            """Make sure targets aren't duplicated."""
 
156
            volume_ids.append(volume_id)
 
157
            admin_context = context.get_admin_context()
 
158
            iscsi_target = db.volume_get_iscsi_target_num(admin_context,
 
159
                                                          volume_id)
 
160
            self.assert_(iscsi_target not in targets)
 
161
            targets.append(iscsi_target)
 
162
            LOG.debug(_("Target %s allocated"), iscsi_target)
 
163
        total_slots = FLAGS.iscsi_num_targets
 
164
        for _index in xrange(total_slots):
 
165
            volume_id = self._create_volume()
 
166
            d = self.volume.create_volume(self.context, volume_id)
 
167
            _check(d)
 
168
        for volume_id in volume_ids:
 
169
            self.volume.delete_volume(self.context, volume_id)
 
170
 
 
171
    def test_multi_node(self):
 
172
        # TODO(termie): Figure out how to test with two nodes,
 
173
        # each of them having a different FLAG for storage_node
 
174
        # This will allow us to test cross-node interactions
 
175
        pass