~cloudbuilders/nova/os-keypair-integration

« back to all changes in this revision

Viewing changes to nova/vsa/api.py

  • Committer: Vishvananda Ishaya
  • Date: 2011-08-31 18:51:19 UTC
  • mfrom: (1455.1.56 nova)
  • Revision ID: vishvananda@gmail.com-20110831185119-04pppkh6iv7v6p6i
merged trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright (c) 2011 Zadara Storage Inc.
 
4
# Copyright (c) 2011 OpenStack LLC.
 
5
#
 
6
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
7
#    not use this file except in compliance with the License. You may obtain
 
8
#    a copy of the License at
 
9
#
 
10
#         http://www.apache.org/licenses/LICENSE-2.0
 
11
#
 
12
#    Unless required by applicable law or agreed to in writing, software
 
13
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
14
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
15
#    License for the specific language governing permissions and limitations
 
16
#    under the License.
 
17
 
 
18
"""
 
19
Handles all requests relating to Virtual Storage Arrays (VSAs).
 
20
 
 
21
Experimental code. Requires special VSA image.
 
22
For assistance and guidelines pls contact
 
23
    Zadara Storage Inc & Openstack community
 
24
"""
 
25
 
 
26
import sys
 
27
 
 
28
from nova import compute
 
29
from nova import db
 
30
from nova import exception
 
31
from nova import flags
 
32
from nova import log as logging
 
33
from nova import rpc
 
34
from nova import volume
 
35
from nova.compute import instance_types
 
36
from nova.db import base
 
37
from nova.volume import volume_types
 
38
 
 
39
 
 
40
class VsaState:
 
41
    CREATING = 'creating'       # VSA creating (not ready yet)
 
42
    LAUNCHING = 'launching'     # Launching VCs (all BE volumes were created)
 
43
    CREATED = 'created'         # VSA fully created and ready for use
 
44
    PARTIAL = 'partial'         # Some BE drives were allocated
 
45
    FAILED = 'failed'           # Some BE storage allocations failed
 
46
    DELETING = 'deleting'       # VSA started the deletion procedure
 
47
 
 
48
 
 
49
FLAGS = flags.FLAGS
 
50
flags.DEFINE_string('vsa_ec2_access_key', None,
 
51
                    'EC2 access key used by VSA for accessing nova')
 
52
flags.DEFINE_string('vsa_ec2_user_id', None,
 
53
                    'User ID used by VSA for accessing nova')
 
54
flags.DEFINE_boolean('vsa_multi_vol_creation', True,
 
55
                  'Ask scheduler to create multiple volumes in one call')
 
56
flags.DEFINE_string('vsa_volume_type_name', 'VSA volume type',
 
57
                    'Name of volume type associated with FE VSA volumes')
 
58
 
 
59
LOG = logging.getLogger('nova.vsa')
 
60
 
 
61
 
 
62
class API(base.Base):
 
63
    """API for interacting with the VSA manager."""
 
64
 
 
65
    def __init__(self, compute_api=None, volume_api=None, **kwargs):
 
66
        self.compute_api = compute_api or compute.API()
 
67
        self.volume_api = volume_api or volume.API()
 
68
        super(API, self).__init__(**kwargs)
 
69
 
 
70
    def _check_volume_type_correctness(self, vol_type):
 
71
        if vol_type.get('extra_specs') == None or\
 
72
           vol_type['extra_specs'].get('type') != 'vsa_drive' or\
 
73
           vol_type['extra_specs'].get('drive_type') == None or\
 
74
           vol_type['extra_specs'].get('drive_size') == None:
 
75
 
 
76
            raise exception.ApiError(_("Invalid drive type %s")
 
77
                                        % vol_type['name'])
 
78
 
 
79
    def _get_default_vsa_instance_type(self):
 
80
        return instance_types.get_instance_type_by_name(
 
81
                                FLAGS.default_vsa_instance_type)
 
82
 
 
83
    def _check_storage_parameters(self, context, vsa_name, storage,
 
84
                                  shared, first_index=0):
 
85
        """
 
86
        Translates storage array of disks to the list of volumes
 
87
        :param storage: List of dictionaries with following keys:
 
88
                        disk_name, num_disks, size
 
89
        :param shared: Specifies if storage is dedicated or shared.
 
90
                       For shared storage disks split into partitions
 
91
        """
 
92
        volume_params = []
 
93
        for node in storage:
 
94
 
 
95
            name = node.get('drive_name', None)
 
96
            num_disks = node.get('num_drives', 1)
 
97
 
 
98
            if name is None:
 
99
                raise exception.ApiError(_("No drive_name param found in %s")
 
100
                                            % node)
 
101
            try:
 
102
                vol_type = volume_types.get_volume_type_by_name(context, name)
 
