58
60
'availability zone of this service')
59
61
flags.DEFINE_string('volume_driver', 'nova.volume.driver.ISCSIDriver',
60
62
'Driver to use for volume creation')
63
flags.DEFINE_string('vsa_volume_driver', 'nova.volume.san.ZadaraVsaDriver',
64
'Driver to use for FE/BE volume creation with VSA')
61
65
flags.DEFINE_boolean('use_local_volumes', True,
62
66
'if True, will not discover local volumes')
67
flags.DEFINE_integer('volume_state_interval', 60,
68
'Interval in seconds for querying volumes status')
65
71
class VolumeManager(manager.SchedulerDependentManager):
66
72
"""Manages attachable block storage devices."""
67
def __init__(self, volume_driver=None, *args, **kwargs):
73
def __init__(self, volume_driver=None, vsa_volume_driver=None,
68
75
"""Load the driver from the one specified in args, or from flags."""
69
76
if not volume_driver:
70
77
volume_driver = FLAGS.volume_driver
71
78
self.driver = utils.import_object(volume_driver)
79
if not vsa_volume_driver:
80
vsa_volume_driver = FLAGS.vsa_volume_driver
81
self.vsadriver = utils.import_object(vsa_volume_driver)
72
82
super(VolumeManager, self).__init__(service_name='volume',
74
84
# NOTE(vish): Implementation specific db handling is done
76
86
self.driver.db = self.db
87
self.vsadriver.db = self.db
88
self._last_volume_stats = []
89
#self._last_host_check = 0
91
def _get_driver(self, volume_ref):
92
if volume_ref['to_vsa_id'] is None and \
93
volume_ref['from_vsa_id'] is None:
78
98
def init_host(self):
79
99
"""Do any initialization that needs to be run if this is a
84
104
LOG.debug(_("Re-exporting %s volumes"), len(volumes))
85
105
for volume in volumes:
86
106
if volume['status'] in ['available', 'in-use']:
87
self.driver.ensure_export(ctxt, volume)
107
driver = self._get_driver(volume)
108
driver.ensure_export(ctxt, volume)
89
110
LOG.info(_("volume %s: skipping export"), volume['name'])
112
def create_volumes(self, context, request_spec, availability_zone):
113
LOG.info(_("create_volumes called with req=%(request_spec)s, "\
114
"availability_zone=%(availability_zone)s"), locals())
91
116
def create_volume(self, context, volume_id, snapshot_id=None):
92
117
"""Creates and exports the volume."""
93
118
context = context.elevated()
101
126
# before passing it to the driver.
102
127
volume_ref['host'] = self.host
129
driver = self._get_driver(volume_ref)
105
131
vol_name = volume_ref['name']
106
132
vol_size = volume_ref['size']
107
133
LOG.debug(_("volume %(vol_name)s: creating lv of"
108
134
" size %(vol_size)sG") % locals())
109
135
if snapshot_id == None:
110
model_update = self.driver.create_volume(volume_ref)
136
model_update = driver.create_volume(volume_ref)
112
138
snapshot_ref = self.db.snapshot_get(context, snapshot_id)
113
model_update = self.driver.create_volume_from_snapshot(
139
model_update = driver.create_volume_from_snapshot(
117
143
self.db.volume_update(context, volume_ref['id'], model_update)
119
145
LOG.debug(_("volume %s: creating export"), volume_ref['name'])
120
model_update = self.driver.create_export(context, volume_ref)
146
model_update = driver.create_export(context, volume_ref)
122
148
self.db.volume_update(context, volume_ref['id'], model_update)
124
151
self.db.volume_update(context,
125
152
volume_ref['id'], {'status': 'error'})
153
self._notify_vsa(context, volume_ref, 'error')
128
156
now = utils.utcnow()
130
158
volume_ref['id'], {'status': 'available',
131
159
'launched_at': now})
132
160
LOG.debug(_("volume %s: created successfully"), volume_ref['name'])
162
self._notify_vsa(context, volume_ref, 'available')
166
def _notify_vsa(self, context, volume_ref, status):
167
if volume_ref['to_vsa_id'] is not None:
170
{"method": "vsa_volume_created",
171
"args": {"vol_id": volume_ref['id'],
172
"vsa_id": volume_ref['to_vsa_id'],
135
175
def delete_volume(self, context, volume_id):
136
176
"""Deletes and unexports volume."""
137
177
context = context.elevated()
141
181
if volume_ref['host'] != self.host:
142
182
raise exception.Error(_("Volume is not local to this node"))
184
driver = self._get_driver(volume_ref)
145
186
LOG.debug(_("volume %s: removing export"), volume_ref['name'])
146
self.driver.remove_export(context, volume_ref)
187
driver.remove_export(context, volume_ref)
147
188
LOG.debug(_("volume %s: deleting"), volume_ref['name'])
148
self.driver.delete_volume(volume_ref)
189
driver.delete_volume(volume_ref)
149
190
except exception.VolumeIsBusy, e:
150
191
LOG.debug(_("volume %s: volume is busy"), volume_ref['name'])
151
self.driver.ensure_export(context, volume_ref)
192
driver.ensure_export(context, volume_ref)
152
193
self.db.volume_update(context, volume_ref['id'],
153
194
{'status': 'available'})
211
254
Returns path to device."""
212
255
context = context.elevated()
213
256
volume_ref = self.db.volume_get(context, volume_id)
257
driver = self._get_driver(volume_ref)
214
258
if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
215
path = self.driver.local_path(volume_ref)
259
path = driver.local_path(volume_ref)
217
path = self.driver.discover_volume(context, volume_ref)
261
path = driver.discover_volume(context, volume_ref)
220
264
def remove_compute_volume(self, context, volume_id):
221
265
"""Remove remote volume on compute host."""
222
266
context = context.elevated()
223
267
volume_ref = self.db.volume_get(context, volume_id)
268
driver = self._get_driver(volume_ref)
224
269
if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
227
self.driver.undiscover_volume(volume_ref)
272
driver.undiscover_volume(volume_ref)
229
274
def check_for_export(self, context, instance_id):
230
275
"""Make sure whether volume is exported."""
231
276
instance_ref = self.db.instance_get(context, instance_id)
232
277
for volume in instance_ref['volumes']:
233
self.driver.check_for_export(context, volume['id'])
278
driver = self._get_driver(volume)
279
driver.check_for_export(context, volume['id'])
281
def periodic_tasks(self, context=None):
282
"""Tasks to be run at a periodic interval."""
286
self._report_driver_status()
287
except Exception as ex:
288
LOG.warning(_("Error during report_driver_status(): %s"),
290
error_list.append(ex)
292
super(VolumeManager, self).periodic_tasks(context)
296
def _volume_stats_changed(self, stat1, stat2):
297
#LOG.info(_("stat1=%s"), stat1)
298
#LOG.info(_("stat2=%s"), stat2)
300
if len(stat1) != len(stat2):
302
for (k, v) in stat1.iteritems():
303
if (k, v) not in stat2.iteritems():
307
def _report_driver_status(self):
308
#curr_time = time.time()
309
#LOG.info(_("Report Volume node status"))
310
#if curr_time - self._last_host_check > FLAGS.volume_state_interval:
311
# self._last_host_check = curr_time
313
LOG.info(_("Updating volume status"))
315
volume_stats = self.vsadriver.get_volume_stats(refresh=True)
316
if self._volume_stats_changed(self._last_volume_stats, volume_stats):
317
LOG.info(_("New capabilities found: %s"), volume_stats)
318
self._last_volume_stats = volume_stats
320
# This will grab info about the host and queue it
321
# to be sent to the Schedulers.
322
self.update_service_capabilities(self._last_volume_stats)
324
self.update_service_capabilities(None)
326
def notification(self, context, event):
327
LOG.info(_("Notification {%s} received"), event)
328
self._last_volume_stats = []