24
24
from cinder.openstack.common import jsonutils
25
25
from cinder.openstack.common.rpc import common as rpc_common
26
26
from cinder import test
27
from cinder.tests.api.openstack import fakes
27
from cinder.tests.api import fakes
28
from cinder.tests.api.v2 import stubs
28
29
from cinder import volume
29
30
from cinder.volume import api as volume_api
58
59
def test_simple_api_actions(self):
59
60
app = fakes.wsgi_app()
60
61
for _action in self._actions:
61
req = webob.Request.blank('/v1/fake/volumes/%s/action' %
62
req = webob.Request.blank('/v2/fake/volumes/%s/action' %
63
64
req.method = 'POST'
64
65
req.body = jsonutils.dumps({_action: None})
65
66
req.content_type = 'application/json'
73
74
fake_initialize_connection)
75
76
body = {'os-initialize_connection': {'connector': 'fake'}}
76
req = webob.Request.blank('/v1/fake/volumes/1/action')
77
req = webob.Request.blank('/v2/fake/volumes/1/action')
77
78
req.method = "POST"
78
79
req.body = jsonutils.dumps(body)
79
80
req.headers["content-type"] = "application/json"
88
89
fake_terminate_connection)
90
91
body = {'os-terminate_connection': {'connector': 'fake'}}
91
req = webob.Request.blank('/v1/fake/volumes/1/action')
92
req = webob.Request.blank('/v2/fake/volumes/1/action')
92
93
req.method = "POST"
93
94
req.body = jsonutils.dumps(body)
94
95
req.headers["content-type"] = "application/json"
99
100
def test_attach(self):
100
101
body = {'os-attach': {'instance_uuid': 'fake',
101
102
'mountpoint': '/dev/vdc'}}
102
req = webob.Request.blank('/v1/fake/volumes/1/action')
103
req = webob.Request.blank('/v2/fake/volumes/1/action')
103
104
req.method = "POST"
104
105
req.body = jsonutils.dumps(body)
105
106
req.headers["content-type"] = "application/json"
111
112
def stub_volume_get(self, context, volume_id):
112
volume = fakes.stub_volume(volume_id)
113
volume = stubs.stub_volume(volume_id)
113
114
if volume_id == 5:
114
115
volume['status'] = 'in-use'
150
151
"image_name": 'image_name',
152
153
body = {"os-volume_upload_image": vol}
153
req = fakes.HTTPRequest.blank('/v1/tenant1/volumes/%s/action' % id)
154
req = fakes.HTTPRequest.blank('/v2/tenant1/volumes/%s/action' % id)
154
155
res_dict = self.controller._volume_upload_image(req, id, body)
155
156
expected = {'os-volume_upload_image': {'id': id,
156
'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
157
'status': 'uploading',
158
'display_description': 'displaydesc',
160
'volume_type': {'name': 'vol_type_name'},
162
'container_format': 'bare',
163
'disk_format': 'raw',
164
'image_name': 'image_name'}}
157
'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
158
'status': 'uploading',
159
'display_description': 'displaydesc',
161
'volume_type': {'name': 'vol_type_name'},
163
'container_format': 'bare',
164
'disk_format': 'raw',
165
'image_name': 'image_name'}}
165
166
self.assertDictMatch(res_dict, expected)
167
168
def test_copy_volume_to_image_volumenotfound(self):
176
177
"image_name": 'image_name',
178
179
body = {"os-volume_upload_image": vol}
179
req = fakes.HTTPRequest.blank('/v1/tenant1/volumes/%s/action' % id)
180
req = fakes.HTTPRequest.blank('/v2/tenant1/volumes/%s/action' % id)
180
181
self.assertRaises(webob.exc.HTTPNotFound,
181
182
self.controller._volume_upload_image,
186
187
def test_copy_volume_to_image_invalidvolume(self):
187
188
def stub_upload_volume_to_image_service_raise(self, context, volume,
189
raise exception.InvalidVolume
190
raise exception.InvalidVolume(reason='blah')
190
191
self.stubs.Set(volume_api.API,
191
192
"copy_volume_to_image",
192
193
stub_upload_volume_to_image_service_raise)
197
198
"image_name": 'image_name',
199
200
body = {"os-volume_upload_image": vol}
200
req = fakes.HTTPRequest.blank('/v1/tenant1/volumes/%s/action' % id)
201
req = fakes.HTTPRequest.blank('/v2/tenant1/volumes/%s/action' % id)
201
202
self.assertRaises(webob.exc.HTTPBadRequest,
202
203
self.controller._volume_upload_image,
207
208
def test_copy_volume_to_image_valueerror(self):
208
209
def stub_upload_volume_to_image_service_raise(self, context, volume,
211
212
self.stubs.Set(volume_api.API,
212
213
"copy_volume_to_image",
218
219
"image_name": 'image_name',
220
221
body = {"os-volume_upload_image": vol}
221
req = fakes.HTTPRequest.blank('/v1/tenant1/volumes/%s/action' % id)
222
req = fakes.HTTPRequest.blank('/v2/tenant1/volumes/%s/action' % id)
222
223
self.assertRaises(webob.exc.HTTPBadRequest,
223
224
self.controller._volume_upload_image,
228
229
def test_copy_volume_to_image_remoteerror(self):
229
230
def stub_upload_volume_to_image_service_raise(self, context, volume,
231
232
raise rpc_common.RemoteError
232
233
self.stubs.Set(volume_api.API,
233
234
"copy_volume_to_image",
239
240
"image_name": 'image_name',
241
242
body = {"os-volume_upload_image": vol}
242
req = fakes.HTTPRequest.blank('/v1/tenant1/volumes/%s/action' % id)
243
req = fakes.HTTPRequest.blank('/v2/tenant1/volumes/%s/action' % id)
243
244
self.assertRaises(webob.exc.HTTPBadRequest,
244
245
self.controller._volume_upload_image,