1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright (c) 2011 Zadara Storage Inc.
4
# Copyright (c) 2011 OpenStack LLC.
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
7
# not use this file except in compliance with the License. You may obtain
8
# a copy of the License at
10
# http://www.apache.org/licenses/LICENSE-2.0
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
# License for the specific language governing permissions and limitations
19
"""
Handles all requests relating to Virtual Storage Arrays (VSAs).

Experimental code. Requires a special VSA image.
For assistance and guidelines, please contact
Zadara Storage Inc. and the OpenStack community.
"""
28
import sys

from nova import compute
from nova import exception
from nova import flags
from nova import log as logging
from nova import volume
from nova.compute import instance_types
from nova.db import base
from nova.volume import volume_types
41
class VsaState(object):
    """Status values stored in a VSA's ``status`` field.

    Other code in this module refers to these as ``VsaState.<NAME>``.
    """
    CREATING = 'creating'    # VSA creating (not ready yet)
    LAUNCHING = 'launching'  # Launching VCs (all BE volumes were created)
    CREATED = 'created'      # VSA fully created and ready for use
    PARTIAL = 'partial'      # Some BE drives were allocated
    FAILED = 'failed'        # Some BE storage allocations failed
    DELETING = 'deleting'    # VSA started the deletion procedure
50
# Module-wide handle on the flag registry; every ``FLAGS.<name>`` lookup in
# this module goes through it.
# NOTE(review): assumes nova's ``flags`` module exposes the registry as
# ``flags.FLAGS`` -- confirm against nova.flags.
FLAGS = flags.FLAGS

# Credentials the VSA uses when it calls back into nova.
flags.DEFINE_string('vsa_ec2_access_key', None,
                    'EC2 access key used by VSA for accessing nova')
flags.DEFINE_string('vsa_ec2_user_id', None,
                    'User ID used by VSA for accessing nova')

# When True, all BE volumes are requested from the scheduler in one call
# rather than being created one-by-one.
flags.DEFINE_boolean('vsa_multi_vol_creation', True,
                     'Ask scheduler to create multiple volumes in one call')
flags.DEFINE_string('vsa_volume_type_name', 'VSA volume type',
                    'Name of volume type associated with FE VSA volumes')

LOG = logging.getLogger('nova.vsa')
63
"""API for interacting with the VSA manager."""
65
def __init__(self, compute_api=None, volume_api=None, **kwargs):
    """Bind the compute and volume APIs used to manage VSA resources.

    :param compute_api: compute API object to use; when omitted (or falsy)
                        a new ``compute.API()`` is created
    :param volume_api: volume API object to use; when omitted (or falsy)
                       a new ``volume.API()`` is created
    :param kwargs: forwarded unchanged to the parent class initializer
    """
    self.compute_api = compute_api or compute.API()
    self.volume_api = volume_api or volume.API()
    super(API, self).__init__(**kwargs)
