# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2011 Justin Santa Barbara
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import time

from nova.openstack.common.log import logging
from nova import service
from nova.tests.integrated.api import client
from nova.tests.integrated import integrated_helpers
from nova.volume import driver


# Module-level logger used by every test in this file.
LOG = logging.getLogger(__name__)


class VolumesTest(integrated_helpers._IntegratedTestBase):
    """Integration tests that exercise the volume API end to end.

    The tests run with ``nova.volume.driver.LoggingVolumeDriver`` selected
    as the volume driver and inspect the actions it records.

    NOTE(review): this class was rebuilt from a listing that had lost its
    indentation and several lines. Every line that had to be reconstructed
    carries a NOTE(review) comment and should be confirmed against version
    control before merging.
    """

    def setUp(self):
        """Run the base set-up, then clear the driver's recorded logs."""
        # NOTE(review): the ``def`` line was absent from the listing; it is
        # implied by the super().setUp() call.
        super(VolumesTest, self).setUp()
        driver.LoggingVolumeDriver.clear_logs()

    def _start_api_service(self):
        """Create the ``osapi_volume`` WSGI service and record its URL."""
        self.osapi = service.WSGIService("osapi_volume")
        # NOTE(review): reconstructed -- the listing skipped one line here.
        # Starting the service is presumed from the method name; confirm
        # that WSGIService exposes start().
        self.osapi.start()
        self.auth_url = 'http://%s:%s/v1' % (self.osapi.host, self.osapi.port)
        LOG.warn(self.auth_url)

    def _get_flags(self):
        """Return the base class's flags with the volume overrides applied."""
        # NOTE(review): the ``def`` line and the final ``return`` were absent
        # from the listing; both are implied by the super()._get_flags() call
        # and by ``f`` being built but otherwise unused.
        f = super(VolumesTest, self)._get_flags()
        f['use_local_volumes'] = False  # Avoids calling local_path
        f['volume_driver'] = 'nova.volume.driver.LoggingVolumeDriver'
        return f

    def test_get_volumes_summary(self):
        """Simple check that listing volumes works when passing ``False``.

        NOTE(review): presumably ``False`` selects the non-detailed listing,
        going by the test name -- confirm against the API client.
        """
        volumes = self.api.get_volumes(False)
        for volume in volumes:
            LOG.debug("volume: %s", volume)

    def test_get_volumes(self):
        """Simple check that listing volumes with default arguments works."""
        volumes = self.api.get_volumes()
        for volume in volumes:
            LOG.debug("volume: %s", volume)

    def _poll_while(self, volume_id, continue_states, max_retries=5):
        """Poll (briefly) while the state is in continue_states.

        :param volume_id: id of the volume to fetch on every poll.
        :param continue_states: statuses that mean "keep waiting".
        :param max_retries: polling stops once the retry count exceeds this.
        :returns: the last volume fetched, or ``None`` when the API answered
                  with a 404 (callers assert on both outcomes).

        NOTE(review): only the API call, the ``except`` clause, the two debug
        logs, the id assertion and the two ``if`` tests survived in the
        listing. The loop, the ``try``, the counter, the pause and the
        return value are reconstructed around them.
        """
        retries = 0
        while True:
            try:
                found_volume = self.api.get_volume(volume_id)
            except client.OpenStackApiNotFoundException:
                # A 404 means the volume no longer exists; report that to
                # the caller as None.
                found_volume = None
                LOG.debug("Got 404, proceeding")
                break

            LOG.debug("Found %s", found_volume)

            self.assertEqual(volume_id, found_volume['id'])

            if found_volume['status'] not in continue_states:
                break

            # NOTE(review): pause length is a guess; the original line was
            # dropped from the listing.
            time.sleep(1)
            retries += 1
            if retries > max_retries:
                break
        return found_volume

    def test_create_and_delete_volume(self):
        """Creates and deletes a volume.

        After each API step the volume's visibility and status are checked,
        and at the end the actions recorded by LoggingVolumeDriver are
        verified.
        """
        created_volume = self.api.post_volume({'volume': {'size': 1}})
        LOG.debug("created_volume: %s", created_volume)
        self.assertTrue(created_volume['id'])
        created_volume_id = created_volume['id']

        # It should be retrievable by id
        found_volume = self.api.get_volume(created_volume_id)
        self.assertEqual(created_volume_id, found_volume['id'])

        # It should also be in the all-volume list
        volumes = self.api.get_volumes()
        volume_ids = [volume['id'] for volume in volumes]
        self.assertTrue(created_volume_id in volume_ids)

        # Wait (briefly) for creation. Delay is due to the 'message queue'
        found_volume = self._poll_while(created_volume_id, ['creating'])

        # It should be available...
        self.assertEqual('available', found_volume['status'])

        self.api.delete_volume(created_volume_id)

        # Wait (briefly) for deletion. Delay is due to the 'message queue'
        found_volume = self._poll_while(created_volume_id, ['deleting'])

        # _poll_while returns None once the API reports a 404
        self.assertFalse(found_volume)

        LOG.debug("Logs: %s", driver.LoggingVolumeDriver.all_logs())

        # NOTE(review): the first argument of each logs_like() call below
        # was on a line the listing dropped. The action names are inferred
        # from the result variable names -- confirm them against
        # LoggingVolumeDriver before relying on these assertions.
        create_actions = driver.LoggingVolumeDriver.logs_like(
            'create_volume',
            id=created_volume_id)
        LOG.debug("Create_Actions: %s", create_actions)

        self.assertEqual(1, len(create_actions))
        create_action = create_actions[0]
        self.assertEqual(create_action['id'], created_volume_id)
        self.assertEqual(create_action['availability_zone'], 'nova')
        self.assertEqual(create_action['size'], 1)

        export_actions = driver.LoggingVolumeDriver.logs_like(
            'create_export',
            id=created_volume_id)
        self.assertEqual(1, len(export_actions))
        export_action = export_actions[0]
        self.assertEqual(export_action['id'], created_volume_id)
        self.assertEqual(export_action['availability_zone'], 'nova')

        delete_actions = driver.LoggingVolumeDriver.logs_like(
            'delete_volume',
            id=created_volume_id)
        self.assertEqual(1, len(delete_actions))
        # Inspect the delete record itself; the earlier code indexed
        # export_actions here, so the delete record was never checked.
        delete_action = delete_actions[0]
        self.assertEqual(delete_action['id'], created_volume_id)

    def test_create_volume_with_metadata(self):
        """Creates a volume with metadata."""
        # NOTE(review): the listing cut this literal off after the first
        # pair; restore any further pairs from version control.
        metadata = {'key1': 'value1'}
        created_volume = self.api.post_volume(
            {'volume': {'size': 1,
                        'metadata': metadata}})
        LOG.debug("created_volume: %s", created_volume)
        self.assertTrue(created_volume['id'])
        created_volume_id = created_volume['id']

        # Check it's there and metadata present
        found_volume = self.api.get_volume(created_volume_id)
        self.assertEqual(created_volume_id, found_volume['id'])
        self.assertEqual(metadata, found_volume['metadata'])

    def test_create_volume_in_availability_zone(self):
        """Creates a volume in availability_zone."""
        availability_zone = 'zone1:host1'
        created_volume = self.api.post_volume(
            {'volume': {'size': 1,
                        'availability_zone': availability_zone}})
        LOG.debug("created_volume: %s", created_volume)
        self.assertTrue(created_volume['id'])
        created_volume_id = created_volume['id']

        # Check it's there and availability zone present
        found_volume = self.api.get_volume(created_volume_id)
        self.assertEqual(created_volume_id, found_volume['id'])
        self.assertEqual(availability_zone, found_volume['availability_zone'])

180
if __name__ == "__main__":