103
            except exception.NotFound:
 
104
                raise exception.ApiError(_("Invalid drive type name %s")
 
105
                                            % name)
 
106
 
 
107
            self._check_volume_type_correctness(vol_type)
 
108
 
 
109
            # if size field present - override disk size specified in DB
 
110
            size = int(node.get('size',
 
111
                                vol_type['extra_specs'].get('drive_size')))
 
112
 
 
113
            if shared:
 
114
                part_size = FLAGS.vsa_part_size_gb
 
115
                total_capacity = num_disks * size
 
116
                num_volumes = total_capacity / part_size
 
117
                size = part_size
 
118
            else:
 
119
                num_volumes = num_disks
 
120
                size = 0    # special handling for full drives
 
121
 
 
122
            for i in range(num_volumes):
 
123
                volume_name = "drive-%03d" % first_index
 
124
                first_index += 1
 
125
                volume_desc = 'BE volume for VSA %s type %s' % \
 
126
                              (vsa_name, name)
 
127
                volume = {
 
128
                    'size': size,
 
129
                    'name': volume_name,
 
130
                    'description': volume_desc,
 
131
                    'volume_type_id': vol_type['id'],
 
132
                    }
 
133
                volume_params.append(volume)
 
134
 
 
135
        return volume_params
 
136
 
 
137
    def create(self, context, display_name='', display_description='',
 
138
                vc_count=1, instance_type=None, image_name=None,
 
139
                availability_zone=None, storage=[], shared=None):
 
140
        """
 
141
        Provision VSA instance with corresponding compute instances
 
142
        and associated volumes
 
143
        :param storage: List of dictionaries with following keys:
 
144
                        disk_name, num_disks, size
 
145
        :param shared: Specifies if storage is dedicated or shared.
 
146
                       For shared storage disks split into partitions
 
147
        """
 
148
 
 
149
        LOG.info(_("*** Experimental VSA code ***"))
 
150
 
 
151
        if vc_count > FLAGS.max_vcs_in_vsa:
 
152
            LOG.warning(_("Requested number of VCs (%d) is too high."\
 
153
                          " Setting to default"), vc_count)
 
154
            vc_count = FLAGS.max_vcs_in_vsa
 
155
 
 
156
        if instance_type is None:
 
157
            instance_type = self._get_default_vsa_instance_type()
 
158
 
 
159
        if availability_zone is None:
 
160
            availability_zone = FLAGS.storage_availability_zone
 
161
 
 
162
        if storage is None:
 
163
            storage = []
 
164
 
 
165
        if shared is None or shared == 'False' or shared == False:
 
166
            shared = False
 
167
        else:
 
168
            shared = True
 
169
 
 
170
        # check if image is ready before starting any work
 
171
        if image_name is None:
 
172
            image_name = FLAGS.vc_image_name
 
173
        try:
 
174
            image_service = self.compute_api.image_service
 
175
            vc_image = image_service.show_by_name(context, image_name)
 
176
            vc_image_href = vc_image['id']
 
177
        except exception.ImageNotFound:
 
178
            raise exception.ApiError(_("Failed to find configured image %s")
 
179
                                        % image_name)
 
180
 
 
181
        options = {
 
182
            'display_name': display_name,
 
183
            'display_description': display_description,
 
184
            'project_id': context.project_id,
 
185
            'availability_zone': availability_zone,
 
186
            'instance_type_id': instance_type['id'],
 
187
            'image_ref': vc_image_href,
 
188
            'vc_count': vc_count,
 
189
            'status': VsaState.CREATING,
 
190
        }
 
191
        LOG.info(_("Creating VSA: %s") % options)
 
192
 
 
193
        # create DB entry for VSA instance
 
194
        try:
 
195
            vsa_ref = self.db.vsa_create(context, options)
 
196
        except exception.Error:
 
197
            raise exception.ApiError(_(sys.exc_info()[1]))
 
198
        vsa_id = vsa_ref['id']
 
199
        vsa_name = vsa_ref['name']
 
200
 
 
201
        # check storage parameters
 
202
        try:
 
203
            volume_params = self._check_storage_parameters(context, vsa_name,
 
204
                                                           storage, shared)
 
205
        except exception.ApiError:
 
206
            self.db.vsa_destroy(context, vsa_id)
 
207
            raise exception.ApiError(_("Error in storage parameters: %s")
 
208
                                        % storage)
 
209
 
 
210
        # after creating DB entry, re-check and set some defaults
 
211
        updates = {}
 
212
        if (not hasattr(vsa_ref, 'display_name') or
 
213
                vsa_ref.display_name is None or
 
214
                vsa_ref.display_name == ''):
 
215
            updates['display_name'] = display_name = vsa_name
 
