~0x44/nova/extdoc

« back to all changes in this revision

Viewing changes to nova/volume/manager.py

  • Committer: vladimir.p
  • Date: 2011-07-16 00:56:27 UTC
  • mto: This revision was merged to the branch mainline in revision 1502.
  • Revision ID: vladimir@zadarastorage.com-20110716005627-xjsht4899gku39hr
VSA: first cut. merged with 1279

Show diffs side-by-side

added added

removed removed

Lines of Context:
42
42
 
43
43
"""
44
44
 
 
45
import time
45
46
 
46
47
from nova import context
47
48
from nova import exception
49
50
from nova import log as logging
50
51
from nova import manager
51
52
from nova import utils
 
53
from nova import rpc
52
54
 
53
55
 
54
56
LOG = logging.getLogger('nova.volume.manager')
58
60
                    'availability zone of this service')
59
61
flags.DEFINE_string('volume_driver', 'nova.volume.driver.ISCSIDriver',
60
62
                    'Driver to use for volume creation')
 
63
flags.DEFINE_string('vsa_volume_driver', 'nova.volume.san.ZadaraVsaDriver',
 
64
                    'Driver to use for FE/BE volume creation with VSA')
61
65
flags.DEFINE_boolean('use_local_volumes', True,
62
66
                     'if True, will not discover local volumes')
 
67
flags.DEFINE_integer('volume_state_interval', 60,
 
68
                     'Interval in seconds for querying volumes status')
63
69
 
64
70
 
65
71
class VolumeManager(manager.SchedulerDependentManager):
66
72
    """Manages attachable block storage devices."""
67
 
    def __init__(self, volume_driver=None, *args, **kwargs):
 
73
    def __init__(self, volume_driver=None, vsa_volume_driver=None,
 
74
                *args, **kwargs):
68
75
        """Load the driver from the one specified in args, or from flags."""
69
76
        if not volume_driver:
70
77
            volume_driver = FLAGS.volume_driver
71
78
        self.driver = utils.import_object(volume_driver)
 
79
        if not vsa_volume_driver:
 
80
            vsa_volume_driver = FLAGS.vsa_volume_driver
 
81
        self.vsadriver = utils.import_object(vsa_volume_driver)
72
82
        super(VolumeManager, self).__init__(service_name='volume',
73
83
                                                    *args, **kwargs)
74
84
        # NOTE(vish): Implementation specific db handling is done
75
85
        #             by the driver.
76
86
        self.driver.db = self.db
 
87
        self.vsadriver.db = self.db
 
88
        self._last_volume_stats = []
 
89
        #self._last_host_check = 0
 
90
 
 
91
    def _get_driver(self, volume_ref):
 
92
        if volume_ref['to_vsa_id'] is None and \
 
93
           volume_ref['from_vsa_id'] is None:
 
94
            return self.driver
 
95
        else:
 
96
            return self.vsadriver
77
97
 
78
98
    def init_host(self):
79
99
        """Do any initialization that needs to be run if this is a
84
104
        LOG.debug(_("Re-exporting %s volumes"), len(volumes))
85
105
        for volume in volumes:
86
106
            if volume['status'] in ['available', 'in-use']:
87
 
                self.driver.ensure_export(ctxt, volume)
 
107
                driver = self._get_driver(volume)
 
108
                driver.ensure_export(ctxt, volume)
88
109
            else:
89
110
                LOG.info(_("volume %s: skipping export"), volume['name'])
90
111
 
 
112
    def create_volumes(self, context, request_spec, availability_zone):
 
113
        LOG.info(_("create_volumes called with req=%(request_spec)s, "\
 
114
                   "availability_zone=%(availability_zone)s"), locals())
 
115
 
91
116
    def create_volume(self, context, volume_id, snapshot_id=None):
92
117
        """Creates and exports the volume."""
93
118
        context = context.elevated()
101
126
        #             before passing it to the driver.