70
def _check_volume_type_correctness(self, vol_type):
71
if vol_type.get('extra_specs') == None or\
72
vol_type['extra_specs'].get('type') != 'vsa_drive' or\
73
vol_type['extra_specs'].get('drive_type') == None or\
74
vol_type['extra_specs'].get('drive_size') == None:
76
raise exception.ApiError(_("Invalid drive type %s")
79
def _get_default_vsa_instance_type(self):
    """Look up the instance type named by the default-VSA-type flag.

    :returns: the result of ``instance_types.get_instance_type_by_name``
              for ``FLAGS.default_vsa_instance_type``
    """
    default_name = FLAGS.default_vsa_instance_type
    return instance_types.get_instance_type_by_name(default_name)
83
def _check_storage_parameters(self, context, vsa_name, storage,
84
shared, first_index=0):
86
Translates storage array of disks to the list of volumes
87
:param storage: List of dictionaries with following keys:
88
disk_name, num_disks, size
89
:param shared: Specifies if storage is dedicated or shared.
90
For shared storage disks split into partitions
95
name = node.get('drive_name', None)
96
num_disks = node.get('num_drives', 1)
99
raise exception.ApiError(_("No drive_name param found in %s")
102
vol_type = volume_types.get_volume_type_by_name(context, name)
103
except exception.NotFound:
104
raise exception.ApiError(_("Invalid drive type name %s")
107
self._check_volume_type_correctness(vol_type)
109
# if size field present - override disk size specified in DB
110
size = int(node.get('size',
111
vol_type['extra_specs'].get('drive_size')))
114
part_size = FLAGS.vsa_part_size_gb
115
total_capacity = num_disks * size
116
num_volumes = total_capacity / part_size
119
num_volumes = num_disks
120
size = 0 # special handling for full drives
122
for i in range(num_volumes):
123
volume_name = "drive-%03d" % first_index
125
volume_desc = 'BE volume for VSA %s type %s' % \
130
'description': volume_desc,
131
'volume_type_id': vol_type['id'],
133
volume_params.append(volume)
137
def create(self, context, display_name='', display_description='',
138
vc_count=1, instance_type=None, image_name=None,
139
availability_zone=None, storage=[], shared=None):
141
Provision VSA instance with corresponding compute instances
142
and associated volumes
143
:param storage: List of dictionaries with following keys:
144
disk_name, num_disks, size
145
:param shared: Specifies if storage is dedicated or shared.
146
For shared storage disks split into partitions
149
LOG.info(_("*** Experimental VSA code ***"))
151
if vc_count > FLAGS.max_vcs_in_vsa:
152
LOG.warning(_("Requested number of VCs (%d) is too high."\
153
" Setting to default"), vc_count)
154
vc_count = FLAGS.max_vcs_in_vsa
156
if instance_type is None:
157
instance_type = self._get_default_vsa_instance_type()
159
if availability_zone is None:
160
availability_zone = FLAGS.storage_availability_zone
165
if shared is None or shared == 'False' or shared == False:
170
# check if image is ready before starting any work
171
if image_name is None:
172
image_name = FLAGS.vc_image_name
174
image_service = self.compute_api.image_service
175
vc_image = image_service.show_by_name(context, image_name)
176
vc_image_href = vc_image['id']
177
except exception.ImageNotFound:
178
raise exception.ApiError(_("Failed to find configured image %s")
182
'display_name': display_name,
183
'display_description': display_description,
184
'project_id': context.project_id,
185
'availability_zone': availability_zone,
186
'instance_type_id': instance_type['id'],
187
'image_ref': vc_image_href,
188
'vc_count': vc_count,
189
'status': VsaState.CREATING,
191
LOG.info(_("Creating VSA: %s") % options)
193
# create DB entry for VSA instance
195
vsa_ref = self.db.vsa_create(context, options)
196
except exception.Error:
197
raise exception.ApiError(_(sys.exc_info()[1]))
198
vsa_id = vsa_ref['id']
199
vsa_name = vsa_ref['name']
201
# check storage parameters
203
volume_params = self._check_storage_parameters(context, vsa_name,
205
except exception.ApiError:
206
self.db.vsa_destroy(context, vsa_id)
207
raise exception.ApiError(_("Error in storage parameters: %s")
210
# after creating DB entry, re-check and set some defaults
212
if (not hasattr(vsa_ref, 'display_name') or
213
vsa_ref.display_name is None or
214
vsa_ref.display_name == ''):
215
updates['display_name'] = display_name = vsa_name
216
updates['vol_count'] = len(volume_params)
217
vsa_ref = self.update(context, vsa_id, **updates)
220
if FLAGS.vsa_multi_vol_creation:
221
if len(volume_params) > 0:
223
'num_volumes': len(volume_params),
224
'vsa_id': str(vsa_id),
225
'volumes': volume_params,
229
FLAGS.scheduler_topic,
230
{"method": "create_volumes",
231
"args": {"topic": FLAGS.volume_topic,
232
"request_spec": request_spec,
233
"availability_zone": availability_zone}})
235
# create BE volumes one-by-one
236
for vol in volume_params:
238
vol_name = vol['name']
239
vol_size = vol['size']
240
vol_type_id = vol['volume_type_id']
241
LOG.debug(_("VSA ID %(vsa_id)d %(vsa_name)s: Create "\
242
"volume %(vol_name)s, %(vol_size)d GB, "\
243
"type %(vol_type_id)s"), locals())
245
vol_type = volume_types.get_volume_type(context,
246
vol['volume_type_id'])
248
vol_ref = self.volume_api.create(context,
253
volume_type=vol_type,
254
metadata=dict(to_vsa_id=str(vsa_id)),
255
availability_zone=availability_zone)
257
self.update_vsa_status(context, vsa_id,
258
status=VsaState.PARTIAL)
261
if len(volume_params) == 0:
262
# No BE volumes - ask VSA manager to start VCs
265
{"method": "create_vsa",
266
"args": {"vsa_id": str(vsa_id)}})
270
def update_vsa_status(self, context, vsa_id, status):
    """Set the ``status`` field of a VSA.

    :param context: The security context
    :param vsa_id: ID of the VSA instance to update
    :param status: new status value to store
    :returns: whatever ``self.update`` returns for this change
    """
    # Pass only the values the message interpolates rather than locals().
    LOG.info(_("VSA ID %(vsa_id)d: Update VSA status to %(status)s"),
             {'vsa_id': vsa_id, 'status': status})
    return self.update(context, vsa_id, status=status)
276
def update(self, context, vsa_id, **kwargs):
    """Updates the VSA instance in the datastore.

    :param context: The security context
    :param vsa_id: ID of the VSA instance to update
    :param kwargs: All additional keyword args are treated
                   as data fields of the instance to be
                   updated. Only ``status``, ``vc_count``, ``vol_count``,
                   ``display_name`` and ``display_description`` are
                   honoured; any other key is ignored.

    :returns: the result of ``self.db.vsa_update``
    """
    LOG.info(_("VSA ID %(vsa_id)d: Update VSA call"), {'vsa_id': vsa_id})

    updatable_fields = ['status', 'vc_count', 'vol_count',
                        'display_name', 'display_description']
    changes = {}
    for field in updatable_fields:
        # Copy only the fields the caller actually supplied.
        if field in kwargs:
            changes[field] = kwargs[field]

    vc_count = kwargs.get('vc_count', None)
    if vc_count is not None:
        # VP-TODO: This request may want to update number of VCs
        # Get number of current VCs and add/delete VCs appropriately
        vsa = self.get(context, vsa_id)
        vc_count = int(vc_count)
        if vc_count > FLAGS.max_vcs_in_vsa:
            LOG.warning(_("Requested number of VCs (%d) is too high."
                          " Setting to default"), vc_count)
            vc_count = FLAGS.max_vcs_in_vsa

        if vsa['vc_count'] != vc_count:
            self.update_num_vcs(context, vsa, vc_count)

        # Always store the normalized (integer, clamped) count so that the
        # raw value taken from kwargs never reaches the datastore.
        changes['vc_count'] = vc_count

    return self.db.vsa_update(context, vsa_id, changes)
313
def update_num_vcs(self, context, vsa, vc_count):
    """Log the VC count change needed to reach ``vc_count`` VCs.

    As far as this code shows, only a debug message is emitted; actually
    adding or removing VCs is still marked VP-TODO.

    :param context: The security context (unused here)
    :param vsa: VSA record providing ``name`` and ``vc_count``
    :param vc_count: requested number of VCs
    """
    vsa_name = vsa['name']
    old_vc_count = int(vsa['vc_count'])
    if vc_count > old_vc_count:
        add_cnt = vc_count - old_vc_count
        LOG.debug(_("Adding %(add_cnt)s VCs to VSA %(vsa_name)s."),
                  {'add_cnt': add_cnt, 'vsa_name': vsa_name})
        # VP-TODO: actual code for adding new VCs

    elif vc_count < old_vc_count:
        del_cnt = old_vc_count - vc_count
        LOG.debug(_("Deleting %(del_cnt)s VCs from VSA %(vsa_name)s."),
                  {'del_cnt': del_cnt, 'vsa_name': vsa_name})
        # VP-TODO: actual code for deleting extra VCs
328
def _force_volume_delete(self, ctxt, volume):
329
"""Delete a volume, bypassing the check that it must be available."""
330
host = volume['host']
332
# Deleting volume from database and skipping rpc.
333
self.db.volume_destroy(ctxt, volume['id'])
337
self.db.queue_get_for(ctxt, FLAGS.volume_topic, host),
338
{"method": "delete_volume",
339
"args": {"volume_id": volume['id']}})
341
def delete_vsa_volumes(self, context, vsa_id, direction,
343
if direction == "FE":
344
volumes = self.get_all_vsa_volumes(context, vsa_id)
346
volumes = self.get_all_vsa_drives(context, vsa_id)
348
for volume in volumes:
350
vol_name = volume['name']
351
LOG.info(_("VSA ID %(vsa_id)s: Deleting %(direction)s "\
352
"volume %(vol_name)s"), locals())
353
self.volume_api.delete(context, volume['id'])
354
except exception.ApiError:
355
LOG.info(_("Unable to delete volume %s"), volume['name'])
357
LOG.info(_("VSA ID %(vsa_id)s: Forced delete. "\
358
"%(direction)s volume %(vol_name)s"), locals())
359
self._force_volume_delete(context, volume)
361
def delete(self, context, vsa_id):
    """Terminate a VSA instance.

    Force-deletes the VSA's FE and BE volumes, deletes every compute
    instance whose metadata carries this ``vsa_id``, then destroys the VSA
    record itself.

    :param context: The security context
    :param vsa_id: ID of the VSA instance to terminate
    """
    LOG.info(_("Going to try to terminate VSA ID %s"), vsa_id)

    # Delete all FrontEnd and BackEnd volumes
    for direction in ("FE", "BE"):
        self.delete_vsa_volumes(context, vsa_id, direction,
                                force_delete=True)

    # Delete all VC instances
    for instance in self.get_all_vsa_instances(context, vsa_id):
        name = instance['name']
        LOG.debug(_("VSA ID %(vsa_id)s: Delete instance %(name)s"),
                  {'vsa_id': vsa_id, 'name': name})
        self.compute_api.delete(context, instance['id'])

    # Delete VSA instance
    self.db.vsa_destroy(context, vsa_id)
381
def get(self, context, vsa_id):
    """Fetch a VSA record from the datastore.

    :param context: The security context
    :param vsa_id: ID of the VSA instance to fetch
    :returns: the result of ``self.db.vsa_get``
    """
    return self.db.vsa_get(context, vsa_id)
385
def get_all(self, context):
387
return self.db.vsa_get_all(context)
388
return self.db.vsa_get_all_by_project(context, context.project_id)
390
def get_vsa_volume_type(self, context):
    """Return the volume type named by ``FLAGS.vsa_volume_type_name``.

    If the lookup raises ``exception.NotFound`` the type is created with
    ``extra_specs`` of ``{'type': 'vsa_volume'}`` and looked up again.

    :param context: The security context
    :returns: the volume type record
    """
    name = FLAGS.vsa_volume_type_name
    try:
        vol_type = volume_types.get_volume_type_by_name(context, name)
    except exception.NotFound:
        volume_types.create(context, name,
                            extra_specs=dict(type='vsa_volume'))
        vol_type = volume_types.get_volume_type_by_name(context, name)

    return vol_type
401
def get_all_vsa_instances(self, context, vsa_id):
    """List compute instances whose metadata ``vsa_id`` matches this VSA.

    :param context: The security context
    :param vsa_id: ID of the VSA; matched as a string
    :returns: the result of ``self.compute_api.get_all``
    """
    search_opts = {'metadata': dict(vsa_id=str(vsa_id))}
    return self.compute_api.get_all(context, search_opts=search_opts)
405
def get_all_vsa_volumes(self, context, vsa_id):
    """List volumes whose metadata ``from_vsa_id`` matches this VSA.

    :param context: The security context
    :param vsa_id: ID of the VSA; matched as a string
    :returns: the result of ``self.volume_api.get_all``
    """
    search_opts = {'metadata': dict(from_vsa_id=str(vsa_id))}
    return self.volume_api.get_all(context, search_opts=search_opts)
409
def get_all_vsa_drives(self, context, vsa_id):
    """List volumes whose metadata ``to_vsa_id`` matches this VSA.

    :param context: The security context
    :param vsa_id: ID of the VSA; matched as a string
    :returns: the result of ``self.volume_api.get_all``
    """
    search_opts = {'metadata': dict(to_vsa_id=str(vsa_id))}
    return self.volume_api.get_all(context, search_opts=search_opts)