~ubuntu-branches/ubuntu/quantal/nova/quantal-security

« back to all changes in this revision

Viewing changes to nova/volume/api.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2012-06-22 12:39:57 UTC
  • mfrom: (1.1.57)
  • Revision ID: package-import@ubuntu.com-20120622123957-hbzwg84nt9rqwg8r
Tags: 2012.2~f2~20120621.14517-0ubuntu1
[ Chuck Short ]
* New upstream version.

[ Adam Gandelman ]
* debian/rules: Temporarily disable test suite while blocking
  tests are investigated. 
* debian/patches/kombu_tests_timeout.patch: Dropped.

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 
23
23
import functools
24
24
 
25
 
from eventlet import greenthread
26
 
 
27
25
from nova.db import base
28
26
from nova import exception
29
27
from nova import flags
30
28
from nova import log as logging
 
29
from nova.openstack.common import rpc
 
30
from nova.openstack.common import timeutils
31
31
import nova.policy
32
32
from nova import quota
33
 
from nova import rpc
34
 
from nova import utils
35
33
 
36
34
FLAGS = flags.FLAGS
37
35
flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
122
120
                           "reservations": reservations}})
123
121
        return volume
124
122
 
125
 
    # TODO(yamahata): eliminate dumb polling
126
 
    def wait_creation(self, context, volume):
127
 
        volume_id = volume['id']
128
 
        while True:
129
 
            volume = self.get(context, volume_id)
130
 
            if volume['status'] != 'creating':
131
 
                return
132
 
            greenthread.sleep(1)
133
 
 
134
123
    @wrap_check_policy
135
124
    def delete(self, context, volume):
136
125
        volume_id = volume['id']
147
136
            msg = _("Volume still has %d dependent snapshots") % len(snapshots)
148
137
            raise exception.InvalidVolume(reason=msg)
149
138
 
150
 
        now = utils.utcnow()
 
139
        now = timeutils.utcnow()
151
140
        self.db.volume_update(context, volume_id, {'status': 'deleting',
152
141
                                                   'terminated_at': now})
153
142
        host = volume['host']
154
143
        rpc.cast(context,
155
 
                 self.db.queue_get_for(context, FLAGS.volume_topic, host),
 
144
                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
156
145
                 {"method": "delete_volume",
157
146
                  "args": {"volume_id": volume_id}})
158
147
 
159
 
    @wrap_check_policy
160
 
    def update(self, context, volume, fields):
161
 
        self.db.volume_update(context, volume['id'], fields)
162
 
 
163
148
    def get(self, context, volume_id):
164
149
        rv = self.db.volume_get(context, volume_id)
165
150
        volume = dict(rv.iteritems())
235
220
            msg = _("already detached")
236
221
            raise exception.InvalidVolume(reason=msg)
237
222
 
238
 
    def remove_from_compute(self, context, volume, instance_id, host):
239
 
        """Remove volume from specified compute host."""
240
 
        rpc.call(context,
241
 
                 self.db.queue_get_for(context, FLAGS.compute_topic, host),
242
 
                 {"method": "remove_volume_connection",
243
 
                  "args": {'instance_id': instance_id,
244
 
                           'volume_id': volume['id']}})
245
 
 
246
223
    @wrap_check_policy
247
224
    def reserve_volume(self, context, volume):
248
 
        self.update(context, volume, {"status": "attaching"})
 
225
        self.db.volume_update(context, volume['id'], {"status": "attaching"})
249
226
 
250
227
    @wrap_check_policy
251
228
    def unreserve_volume(self, context, volume):
252
229
        if volume['status'] == "attaching":
253
 
            self.update(context, volume, {"status": "available"})
 
230
            self.db.volume_update(context,
 
231
                                  volume['id'],
 
232
                                  {"status": "available"})
254
233
 
255
234
    @wrap_check_policy
256
235
    def attach(self, context, volume, instance_uuid, mountpoint):
257
236
        host = volume['host']
258
 
        queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
 
237
        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
259
238
        return rpc.call(context, queue,
260
239
                        {"method": "attach_volume",
261
240
                         "args": {"volume_id": volume['id'],
265
244
    @wrap_check_policy
266
245
    def detach(self, context, volume):
267
246
        host = volume['host']
268
 
        queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
 
247
        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
269
248
        return rpc.call(context, queue,
270
249
                 {"method": "detach_volume",
271
250
                  "args": {"volume_id": volume['id']}})
273
252
    @wrap_check_policy
274
253
    def initialize_connection(self, context, volume, connector):
275
254
        host = volume['host']
276
 
        queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
 
255
        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
277
256
        return rpc.call(context, queue,
278
257
                        {"method": "initialize_connection",
279
258
                         "args": {"volume_id": volume['id'],
283
262
    def terminate_connection(self, context, volume, connector):
284
263
        self.unreserve_volume(context, volume)
285
264
        host = volume['host']
286
 
        queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
 
265
        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
287
266
        return rpc.call(context, queue,
288
267
                        {"method": "terminate_connection",
289
268
                         "args": {"volume_id": volume['id'],
310
289
        snapshot = self.db.snapshot_create(context, options)
311
290
        host = volume['host']
312
291
        rpc.cast(context,
313
 
                 self.db.queue_get_for(context, FLAGS.volume_topic, host),
 
292
                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
314
293
                 {"method": "create_snapshot",
315
294
                  "args": {"volume_id": volume['id'],
316
295
                           "snapshot_id": snapshot['id']}})
334
313
        volume = self.db.volume_get(context, snapshot['volume_id'])
335
314
        host = volume['host']
336
315
        rpc.cast(context,
337
 
                 self.db.queue_get_for(context, FLAGS.volume_topic, host),
 
316
                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
338
317
                 {"method": "delete_snapshot",
339
318
                  "args": {"snapshot_id": snapshot['id']}})
340
319