216
        updates['vol_count'] = len(volume_params)
 
217
        vsa_ref = self.update(context, vsa_id, **updates)
 
218
 
 
219
        # create volumes
 
220
        if FLAGS.vsa_multi_vol_creation:
 
221
            if len(volume_params) > 0:
 
222
                request_spec = {
 
223
                    'num_volumes': len(volume_params),
 
224
                    'vsa_id': str(vsa_id),
 
225
                    'volumes': volume_params,
 
226
                }
 
227
 
 
228
                rpc.cast(context,
 
229
                         FLAGS.scheduler_topic,
 
230
                         {"method": "create_volumes",
 
231
                          "args": {"topic": FLAGS.volume_topic,
 
232
                                   "request_spec": request_spec,
 
233
                                   "availability_zone": availability_zone}})
 
234
        else:
 
235
            # create BE volumes one-by-one
 
236
            for vol in volume_params:
 
237
                try:
 
238
                    vol_name = vol['name']
 
239
                    vol_size = vol['size']
 
240
                    vol_type_id = vol['volume_type_id']
 
241
                    LOG.debug(_("VSA ID %(vsa_id)d %(vsa_name)s: Create "\
 
242
                                "volume %(vol_name)s, %(vol_size)d GB, "\
 
243
                                "type %(vol_type_id)s"), locals())
 
244
 
 
245
                    vol_type = volume_types.get_volume_type(context,
 
246
                                                vol['volume_type_id'])
 
247
 
 
248
                    vol_ref = self.volume_api.create(context,
 
249
                                    vol_size,
 
250
                                    None,
 
251
                                    vol_name,
 
252
                                    vol['description'],
 
253
                                    volume_type=vol_type,
 
254
                                    metadata=dict(to_vsa_id=str(vsa_id)),
 
255
                                    availability_zone=availability_zone)
 
256
                except:
 
257
                    self.update_vsa_status(context, vsa_id,
 
258
                                           status=VsaState.PARTIAL)
 
259
                    raise
 
260
 
 
261
        if len(volume_params) == 0:
 
262
            # No BE volumes - ask VSA manager to start VCs
 
263
            rpc.cast(context,
 
264
                     FLAGS.vsa_topic,
 
265
                     {"method": "create_vsa",
 
266
                      "args": {"vsa_id": str(vsa_id)}})
 
267
 
 
268
        return vsa_ref
 
269
 
 
270
    def update_vsa_status(self, context, vsa_id, status):
 
271
        updates = dict(status=status)
 
272
        LOG.info(_("VSA ID %(vsa_id)d: Update VSA status to %(status)s"),
 
273
                    locals())
 
274
        return self.update(context, vsa_id, **updates)
 
275
 
 
276
    def update(self, context, vsa_id, **kwargs):
 
277
        """Updates the VSA instance in the datastore.
 
278
 
 
279
        :param context: The security context
 
280
        :param vsa_id: ID of the VSA instance to update
 
281
        :param kwargs: All additional keyword args are treated
 
282
                       as data fields of the instance to be
 
283
                       updated
 
284
 
 
285
        :returns: None
 
286
        """
 
287
        LOG.info(_("VSA ID %(vsa_id)d: Update VSA call"), locals())
 
288
 
 
289
        updatable_fields = ['status', 'vc_count', 'vol_count',
 
290
                            'display_name', 'display_description']
 
291
        changes = {}
 
292
        for field in updatable_fields:
 
293
            if field in kwargs:
 
294
                changes[field] = kwargs[field]
 
295
 
 
296
        vc_count = kwargs.get('vc_count', None)
 
297
        if vc_count is not None:
 
298
            # VP-TODO: This request may want to update number of VCs
 
299
            # Get number of current VCs and add/delete VCs appropriately
 
300
            vsa = self.get(context, vsa_id)
 
301
            vc_count = int(vc_count)
 
302
            if vc_count > FLAGS.max_vcs_in_vsa:
 
303
                LOG.warning(_("Requested number of VCs (%d) is too high."\
 
304
                              " Setting to default"), vc_count)
 
305
                vc_count = FLAGS.max_vcs_in_vsa
 
306
 
 
307
            if vsa['vc_count'] != vc_count:
 
308
                self.update_num_vcs(context, vsa, vc_count)
 
309
                changes['vc_count'] = vc_count
 
310
 
 
311
        return self.db.vsa_update(context, vsa_id, changes)
 
312
 
 
313
    def update_num_vcs(self, context, vsa, vc_count):
 
314
        vsa_name = vsa['name']
 
315
        old_vc_count = int(vsa['vc_count'])
 
316
        if vc_count > old_vc_count:
 
317
            add_cnt = vc_count - old_vc_count
 