102
127
        volume_ref['host'] = self.host
103
128
 
 
129
        driver = self._get_driver(volume_ref)
104
130
        try:
105
131
            vol_name = volume_ref['name']
106
132
            vol_size = volume_ref['size']
107
133
            LOG.debug(_("volume %(vol_name)s: creating lv of"
108
134
                    " size %(vol_size)sG") % locals())
109
135
            if snapshot_id == None:
110
 
                model_update = self.driver.create_volume(volume_ref)
 
136
                model_update = driver.create_volume(volume_ref)
111
137
            else:
112
138
                snapshot_ref = self.db.snapshot_get(context, snapshot_id)
113
 
                model_update = self.driver.create_volume_from_snapshot(
 
139
                model_update = driver.create_volume_from_snapshot(
114
140
                    volume_ref,
115
141
                    snapshot_ref)
116
142
            if model_update:
117
143
                self.db.volume_update(context, volume_ref['id'], model_update)
118
144
 
119
145
            LOG.debug(_("volume %s: creating export"), volume_ref['name'])
120
 
            model_update = self.driver.create_export(context, volume_ref)
 
146
            model_update = driver.create_export(context, volume_ref)
121
147
            if model_update:
122
148
                self.db.volume_update(context, volume_ref['id'], model_update)
123
 
        except Exception:
 
149
        # except Exception:
 
150
        except:
124
151
            self.db.volume_update(context,
125
152
                                  volume_ref['id'], {'status': 'error'})
 
153
            self._notify_vsa(context, volume_ref, 'error')
126
154
            raise
127
155
 
128
156
        now = utils.utcnow()
130
158
                              volume_ref['id'], {'status': 'available',
131
159
                                                 'launched_at': now})
132
160
        LOG.debug(_("volume %s: created successfully"), volume_ref['name'])
 
161
 
 
162
        self._notify_vsa(context, volume_ref, 'available')
 
163
 
133
164
        return volume_id
134
165
 
 
166
    def _notify_vsa(self, context, volume_ref, status):
 
167
        if volume_ref['to_vsa_id'] is not None:
 
168
            rpc.cast(context,
 
169
                     FLAGS.vsa_topic,
 
170
                     {"method": "vsa_volume_created",
 
171
                      "args": {"vol_id": volume_ref['id'],
 
172
                               "vsa_id": volume_ref['to_vsa_id'],
 
173
                               "status": status}})
 
174
 
135
175
    def delete_volume(self, context, volume_id):
136
176
        """Deletes and unexports volume."""
137
177
        context = context.elevated()
141
181
        if volume_ref['host'] != self.host:
142
182
            raise exception.Error(_("Volume is not local to this node"))
143
183
 
 
184
        driver = self._get_driver(volume_ref)
144
185
        try:
145
186
            LOG.debug(_("volume %s: removing export"), volume_ref['name'])
146
 
            self.driver.remove_export(context, volume_ref)
 
187
            driver.remove_export(context, volume_ref)
147
188
            LOG.debug(_("volume %s: deleting"), volume_ref['name'])
148
 
            self.driver.delete_volume(volume_ref)
 
189
            driver.delete_volume(volume_ref)
149
190
        except exception.VolumeIsBusy, e:
150
191
            LOG.debug(_("volume %s: volume is busy"), volume_ref['name'])
151
 
            self.driver.ensure_export(context, volume_ref)
 
192
            driver.ensure_export(context, volume_ref)
152
193
            self.db.volume_update(context, volume_ref['id'],
153
194
                                  {'status': 'available'})
154
195
            return True
171
212
        try:
172
213
            snap_name = snapshot_ref['name']
173
214
            LOG.debug(_("snapshot %(snap_name)s: creating") % locals())
 
215
            # snapshot-related operations are irrelevant for vsadriver
174
216
            model_update = self.driver.create_snapshot(snapshot_ref)
175
217
            if model_update:
