1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
7
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8
# not use this file except in compliance with the License. You may obtain
9
# a copy of the License at
11
# http://www.apache.org/licenses/LICENSE-2.0
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
# License for the specific language governing permissions and limitations
19
Tests for Volume Code.
23
from nova import context
24
from nova import exception
26
from nova import flags
27
from nova import log as logging
29
from nova import utils
32
LOG = logging.getLogger('nova.tests.volume')
35
class VolumeTestCase(test.TestCase):
36
"""Test Case for volumes."""
39
super(VolumeTestCase, self).setUp()
40
self.compute = utils.import_object(FLAGS.compute_manager)
41
self.flags(connection_type='fake')
42
self.volume = utils.import_object(FLAGS.volume_manager)
43
self.context = context.get_admin_context()
46
def _create_volume(size='0'):
47
"""Create a volume object."""
50
vol['user_id'] = 'fake'
51
vol['project_id'] = 'fake'
52
vol['availability_zone'] = FLAGS.storage_availability_zone
53
vol['status'] = "creating"
54
vol['attach_status'] = "detached"
55
return db.volume_create(context.get_admin_context(), vol)['id']
57
def test_create_delete_volume(self):
58
"""Test volume can be created and deleted."""
59
volume_id = self._create_volume()
60
self.volume.create_volume(self.context, volume_id)
61
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
64
self.volume.delete_volume(self.context, volume_id)
65
self.assertRaises(exception.NotFound,
70
def test_too_big_volume(self):
71
"""Ensure failure if a too large of a volume is requested."""
72
# FIXME(vish): validation needs to move into the data layer in
76
volume_id = self._create_volume('1001')
77
self.volume.create_volume(self.context, volume_id)
78
self.fail("Should have thrown TypeError")
82
def test_too_many_volumes(self):
83
"""Ensure that NoMoreTargets is raised when we run out of volumes."""
85
total_slots = FLAGS.iscsi_num_targets
86
for _index in xrange(total_slots):
87
volume_id = self._create_volume()
88
self.volume.create_volume(self.context, volume_id)
89
vols.append(volume_id)
90
volume_id = self._create_volume()
91
self.assertRaises(db.NoMoreTargets,
92
self.volume.create_volume,
95
db.volume_destroy(context.get_admin_context(), volume_id)
96
for volume_id in vols:
97
self.volume.delete_volume(self.context, volume_id)
99
def test_run_attach_detach_volume(self):
100
"""Make sure volume can be attached and detached from instance."""
102
inst['image_id'] = 'ami-test'
103
inst['reservation_id'] = 'r-fakeres'
104
inst['launch_time'] = '10'
105
inst['user_id'] = 'fake'
106
inst['project_id'] = 'fake'
107
inst['instance_type'] = 'm1.tiny'
108
inst['mac_address'] = utils.generate_mac()
109
inst['ami_launch_index'] = 0
110
instance_id = db.instance_create(self.context, inst)['id']
111
mountpoint = "/dev/sdf"
112
volume_id = self._create_volume()
113
self.volume.create_volume(self.context, volume_id)
115
db.volume_attached(self.context, volume_id, instance_id,
118
self.compute.attach_volume(self.context,
122
vol = db.volume_get(context.get_admin_context(), volume_id)
123
self.assertEqual(vol['status'], "in-use")
124
self.assertEqual(vol['attach_status'], "attached")
125
self.assertEqual(vol['mountpoint'], mountpoint)
126
instance_ref = db.volume_get_instance(self.context, volume_id)
127
self.assertEqual(instance_ref['id'], instance_id)
129
self.assertRaises(exception.Error,
130
self.volume.delete_volume,
134
db.volume_detached(self.context, volume_id)
136
self.compute.detach_volume(self.context,
139
vol = db.volume_get(self.context, volume_id)
140
self.assertEqual(vol['status'], "available")
142
self.volume.delete_volume(self.context, volume_id)
143
self.assertRaises(exception.Error,
147
db.instance_destroy(self.context, instance_id)
149
def test_concurrent_volumes_get_different_targets(self):
150
"""Ensure multiple concurrent volumes get different targets."""
154
def _check(volume_id):
155
"""Make sure targets aren't duplicated."""
156
volume_ids.append(volume_id)
157
admin_context = context.get_admin_context()
158
iscsi_target = db.volume_get_iscsi_target_num(admin_context,
160
self.assert_(iscsi_target not in targets)
161
targets.append(iscsi_target)
162
LOG.debug(_("Target %s allocated"), iscsi_target)
163
total_slots = FLAGS.iscsi_num_targets
164
for _index in xrange(total_slots):
165
volume_id = self._create_volume()
166
d = self.volume.create_volume(self.context, volume_id)
168
for volume_id in volume_ids:
169
self.volume.delete_volume(self.context, volume_id)
171
def test_multi_node(self):
172
# TODO(termie): Figure out how to test with two nodes,
173
# each of them having a different FLAG for storage_node
174
# This will allow us to test cross-node interactions