~ubuntu-branches/ubuntu/raring/nova/raring-proposed

« back to all changes in this revision

Viewing changes to nova/volume/api.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short
  • Date: 2012-11-23 09:04:58 UTC
  • mfrom: (1.1.66)
  • Revision ID: package-import@ubuntu.com-20121123090458-91565o7aev1i1h71
Tags: 2013.1~g1-0ubuntu1
[ Adam Gandelman ]
* debian/control: Ensure novaclient is upgraded with nova,
  require python-keystoneclient >= 1:2.9.0. (LP: #1073289)
* debian/patches/{ubuntu/*, rbd-security.patch}: Dropped, applied
  upstream.
* debian/control: Add python-testtools to Build-Depends.

[ Chuck Short ]
* New upstream version.
* Refreshed debian/patches/avoid_setuptools_git_dependency.patch.
* debian/rules: FTBFS if missing binaries.
* debian/nova-scheudler.install: Add missing rabbit-queues and
  nova-rpc-zmq-receiver.
* Remove nova-volume since it doesnt exist anymore, transition to cinder-*.
* debian/rules: install apport hook in the right place.
* debian/patches/ubuntu-show-tests.patch: Display test failures.
* debian/control: Add depends on genisoimage
* debian/control: Suggest guestmount.
* debian/control: Suggest websockify. (LP: #1076442)
* debian/nova.conf: Disable nova-volume service.
* debian/control: Depend on xen-system-* rather than the hypervisor.
* debian/control, debian/mans/nova-conductor.8, debian/nova-conductor.init,
  debian/nova-conductor.install, debian/nova-conductor.logrotate
  debian/nova-conductor.manpages, debian/nova-conductor.postrm
  debian/nova-conductor.upstart.in: Add nova-conductor service.
* debian/control: Add python-fixtures as a build deps.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2010 United States Government as represented by the
4
 
# Administrator of the National Aeronautics and Space Administration.
5
 
# All Rights Reserved.
6
 
#
7
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
8
 
#    not use this file except in compliance with the License. You may obtain
9
 
#    a copy of the License at
10
 
#
11
 
#         http://www.apache.org/licenses/LICENSE-2.0
12
 
#
13
 
#    Unless required by applicable law or agreed to in writing, software
14
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
 
#    License for the specific language governing permissions and limitations
17
 
#    under the License.
18
 
 
19
 
"""
20
 
Handles all requests relating to volumes.
21
 
"""
22
 
 
23
 
import functools
24
 
 
25
 
from nova.db import base
26
 
from nova import exception
27
 
from nova import flags
28
 
from nova.image import glance
29
 
from nova.openstack.common import cfg
30
 
from nova.openstack.common import log as logging
31
 
from nova.openstack.common import rpc
32
 
from nova.openstack.common import timeutils
33
 
import nova.policy
34
 
from nova import quota
35
 
from nova.scheduler import rpcapi as scheduler_rpcapi
36
 
 
37
 
volume_host_opt = cfg.BoolOpt('snapshot_same_host',
38
 
        default=True,
39
 
        help='Create volume from snapshot at the host where snapshot resides')
40
 
 
41
 
FLAGS = flags.FLAGS
42
 
FLAGS.register_opt(volume_host_opt)
43
 
flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
44
 
 
45
 
LOG = logging.getLogger(__name__)
46
 
GB = 1048576 * 1024
47
 
 
48
 
QUOTAS = quota.QUOTAS
49
 
 
50
 
 
51
 
def wrap_check_policy(func):
52
 
    """Check policy corresponding to the wrapped methods prior to execution
53
 
 
54
 
    This decorator requires the first 3 args of the wrapped function
55
 
    to be (self, context, volume)
56
 
    """
57
 
    @functools.wraps(func)
58
 
    def wrapped(self, context, target_obj, *args, **kwargs):
59
 
        check_policy(context, func.__name__, target_obj)
60
 
        return func(self, context, target_obj, *args, **kwargs)
61
 
 
62
 
    return wrapped
63
 
 
64
 
 
65
 
def check_policy(context, action, target_obj=None):
66
 
    target = {
67
 
        'project_id': context.project_id,
68
 
        'user_id': context.user_id,
69
 
    }
70
 
    target.update(target_obj or {})
71
 
    _action = 'volume:%s' % action
72
 
    nova.policy.enforce(context, _action, target)
73
 
 
74
 
 
75
 
class API(base.Base):
76
 
    """API for interacting with the volume manager."""
77
 
 
78
 
    def __init__(self, image_service=None, **kwargs):
79
 
        self.image_service = (image_service or
80
 
                              glance.get_default_image_service())
81
 
        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
82
 
        super(API, self).__init__(**kwargs)
83
 
 
84
 
    def create(self, context, size, name, description, snapshot=None,
85
 
                image_id=None, volume_type=None, metadata=None,
86
 
                availability_zone=None):
87
 
        check_policy(context, 'create')
88
 
        if snapshot is not None:
89
 
            if snapshot['status'] != "available":
90
 
                msg = _("status must be available")
91
 
                raise exception.InvalidSnapshot(reason=msg)
92
 
            if not size:
93
 
                size = snapshot['volume_size']
94
 
 
95
 
            snapshot_id = snapshot['id']
96
 
        else:
97
 
            snapshot_id = None
98
 
 
99
 
        def as_int(s):
100
 
            try:
101
 
                return int(s)
102
 
            except ValueError:
103
 
                return s
104
 
 
105
 
        # tolerate size as stringified int
106
 
        size = as_int(size)
107
 
 
108
 
        if not isinstance(size, int) or size <= 0:
109
 
            msg = (_("Volume size '%s' must be an integer and greater than 0")
110
 
                   % size)
111
 
            raise exception.InvalidInput(reason=msg)
112
 
 
113
 
        reservations = None
114
 
        try:
115
 
            reservations = QUOTAS.reserve(context, volumes=1, gigabytes=size)
116
 
        except exception.OverQuota as e:
117
 
            overs = e.kwargs['overs']
118
 
            usages = e.kwargs['usages']
119
 
            quotas = e.kwargs['quotas']
120
 
 
121
 
            def _consumed(name):
122
 
                return (usages[name]['reserved'] + usages[name]['in_use'])
123
 
 
124
 
            pid = context.project_id
125
 
            if 'gigabytes' in overs:
126
 
                consumed = _consumed('gigabytes')
127
 
                quota = quotas['gigabytes']
128
 
                LOG.warn(_("Quota exceeded for %(pid)s, tried to create "
129
 
                           "%(size)sG volume (%(consumed)dG of %(quota)dG "
130
 
                           "already consumed)") % locals())
131
 
                raise exception.VolumeSizeTooLarge()
132
 
            elif 'volumes' in overs:
133
 
                consumed = _consumed('volumes')
134
 
                LOG.warn(_("Quota exceeded for %(pid)s, tried to create "
135
 
                           "volume (%(consumed)d volumes already consumed)")
136
 
                           % locals())
137
 
                raise exception.VolumeLimitExceeded(allowed=quotas['volumes'])
138
 
 
139
 
        if image_id:
140
 
            # check image existence
141
 
            image_meta = self.image_service.show(context, image_id)
142
 
            image_size_in_gb = (int(image_meta['size']) + GB - 1) / GB
143
 
            #check image size is not larger than volume size.
144
 
            if image_size_in_gb > size:
145
 
                msg = _('Size of specified image is larger than volume size.')
146
 
                raise exception.InvalidInput(reason=msg)
147
 
 
148
 
        if availability_zone is None:
149
 
            availability_zone = FLAGS.storage_availability_zone
150
 
 
151
 
        if volume_type is None:
152
 
            volume_type_id = None
153
 
        else:
154
 
            volume_type_id = volume_type.get('id', None)
155
 
 
156
 
        options = {
157
 
            'size': size,
158
 
            'user_id': context.user_id,
159
 
            'project_id': context.project_id,
160
 
            'snapshot_id': snapshot_id,
161
 
            'availability_zone': availability_zone,
162
 
            'status': "creating",
163
 
            'attach_status': "detached",
164
 
            'display_name': name,
165
 
            'display_description': description,
166
 
            'volume_type_id': volume_type_id,
167
 
            'metadata': metadata,
168
 
            }
169
 
        volume = self.db.volume_create(context, options)
170
 
 
171
 
        if reservations:
172
 
            QUOTAS.commit(context, reservations)
173
 
 
174
 
        self._cast_create_volume(context, volume['id'],
175
 
                                 snapshot_id, image_id)
176
 
        return volume
177
 
 
178
 
    def _cast_create_volume(self, context, volume_id,
179
 
                            snapshot_id, image_id):
180
 
 
181
 
        # NOTE(Rongze Zhu): It is a simple solution for bug 1008866
182
 
        # If snapshot_id is set, make the call create volume directly to
183
 
        # the volume host where the snapshot resides instead of passing it
184
 
        # through the scheduer. So snapshot can be copy to new volume.
185
 
 
186
 
        if snapshot_id and FLAGS.snapshot_same_host:
187
 
            snapshot_ref = self.db.snapshot_get(context, snapshot_id)
188
 
            src_volume_ref = self.db.volume_get(context,
189
 
                                                snapshot_ref['volume_id'])
190
 
            topic = rpc.queue_get_for(context,
191
 
                                      FLAGS.volume_topic,
192
 
                                      src_volume_ref['host'])
193
 
            rpc.cast(context,
194
 
                     topic,
195
 
                     {"method": "create_volume",
196
 
                      "args": {"volume_id": volume_id,
197
 
                               "snapshot_id": snapshot_id,
198
 
                               "image_id": image_id}})
199
 
 
200
 
        else:
201
 
            self.scheduler_rpcapi.create_volume(
202
 
                context, volume_id, snapshot_id, image_id)
203
 
 
204
 
    @wrap_check_policy
205
 
    def delete(self, context, volume, force=False):
206
 
        volume_id = volume['id']
207
 
        if not volume['host']:
208
 
            # NOTE(vish): scheduling failed, so delete it
209
 
            # Note(zhiteng): update volume quota reservation
210
 
            try:
211
 
                reservations = QUOTAS.reserve(context, volumes=-1,
212
 
                                              gigabytes=-volume['size'])
213
 
            except Exception:
214
 
                reservations = None
215
 
                LOG.exception(_("Failed to update quota for deleting volume."))
216
 
 
217
 
            self.db.volume_destroy(context, volume_id)
218
 
 
219
 
            if reservations:
220
 
                QUOTAS.commit(context, reservations)
221
 
            return
222
 
        if not force and volume['status'] not in ["available", "error"]:
223
 
            msg = _("Volume status must be available or error")
224
 
            raise exception.InvalidVolume(reason=msg)
225
 
 
226
 
        snapshots = self.db.snapshot_get_all_for_volume(context, volume_id)
227
 
        if len(snapshots):
228
 
            msg = _("Volume still has %d dependent snapshots") % len(snapshots)
229
 
            raise exception.InvalidVolume(reason=msg)
230
 
 
231
 
        now = timeutils.utcnow()
232
 
        self.db.volume_update(context, volume_id, {'status': 'deleting',
233
 
                                                   'terminated_at': now})
234
 
        host = volume['host']
235
 
        rpc.cast(context,
236
 
                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
237
 
                 {"method": "delete_volume",
238
 
                  "args": {"volume_id": volume_id}})
239
 
 
240
 
    @wrap_check_policy
241
 
    def update(self, context, volume, fields):
242
 
        self.db.volume_update(context, volume['id'], fields)
243
 
 
244
 
    def get(self, context, volume_id):
245
 
        rv = self.db.volume_get(context, volume_id)
246
 
        volume = dict(rv.iteritems())
247
 
        check_policy(context, 'get', volume)
248
 
        return volume
249
 
 
250
 
    def get_all(self, context, search_opts=None):
251
 
        check_policy(context, 'get_all')
252
 
 
253
 
        if search_opts is None:
254
 
            search_opts = {}
255
 
 
256
 
        if (context.is_admin and 'all_tenants' in search_opts):
257
 
            # Need to remove all_tenants to pass the filtering below.
258
 
            del search_opts['all_tenants']
259
 
            volumes = self.db.volume_get_all(context)
260
 
        else:
261
 
            volumes = self.db.volume_get_all_by_project(context,
262
 
                                                        context.project_id)
263
 
        if search_opts:
264
 
            LOG.debug(_("Searching by: %s") % str(search_opts))
265
 
 
266
 
            def _check_metadata_match(volume, searchdict):
267
 
                volume_metadata = {}
268
 
                for i in volume.get('volume_metadata'):
269
 
                    volume_metadata[i['key']] = i['value']
270
 
 
271
 
                for k, v in searchdict.iteritems():
272
 
                    if (k not in volume_metadata.keys() or
273
 
                        volume_metadata[k] != v):
274
 
                        return False
275
 
                return True
276
 
 
277
 
            # search_option to filter_name mapping.
278
 
            filter_mapping = {'metadata': _check_metadata_match}
279
 
 
280
 
            result = []
281
 
            for volume in volumes:
282
 
                # go over all filters in the list
283
 
                for opt, values in search_opts.iteritems():
284
 
                    try:
285
 
                        filter_func = filter_mapping[opt]
286
 
                    except KeyError:
287
 
                        # no such filter - ignore it, go to next filter
288
 
                        continue
289
 
                    else:
290
 
                        if filter_func(volume, values):
291
 
                            result.append(volume)
292
 
                            break
293
 
            volumes = result
294
 
        return volumes
295
 
 
296
 
    def get_snapshot(self, context, snapshot_id):
297
 
        check_policy(context, 'get_snapshot')
298
 
        rv = self.db.snapshot_get(context, snapshot_id)
299
 
        return dict(rv.iteritems())
300
 
 
301
 
    def get_all_snapshots(self, context, search_opts=None):
302
 
        check_policy(context, 'get_all_snapshots')
303
 
 
304
 
        search_opts = search_opts or {}
305
 
 
306
 
        if (context.is_admin and 'all_tenants' in search_opts):
307
 
            # Need to remove all_tenants to pass the filtering below.
308
 
            del search_opts['all_tenants']
309
 
            return self.db.snapshot_get_all(context)
310
 
        else:
311
 
            return self.db.snapshot_get_all_by_project(context,
312
 
                                                       context.project_id)
313
 
 
314
 
    @wrap_check_policy
315
 
    def check_attach(self, context, volume):
316
 
        # TODO(vish): abstract status checking?
317
 
        if volume['status'] != "available":
318
 
            msg = _("status must be available")
319
 
            raise exception.InvalidVolume(reason=msg)
320
 
        if volume['attach_status'] == "attached":
321
 
            msg = _("already attached")
322
 
            raise exception.InvalidVolume(reason=msg)
323
 
 
324
 
    @wrap_check_policy
325
 
    def check_detach(self, context, volume):
326
 
        # TODO(vish): abstract status checking?
327
 
        if volume['status'] == "available":
328
 
            msg = _("already detached")
329
 
            raise exception.InvalidVolume(reason=msg)
330
 
 
331
 
    @wrap_check_policy
332
 
    def reserve_volume(self, context, volume):
333
 
        self.db.volume_update(context, volume['id'], {"status": "attaching"})
334
 
 
335
 
    @wrap_check_policy
336
 
    def unreserve_volume(self, context, volume):
337
 
        if volume['status'] == "attaching":
338
 
            self.db.volume_update(context,
339
 
                                  volume['id'],
340
 
                                  {"status": "available"})
341
 
 
342
 
    @wrap_check_policy
343
 
    def begin_detaching(self, context, volume):
344
 
        self.db.volume_update(context, volume['id'], {"status": "detaching"})
345
 
 
346
 
    @wrap_check_policy
347
 
    def roll_detaching(self, context, volume):
348
 
        if volume['status'] == "detaching":
349
 
            self.db.volume_update(context,
350
 
                                  volume['id'],
351
 
                                  {"status": "in-use"})
352
 
 
353
 
    @wrap_check_policy
354
 
    def attach(self, context, volume, instance_uuid, mountpoint):
355
 
        host = volume['host']
356
 
        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
357
 
        return rpc.call(context, queue,
358
 
                        {"method": "attach_volume",
359
 
                         "args": {"volume_id": volume['id'],
360
 
                                  "instance_uuid": instance_uuid,
361
 
                                  "mountpoint": mountpoint}})
362
 
 
363
 
    @wrap_check_policy
364
 
    def detach(self, context, volume):
365
 
        host = volume['host']
366
 
        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
367
 
        return rpc.call(context, queue,
368
 
                 {"method": "detach_volume",
369
 
                  "args": {"volume_id": volume['id']}})
370
 
 
371
 
    @wrap_check_policy
372
 
    def initialize_connection(self, context, volume, connector):
373
 
        host = volume['host']
374
 
        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
375
 
        return rpc.call(context, queue,
376
 
                        {"method": "initialize_connection",
377
 
                         "args": {"volume_id": volume['id'],
378
 
                                  "connector": connector}})
379
 
 
380
 
    @wrap_check_policy
381
 
    def terminate_connection(self, context, volume, connector):
382
 
        self.unreserve_volume(context, volume)
383
 
        host = volume['host']
384
 
        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
385
 
        return rpc.call(context, queue,
386
 
                        {"method": "terminate_connection",
387
 
                         "args": {"volume_id": volume['id'],
388
 
                                  "connector": connector}})
389
 
 
390
 
    def _create_snapshot(self, context, volume, name, description,
391
 
                         force=False):
392
 
        check_policy(context, 'create_snapshot', volume)
393
 
 
394
 
        if ((not force) and (volume['status'] != "available")):
395
 
            msg = _("must be available")
396
 
            raise exception.InvalidVolume(reason=msg)
397
 
 
398
 
        options = {
399
 
            'volume_id': volume['id'],
400
 
            'user_id': context.user_id,
401
 
            'project_id': context.project_id,
402
 
            'status': "creating",
403
 
            'progress': '0%',
404
 
            'volume_size': volume['size'],
405
 
            'display_name': name,
406
 
            'display_description': description}
407
 
 
408
 
        snapshot = self.db.snapshot_create(context, options)
409
 
        host = volume['host']
410
 
        rpc.cast(context,
411
 
                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
412
 
                 {"method": "create_snapshot",
413
 
                  "args": {"volume_id": volume['id'],
414
 
                           "snapshot_id": snapshot['id']}})
415
 
        return snapshot
416
 
 
417
 
    def create_snapshot(self, context, volume, name, description):
418
 
        return self._create_snapshot(context, volume, name, description,
419
 
                                     False)
420
 
 
421
 
    def create_snapshot_force(self, context, volume, name, description):
422
 
        return self._create_snapshot(context, volume, name, description,
423
 
                                     True)
424
 
 
425
 
    @wrap_check_policy
426
 
    def delete_snapshot(self, context, snapshot):
427
 
        if snapshot['status'] not in ["available", "error"]:
428
 
            msg = _("Volume Snapshot status must be available or error")
429
 
            raise exception.InvalidVolume(reason=msg)
430
 
        self.db.snapshot_update(context, snapshot['id'],
431
 
                                {'status': 'deleting'})
432
 
        volume = self.db.volume_get(context, snapshot['volume_id'])
433
 
        host = volume['host']
434
 
        rpc.cast(context,
435
 
                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
436
 
                 {"method": "delete_snapshot",
437
 
                  "args": {"snapshot_id": snapshot['id']}})
438
 
 
439
 
    @wrap_check_policy
440
 
    def get_volume_metadata(self, context, volume):
441
 
        """Get all metadata associated with a volume."""
442
 
        rv = self.db.volume_metadata_get(context, volume['id'])
443
 
        return dict(rv.iteritems())
444
 
 
445
 
    @wrap_check_policy
446
 
    def delete_volume_metadata(self, context, volume, key):
447
 
        """Delete the given metadata item from a volume."""
448
 
        self.db.volume_metadata_delete(context, volume['id'], key)
449
 
 
450
 
    @wrap_check_policy
451
 
    def update_volume_metadata(self, context, volume, metadata, delete=False):
452
 
        """Updates or creates volume metadata.
453
 
 
454
 
        If delete is True, metadata items that are not specified in the
455
 
        `metadata` argument will be deleted.
456
 
 
457
 
        """
458
 
        if delete:
459
 
            _metadata = metadata
460
 
        else:
461
 
            _metadata = self.get_volume_metadata(context, volume['id'])
462
 
            _metadata.update(metadata)
463
 
 
464
 
        self.db.volume_metadata_update(context, volume['id'], _metadata, True)
465
 
        return _metadata
466
 
 
467
 
    def get_volume_metadata_value(self, volume, key):
468
 
        """Get value of particular metadata key."""
469
 
        metadata = volume.get('volume_metadata')
470
 
        if metadata:
471
 
            for i in volume['volume_metadata']:
472
 
                if i['key'] == key:
473
 
                    return i['value']
474
 
        return None
475
 
 
476
 
    def _check_volume_availability(self, context, volume, force):
477
 
        """Check if the volume can be used."""
478
 
        if volume['status'] not in ['available', 'in-use']:
479
 
            msg = _('Volume status must be available/in-use.')
480
 
            raise exception.InvalidVolume(reason=msg)
481
 
        if not force and 'in-use' == volume['status']:
482
 
            msg = _('Volume status is in-use.')
483
 
            raise exception.InvalidVolume(reason=msg)
484
 
 
485
 
    @wrap_check_policy
486
 
    def copy_volume_to_image(self, context, volume, metadata, force):
487
 
        """Create a new image from the specified volume."""
488
 
        self._check_volume_availability(context, volume, force)
489
 
 
490
 
        recv_metadata = self.image_service.create(context, metadata)
491
 
        self.update(context, volume, {'status': 'uploading'})
492
 
        rpc.cast(context,
493
 
                 rpc.queue_get_for(context,
494
 
                                   FLAGS.volume_topic,
495
 
                                   volume['host']),
496
 
                 {"method": "copy_volume_to_image",
497
 
                  "args": {"volume_id": volume['id'],
498
 
                           "image_id": recv_metadata['id']}})
499
 
 
500
 
        response = {"id": volume['id'],
501
 
               "updated_at": volume['updated_at'],
502
 
               "status": 'uploading',
503
 
               "display_description": volume['display_description'],
504
 
               "size": volume['size'],
505
 
               "volume_type": volume['volume_type'],
506
 
               "image_id": recv_metadata['id'],
507
 
               "container_format": recv_metadata['container_format'],
508
 
               "disk_format": recv_metadata['disk_format'],
509
 
               "image_name": recv_metadata.get('name', None)
510
 
        }
511
 
        return response