# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Handles all requests relating to volumes."""

import functools

from nova.db import base
from nova import exception
from nova import flags
from nova.image import glance
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova.openstack.common import rpc
from nova.openstack.common import timeutils
import nova.policy
from nova import quota
from nova.scheduler import rpcapi as scheduler_rpcapi

volume_host_opt = cfg.BoolOpt('snapshot_same_host',
                              default=True,
                              help='Create volume from snapshot at the host '
                                   'where snapshot resides')

FLAGS = flags.FLAGS
FLAGS.register_opt(volume_host_opt)
flags.DECLARE('storage_availability_zone', 'nova.volume.manager')

LOG = logging.getLogger(__name__)

# NOTE(review): restored names lost in the corrupted extraction -- FLAGS,
# QUOTAS, GB, functools and nova.policy are all referenced by the visible
# code below but were never defined in the surviving text.
GB = 1048576 * 1024
QUOTAS = quota.QUOTAS
def wrap_check_policy(func):
    """Check policy corresponding to the wrapped methods prior to execution.

    This decorator requires the first 3 args of the wrapped function
    to be (self, context, volume).
    """
    @functools.wraps(func)
    def wrapped(self, context, target_obj, *args, **kwargs):
        check_policy(context, func.__name__, target_obj)
        return func(self, context, target_obj, *args, **kwargs)

    # The extraction lost this line; without it the decorator returns None
    # and every decorated method becomes uncallable.
    return wrapped
def check_policy(context, action, target_obj=None):
    """Enforce the 'volume:<action>' policy for the given context."""
    target = {
        'project_id': context.project_id,
        'user_id': context.user_id,
    }
    target.update(target_obj or {})
    _action = 'volume:%s' % action
    nova.policy.enforce(context, _action, target)
class API(base.Base):
    """API for interacting with the volume manager."""

    def __init__(self, image_service=None, **kwargs):
        # Fall back to the configured default image service (glance) when
        # no explicit service is injected (e.g. by tests).
        self.image_service = (image_service or
                              glance.get_default_image_service())
        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
        super(API, self).__init__(**kwargs)