318
            LOG.debug(_("Adding %(add_cnt)s VCs to VSA %(vsa_name)s."),
 
319
                        locals())
 
320
            # VP-TODO: actual code for adding new VCs
 
321
 
 
322
        elif vc_count < old_vc_count:
 
323
            del_cnt = old_vc_count - vc_count
 
324
            LOG.debug(_("Deleting %(del_cnt)s VCs from VSA %(vsa_name)s."),
 
325
                        locals())
 
326
            # VP-TODO: actual code for deleting extra VCs
 
327
 
 
328
    def _force_volume_delete(self, ctxt, volume):
 
329
        """Delete a volume, bypassing the check that it must be available."""
 
330
        host = volume['host']
 
331
        if not host:
 
332
            # Deleting volume from database and skipping rpc.
 
333
            self.db.volume_destroy(ctxt, volume['id'])
 
334
            return
 
335
 
 
336
        rpc.cast(ctxt,
 
337
                 self.db.queue_get_for(ctxt, FLAGS.volume_topic, host),
 
338
                 {"method": "delete_volume",
 
339
                  "args": {"volume_id": volume['id']}})
 
340
 
 
341
    def delete_vsa_volumes(self, context, vsa_id, direction,
 
342
                           force_delete=True):
 
343
        if direction == "FE":
 
344
            volumes = self.get_all_vsa_volumes(context, vsa_id)
 
345
        else:
 
346
            volumes = self.get_all_vsa_drives(context, vsa_id)
 
347
 
 
348
        for volume in volumes:
 
349
            try:
 
350
                vol_name = volume['name']
 
351
                LOG.info(_("VSA ID %(vsa_id)s: Deleting %(direction)s "\
 
352
                           "volume %(vol_name)s"), locals())
 
353
                self.volume_api.delete(context, volume['id'])
 
354
            except exception.ApiError:
 
355
                LOG.info(_("Unable to delete volume %s"), volume['name'])
 
356
                if force_delete:
 
357
                    LOG.info(_("VSA ID %(vsa_id)s: Forced delete. "\
 
358
                               "%(direction)s volume %(vol_name)s"), locals())
 
359
                    self._force_volume_delete(context, volume)
 
360
 
 
361
    def delete(self, context, vsa_id):
 
362
        """Terminate a VSA instance."""
 
363
        LOG.info(_("Going to try to terminate VSA ID %s"), vsa_id)
 
364
 
 
365
        # Delete all FrontEnd and BackEnd volumes
 
366
        self.delete_vsa_volumes(context, vsa_id, "FE", force_delete=True)
 
367
        self.delete_vsa_volumes(context, vsa_id, "BE", force_delete=True)
 
368
 
 
369
        # Delete all VC instances
 
370
        instances = self.compute_api.get_all(context,
 
371
                search_opts={'metadata': dict(vsa_id=str(vsa_id))})
 
372
        for instance in instances:
 
373
            name = instance['name']
 
374
            LOG.debug(_("VSA ID %(vsa_id)s: Delete instance %(name)s"),
 
375
                        locals())
 
376
            self.compute_api.delete(context, instance['id'])
 
377
 
 
378
        # Delete VSA instance
 
379
        self.db.vsa_destroy(context, vsa_id)
 
380
 
 
381
    def get(self, context, vsa_id):
 
382
        rv = self.db.vsa_get(context, vsa_id)
 
383
        return rv
 
384
 
 
385
    def get_all(self, context):
 
386
        if context.is_admin:
 
387
            return self.db.vsa_get_all(context)
 
388
        return self.db.vsa_get_all_by_project(context, context.project_id)
 
389
 
 
390
    def get_vsa_volume_type(self, context):
 
391
        name = FLAGS.vsa_volume_type_name
 
392
        try:
 
393
            vol_type = volume_types.get_volume_type_by_name(context, name)
 
394
        except exception.NotFound:
 
395
            volume_types.create(context, name,
 
396
                                extra_specs=dict(type='vsa_volume'))
 
397
            vol_type = volume_types.get_volume_type_by_name(context, name)
 
398
 
 
399
        return vol_type
 
400
 
 
401
    def get_all_vsa_instances(self, context, vsa_id):
 
402
        return self.compute_api.get_all(context,
 
403
                search_opts={'metadata': dict(vsa_id=str(vsa_id))})
 
404
 
 
405
    def get_all_vsa_volumes(self, context, vsa_id):
 
406
        return self.volume_api.get_all(context,
 
407
                search_opts={'metadata': dict(from_vsa_id=str(vsa_id))})
 
408
 
 
409
    def get_all_vsa_drives(self, context, vsa_id):
 
410
        return self.volume_api.get_all(context,
 
411
                search_opts={'metadata': dict(to_vsa_id=str(vsa_id))})