25
from eventlet import greenthread
27
25
from nova.db import base
28
26
from nova import exception
29
27
from nova import flags
30
28
from nova import log as logging
29
from nova.openstack.common import rpc
30
from nova.openstack.common import timeutils
32
32
from nova import quota
34
from nova import utils
36
34
FLAGS = flags.FLAGS
37
35
flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
122
120
"reservations": reservations}})
125
# TODO(yamahata): eliminate dumb polling
126
def wait_creation(self, context, volume):
127
volume_id = volume['id']
129
volume = self.get(context, volume_id)
130
if volume['status'] != 'creating':
134
123
@wrap_check_policy
135
124
def delete(self, context, volume):
136
125
volume_id = volume['id']
147
136
msg = _("Volume still has %d dependent snapshots") % len(snapshots)
148
137
raise exception.InvalidVolume(reason=msg)
139
now = timeutils.utcnow()
151
140
self.db.volume_update(context, volume_id, {'status': 'deleting',
152
141
'terminated_at': now})
153
142
host = volume['host']
154
143
rpc.cast(context,
155
self.db.queue_get_for(context, FLAGS.volume_topic, host),
144
rpc.queue_get_for(context, FLAGS.volume_topic, host),
156
145
{"method": "delete_volume",
157
146
"args": {"volume_id": volume_id}})
160
def update(self, context, volume, fields):
    """Apply ``fields`` to the database record of ``volume``.

    :param context: request context, forwarded to the DB layer as-is.
    :param volume: mapping describing the volume; only ``volume['id']``
                   is read.
    :param fields: values handed unchanged to ``self.db.volume_update``.
    """
    self.db.volume_update(context, volume['id'], fields)
163
148
def get(self, context, volume_id):
164
149
rv = self.db.volume_get(context, volume_id)
165
150
volume = dict(rv.iteritems())
235
220
msg = _("already detached")
236
221
raise exception.InvalidVolume(reason=msg)
238
def remove_from_compute(self, context, volume, instance_id, host):
239
"""Remove volume from specified compute host."""
241
self.db.queue_get_for(context, FLAGS.compute_topic, host),
242
{"method": "remove_volume_connection",
243
"args": {'instance_id': instance_id,
244
'volume_id': volume['id']}})
246
223
@wrap_check_policy
def reserve_volume(self, context, volume):
    """Flag ``volume`` as being attached.

    Sets the status of the volume identified by ``volume['id']`` to
    ``"attaching"`` through ``self.db.volume_update``.

    :param context: request context, forwarded to the DB layer.
    :param volume: mapping describing the volume; only ``volume['id']``
                   is read.
    """
    # The status is written exactly once, straight through the DB layer.
    self.db.volume_update(context, volume['id'], {"status": "attaching"})
250
227
@wrap_check_policy
def unreserve_volume(self, context, volume):
    """Return a volume that is still ``"attaching"`` to ``"available"``.

    A volume whose ``volume['status']`` is anything other than
    ``"attaching"`` is left untouched.

    :param context: request context, forwarded to the DB layer.
    :param volume: mapping describing the volume; ``volume['status']`` and
                   ``volume['id']`` are read.
    """
    if volume['status'] == "attaching":
        # volume_update takes (context, volume_id, values); the id must be
        # passed explicitly between the context and the values dict.
        self.db.volume_update(context, volume['id'],
                              {"status": "available"})
255
234
@wrap_check_policy
256
235
def attach(self, context, volume, instance_uuid, mountpoint):
257
236
host = volume['host']
258
queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
237
queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
259
238
return rpc.call(context, queue,
260
239
{"method": "attach_volume",
261
240
"args": {"volume_id": volume['id'],
265
244
@wrap_check_policy
def detach(self, context, volume):
    """Send a ``detach_volume`` request for ``volume`` over RPC.

    The target queue is derived from ``FLAGS.volume_topic`` and
    ``volume['host']`` via ``rpc.queue_get_for``.

    :param context: request context, forwarded to the RPC layer.
    :param volume: mapping describing the volume; ``volume['host']`` and
                   ``volume['id']`` are read.
    :returns: whatever ``rpc.call`` returns for the request.
    """
    # Resolve the queue once, through the rpc module only.
    queue = rpc.queue_get_for(context, FLAGS.volume_topic, volume['host'])
    message = {"method": "detach_volume",
               "args": {"volume_id": volume['id']}}
    return rpc.call(context, queue, message)
273
252
@wrap_check_policy
274
253
def initialize_connection(self, context, volume, connector):
275
254
host = volume['host']
276
queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
255
queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
277
256
return rpc.call(context, queue,
278
257
{"method": "initialize_connection",
279
258
"args": {"volume_id": volume['id'],
283
262
def terminate_connection(self, context, volume, connector):
284
263
self.unreserve_volume(context, volume)
285
264
host = volume['host']
286
queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
265
queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
287
266
return rpc.call(context, queue,
288
267
{"method": "terminate_connection",
289
268
"args": {"volume_id": volume['id'],
310
289
snapshot = self.db.snapshot_create(context, options)
311
290
host = volume['host']
312
291
rpc.cast(context,
313
self.db.queue_get_for(context, FLAGS.volume_topic, host),
292
rpc.queue_get_for(context, FLAGS.volume_topic, host),
314
293
{"method": "create_snapshot",
315
294
"args": {"volume_id": volume['id'],
316
295
"snapshot_id": snapshot['id']}})
334
313
volume = self.db.volume_get(context, snapshot['volume_id'])
335
314
host = volume['host']
336
315
rpc.cast(context,
337
self.db.queue_get_for(context, FLAGS.volume_topic, host),
316
rpc.queue_get_for(context, FLAGS.volume_topic, host),
338
317
{"method": "delete_snapshot",
339
318
"args": {"snapshot_id": snapshot['id']}})