84
def create(self, context, size, name, description, snapshot=None,
85
image_id=None, volume_type=None, metadata=None,
86
availability_zone=None):
87
check_policy(context, 'create')
88
if snapshot is not None:
89
if snapshot['status'] != "available":
90
msg = _("status must be available")
91
raise exception.InvalidSnapshot(reason=msg)
93
size = snapshot['volume_size']
95
snapshot_id = snapshot['id']
105
# tolerate size as stringified int
108
if not isinstance(size, int) or size <= 0:
109
msg = (_("Volume size '%s' must be an integer and greater than 0")
111
raise exception.InvalidInput(reason=msg)
115
reservations = QUOTAS.reserve(context, volumes=1, gigabytes=size)
116
except exception.OverQuota as e:
117
overs = e.kwargs['overs']
118
usages = e.kwargs['usages']
119
quotas = e.kwargs['quotas']
122
return (usages[name]['reserved'] + usages[name]['in_use'])
124
pid = context.project_id
125
if 'gigabytes' in overs:
126
consumed = _consumed('gigabytes')
127
quota = quotas['gigabytes']
128
LOG.warn(_("Quota exceeded for %(pid)s, tried to create "
129
"%(size)sG volume (%(consumed)dG of %(quota)dG "
130
"already consumed)") % locals())
131
raise exception.VolumeSizeTooLarge()
132
elif 'volumes' in overs:
133
consumed = _consumed('volumes')
134
LOG.warn(_("Quota exceeded for %(pid)s, tried to create "
135
"volume (%(consumed)d volumes already consumed)")
137
raise exception.VolumeLimitExceeded(allowed=quotas['volumes'])
140
# check image existence
141
image_meta = self.image_service.show(context, image_id)
142
image_size_in_gb = (int(image_meta['size']) + GB - 1) / GB
143
#check image size is not larger than volume size.
144
if image_size_in_gb > size:
145
msg = _('Size of specified image is larger than volume size.')
146
raise exception.InvalidInput(reason=msg)
148
if availability_zone is None:
149
availability_zone = FLAGS.storage_availability_zone
151
if volume_type is None:
152
volume_type_id = None
154
volume_type_id = volume_type.get('id', None)
158
'user_id': context.user_id,
159
'project_id': context.project_id,
160
'snapshot_id': snapshot_id,
161
'availability_zone': availability_zone,
162
'status': "creating",
163
'attach_status': "detached",
164
'display_name': name,
165
'display_description': description,
166
'volume_type_id': volume_type_id,
167
'metadata': metadata,
169
volume = self.db.volume_create(context, options)
172
QUOTAS.commit(context, reservations)
174
self._cast_create_volume(context, volume['id'],
175
snapshot_id, image_id)
178
def _cast_create_volume(self, context, volume_id,
179
snapshot_id, image_id):
181
# NOTE(Rongze Zhu): It is a simple solution for bug 1008866
182
# If snapshot_id is set, make the call create volume directly to
183
# the volume host where the snapshot resides instead of passing it
184
# through the scheduer. So snapshot can be copy to new volume.
186
if snapshot_id and FLAGS.snapshot_same_host:
187
snapshot_ref = self.db.snapshot_get(context, snapshot_id)
188
src_volume_ref = self.db.volume_get(context,
189
snapshot_ref['volume_id'])
190
topic = rpc.queue_get_for(context,
192
src_volume_ref['host'])
195
{"method": "create_volume",
196
"args": {"volume_id": volume_id,
197
"snapshot_id": snapshot_id,
198
"image_id": image_id}})
201
self.scheduler_rpcapi.create_volume(
202
context, volume_id, snapshot_id, image_id)
205
def delete(self, context, volume, force=False):
206
volume_id = volume['id']
207
if not volume['host']:
208
# NOTE(vish): scheduling failed, so delete it
209
# Note(zhiteng): update volume quota reservation
211
reservations = QUOTAS.reserve(context, volumes=-1,
212
gigabytes=-volume['size'])
215
LOG.exception(_("Failed to update quota for deleting volume."))
217
self.db.volume_destroy(context, volume_id)
220
QUOTAS.commit(context, reservations)
222
if not force and volume['status'] not in ["available", "error"]:
223
msg = _("Volume status must be available or error")
224
raise exception.InvalidVolume(reason=msg)
226
snapshots = self.db.snapshot_get_all_for_volume(context, volume_id)
228
msg = _("Volume still has %d dependent snapshots") % len(snapshots)
229
raise exception.InvalidVolume(reason=msg)
231
now = timeutils.utcnow()
232
self.db.volume_update(context, volume_id, {'status': 'deleting',
233
'terminated_at': now})
234
host = volume['host']
236
rpc.queue_get_for(context, FLAGS.volume_topic, host),
237
{"method": "delete_volume",
238
"args": {"volume_id": volume_id}})
241
def update(self, context, volume, fields):
242
self.db.volume_update(context, volume['id'], fields)
244
def get(self, context, volume_id):
245
rv = self.db.volume_get(context, volume_id)
246
volume = dict(rv.iteritems())
247
check_policy(context, 'get', volume)
250
def get_all(self, context, search_opts=None):
251
check_policy(context, 'get_all')
253
if search_opts is None:
256
if (context.is_admin and 'all_tenants' in search_opts):
257
# Need to remove all_tenants to pass the filtering below.
258
del search_opts['all_tenants']
259
volumes = self.db.volume_get_all(context)
261
volumes = self.db.volume_get_all_by_project(context,
264
LOG.debug(_("Searching by: %s") % str(search_opts))
266
def _check_metadata_match(volume, searchdict):
268
for i in volume.get('volume_metadata'):
269
volume_metadata[i['key']] = i['value']
271
for k, v in searchdict.iteritems():
272
if (k not in volume_metadata.keys() or
273
volume_metadata[k] != v):
277
# search_option to filter_name mapping.
278
filter_mapping = {'metadata': _check_metadata_match}
281
for volume in volumes:
282
# go over all filters in the list
283
for opt, values in search_opts.iteritems():
285
filter_func = filter_mapping[opt]
287
# no such filter - ignore it, go to next filter
290
if filter_func(volume, values):
291
result.append(volume)
296
def get_snapshot(self, context, snapshot_id):
297
check_policy(context, 'get_snapshot')
298
rv = self.db.snapshot_get(context, snapshot_id)
299
return dict(rv.iteritems())
301
def get_all_snapshots(self, context, search_opts=None):
302
check_policy(context, 'get_all_snapshots')
304
search_opts = search_opts or {}
306
if (context.is_admin and 'all_tenants' in search_opts):
307
# Need to remove all_tenants to pass the filtering below.
308
del search_opts['all_tenants']
309
return self.db.snapshot_get_all(context)
311
return self.db.snapshot_get_all_by_project(context,
315
def check_attach(self, context, volume):
316
# TODO(vish): abstract status checking?
317
if volume['status'] != "available":
318
msg = _("status must be available")
319
raise exception.InvalidVolume(reason=msg)
320
if volume['attach_status'] == "attached":
321
msg = _("already attached")
322
raise exception.InvalidVolume(reason=msg)
325
def check_detach(self, context, volume):
326
# TODO(vish): abstract status checking?
327
if volume['status'] == "available":
328
msg = _("already detached")
329
raise exception.InvalidVolume(reason=msg)
332
def reserve_volume(self, context, volume):
333
self.db.volume_update(context, volume['id'], {"status": "attaching"})
336
def unreserve_volume(self, context, volume):
337
if volume['status'] == "attaching":
338
self.db.volume_update(context,
340
{"status": "available"})
343
def begin_detaching(self, context, volume):
344
self.db.volume_update(context, volume['id'], {"status": "detaching"})
347
def roll_detaching(self, context, volume):
348
if volume['status'] == "detaching":
349
self.db.volume_update(context,
351
{"status": "in-use"})
354
def attach(self, context, volume, instance_uuid, mountpoint):
355
host = volume['host']
356
queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
357
return rpc.call(context, queue,
358
{"method": "attach_volume",
359
"args": {"volume_id": volume['id'],
360
"instance_uuid": instance_uuid,
361
"mountpoint": mountpoint}})
364
def detach(self, context, volume):
365
host = volume['host']
366
queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
367
return rpc.call(context, queue,
368
{"method": "detach_volume",
369
"args": {"volume_id": volume['id']}})
372
def initialize_connection(self, context, volume, connector):
373
host = volume['host']
374
queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
375
return rpc.call(context, queue,
376
{"method": "initialize_connection",
377
"args": {"volume_id": volume['id'],
378
"connector": connector}})
381
def terminate_connection(self, context, volume, connector):
382
self.unreserve_volume(context, volume)
383
host = volume['host']
384
queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
385
return rpc.call(context, queue,
386
{"method": "terminate_connection",
387
"args": {"volume_id": volume['id'],
388
"connector": connector}})
390
def _create_snapshot(self, context, volume, name, description,
392
check_policy(context, 'create_snapshot', volume)
394
if ((not force) and (volume['status'] != "available")):
395
msg = _("must be available")
396
raise exception.InvalidVolume(reason=msg)
399
'volume_id': volume['id'],
400
'user_id': context.user_id,
401
'project_id': context.project_id,
402
'status': "creating",
404
'volume_size': volume['size'],
405
'display_name': name,
406
'display_description': description}
408
snapshot = self.db.snapshot_create(context, options)
409
host = volume['host']
411
rpc.queue_get_for(context, FLAGS.volume_topic, host),
412
{"method": "create_snapshot",
413
"args": {"volume_id": volume['id'],
414
"snapshot_id": snapshot['id']}})
417
def create_snapshot(self, context, volume, name, description):
418
return self._create_snapshot(context, volume, name, description,
421
def create_snapshot_force(self, context, volume, name, description):
422
return self._create_snapshot(context, volume, name, description,
426
def delete_snapshot(self, context, snapshot):
427
if snapshot['status'] not in ["available", "error"]:
428
msg = _("Volume Snapshot status must be available or error")
429
raise exception.InvalidVolume(reason=msg)
430
self.db.snapshot_update(context, snapshot['id'],
431
{'status': 'deleting'})
432
volume = self.db.volume_get(context, snapshot['volume_id'])
433
host = volume['host']
435
rpc.queue_get_for(context, FLAGS.volume_topic, host),
436
{"method": "delete_snapshot",
437
"args": {"snapshot_id": snapshot['id']}})
440
def get_volume_metadata(self, context, volume):
441
"""Get all metadata associated with a volume."""
442
rv = self.db.volume_metadata_get(context, volume['id'])
443
return dict(rv.iteritems())
446
def delete_volume_metadata(self, context, volume, key):
447
"""Delete the given metadata item from a volume."""
448
self.db.volume_metadata_delete(context, volume['id'], key)
451
def update_volume_metadata(self, context, volume, metadata, delete=False):
452
"""Updates or creates volume metadata.
454
If delete is True, metadata items that are not specified in the
455
`metadata` argument will be deleted.
461
_metadata = self.get_volume_metadata(context, volume['id'])
462
_metadata.update(metadata)
464
self.db.volume_metadata_update(context, volume['id'], _metadata, True)
467
def get_volume_metadata_value(self, volume, key):
468
"""Get value of particular metadata key."""
469
metadata = volume.get('volume_metadata')
471
for i in volume['volume_metadata']:
476
def _check_volume_availability(self, context, volume, force):
477
"""Check if the volume can be used."""
478
if volume['status'] not in ['available', 'in-use']:
479
msg = _('Volume status must be available/in-use.')
480
raise exception.InvalidVolume(reason=msg)
481
if not force and 'in-use' == volume['status']:
482
msg = _('Volume status is in-use.')
483
raise exception.InvalidVolume(reason=msg)
486
def copy_volume_to_image(self, context, volume, metadata, force):
487
"""Create a new image from the specified volume."""
488
self._check_volume_availability(context, volume, force)
490
recv_metadata = self.image_service.create(context, metadata)
491
self.update(context, volume, {'status': 'uploading'})
493
rpc.queue_get_for(context,
496
{"method": "copy_volume_to_image",
497
"args": {"volume_id": volume['id'],
498
"image_id": recv_metadata['id']}})
500
response = {"id": volume['id'],
501
"updated_at": volume['updated_at'],
502
"status": 'uploading',
503
"display_description": volume['display_description'],
504
"size": volume['size'],
505
"volume_type": volume['volume_type'],
506
"image_id": recv_metadata['id'],
507
"container_format": recv_metadata['container_format'],
508
"disk_format": recv_metadata['disk_format'],
509
"image_name": recv_metadata.get('name', None)