176
218
                self.db.snapshot_update(context, snapshot_ref['id'],
194
236
 
195
237
        try:
196
238
            LOG.debug(_("snapshot %s: deleting"), snapshot_ref['name'])
 
239
            # snapshot-related operations are irrelevant for vsadriver
197
240
            self.driver.delete_snapshot(snapshot_ref)
198
241
        except Exception:
199
242
            self.db.snapshot_update(context,
211
254
        Returns path to device."""
212
255
        context = context.elevated()
213
256
        volume_ref = self.db.volume_get(context, volume_id)
 
257
        driver = self._get_driver(volume_ref)
214
258
        if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
215
 
            path = self.driver.local_path(volume_ref)
 
259
            path = driver.local_path(volume_ref)
216
260
        else:
217
 
            path = self.driver.discover_volume(context, volume_ref)
 
261
            path = driver.discover_volume(context, volume_ref)
218
262
        return path
219
263
 
220
264
    def remove_compute_volume(self, context, volume_id):
221
265
        """Remove remote volume on compute host."""
222
266
        context = context.elevated()
223
267
        volume_ref = self.db.volume_get(context, volume_id)
 
268
        driver = self._get_driver(volume_ref)
224
269
        if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
225
270
            return True
226
271
        else:
227
 
            self.driver.undiscover_volume(volume_ref)
 
272
            driver.undiscover_volume(volume_ref)
228
273
 
229
274
    def check_for_export(self, context, instance_id):
230
275
        """Make sure whether volume is exported."""
231
276
        instance_ref = self.db.instance_get(context, instance_id)
232
277
        for volume in instance_ref['volumes']:
233
 
            self.driver.check_for_export(context, volume['id'])
 
278
            driver = self._get_driver(volume)
 
279
            driver.check_for_export(context, volume['id'])
 
280
 
 
281
    def periodic_tasks(self, context=None):
 
282
        """Tasks to be run at a periodic interval."""
 
283
 
 
284
        error_list = []
 
285
        try:
 
286
            self._report_driver_status()
 
287
        except Exception as ex:
 
288
            LOG.warning(_("Error during report_driver_status(): %s"),
 
289
                        unicode(ex))
 
290
            error_list.append(ex)
 
291
 
 
292
        super(VolumeManager, self).periodic_tasks(context)
 
293
 
 
294
        return error_list
 
295
 
 
296
    def _volume_stats_changed(self, stat1, stat2):
 
297
        #LOG.info(_("stat1=%s"), stat1)
 
298
        #LOG.info(_("stat2=%s"), stat2)
 
299
 
 
300
        if len(stat1) != len(stat2):
 
301
            return True
 
302
        for (k, v) in stat1.iteritems():
 
303
            if (k, v) not in stat2.iteritems():
 
304
                return True
 
305
        return False
 
306
 
 
307
    def _report_driver_status(self):
 
308
        #curr_time = time.time()
 
309
        #LOG.info(_("Report Volume node status"))
 
310
        #if curr_time - self._last_host_check > FLAGS.volume_state_interval:
 
311
        #    self._last_host_check = curr_time
 
312
 
 
313
        LOG.info(_("Updating volume status"))
 
314
 
 
315
        volume_stats = self.vsadriver.get_volume_stats(refresh=True)
 
316
        if self._volume_stats_changed(self._last_volume_stats, volume_stats):
 
317
            LOG.info(_("New capabilities found: %s"), volume_stats)
 
318
            self._last_volume_stats = volume_stats
 
319
 
 
320
            # This will grab info about the host and queue it
 
321
            # to be sent to the Schedulers.
 
322
            self.update_service_capabilities(self._last_volume_stats)
 
323
        else:
 
324
            self.update_service_capabilities(None)
 
325
 
 
326
    def notification(self, context, event):
 
327
        LOG.info(_("Notification {%s} received"), event)
 
328
        self._last_volume_stats = []