~ubuntu-branches/ubuntu/quantal/nova/quantal-proposed

Viewing changes to nova/scheduler/driver.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-08-16 14:04:11 UTC
  • mto: This revision was merged to the branch mainline in revision 84.
  • Revision ID: package-import@ubuntu.com-20120816140411-0mr4n241wmk30t9l
Tags: upstream-2012.2~f3
Import upstream version 2012.2~f3

@@ -31,7 +31,6 @@
 from nova import notifications
 from nova.openstack.common import cfg
 from nova.openstack.common import importutils
-from nova.openstack.common import jsonutils
 from nova.openstack.common import log as logging
 from nova.openstack.common import rpc
 from nova.openstack.common import timeutils
@@ -44,6 +43,9 @@
     cfg.StrOpt('scheduler_host_manager',
                default='nova.scheduler.host_manager.HostManager',
                help='The scheduler host manager class to use'),
+    cfg.IntOpt('scheduler_max_attempts',
+               default=3,
+               help='Maximum number of attempts to schedule an instance'),
     ]
 
 FLAGS = flags.FLAGS
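
The new scheduler_max_attempts option is declared in the same option list as scheduler_host_manager, so it is readable through FLAGS like any other registered option. As a hedged sketch of the kind of retry loop such a cap could bound (schedule_attempt is a hypothetical helper, not part of this change):

    from nova import exception
    from nova import flags

    FLAGS = flags.FLAGS

    def schedule_with_retries(context, request_spec):
        # Stop after FLAGS.scheduler_max_attempts tries (default 3).
        for _attempt in range(FLAGS.scheduler_max_attempts):
            try:
                return schedule_attempt(context, request_spec)  # hypothetical
            except exception.NoValidHost:
                continue
        raise exception.NoValidHost(reason='max scheduling attempts exceeded')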
@@ -53,39 +55,44 @@
 flags.DECLARE('libvirt_type', 'nova.virt.libvirt.driver')
 
 
-def cast_to_volume_host(context, host, method, update_db=True, **kwargs):
+def cast_to_volume_host(context, host, method, **kwargs):
     """Cast request to a volume host queue"""
 
-    if update_db:
-        volume_id = kwargs.get('volume_id', None)
-        if volume_id is not None:
-            now = timeutils.utcnow()
-            db.volume_update(context, volume_id,
-                    {'host': host, 'scheduled_at': now})
+    volume_id = kwargs.get('volume_id', None)
+    if volume_id is not None:
+        now = timeutils.utcnow()
+        db.volume_update(context, volume_id,
+                {'host': host, 'scheduled_at': now})
     rpc.cast(context,
              rpc.queue_get_for(context, 'volume', host),
              {"method": method, "args": kwargs})
     LOG.debug(_("Casted '%(method)s' to volume '%(host)s'") % locals())
 
 
-def cast_to_compute_host(context, host, method, update_db=True, **kwargs):
+def instance_update_db(context, instance_uuid, host):
+    '''Set the host and scheduled_at fields of an Instance.
+
+    :returns: An Instance with the updated fields set properly.
+    '''
+    now = timeutils.utcnow()
+    values = {'host': host, 'scheduled_at': now}
+    return db.instance_update(context, instance_uuid, values)
+
+
+def cast_to_compute_host(context, host, method, **kwargs):
     """Cast request to a compute host queue"""
 
-    if update_db:
-        # fall back on the id if the uuid is not present
-        instance_id = kwargs.get('instance_id', None)
-        instance_uuid = kwargs.get('instance_uuid', instance_id)
-        if instance_uuid is not None:
-            now = timeutils.utcnow()
-            db.instance_update(context, instance_uuid,
-                    {'host': host, 'scheduled_at': now})
+    instance_uuid = kwargs.get('instance_uuid', None)
+    if instance_uuid:
+        instance_update_db(context, instance_uuid, host)
+
     rpc.cast(context,
              rpc.queue_get_for(context, 'compute', host),
              {"method": method, "args": kwargs})
     LOG.debug(_("Casted '%(method)s' to compute '%(host)s'") % locals())
 
 
-def cast_to_network_host(context, host, method, update_db=False, **kwargs):
+def cast_to_network_host(context, host, method, **kwargs):
     """Cast request to a network host queue"""
 
     rpc.cast(context,
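
With the update_db flag gone, the database write in cast_to_compute_host is triggered purely by an instance_uuid keyword, and the host/scheduled_at bookkeeping now lives in the reusable instance_update_db helper. A minimal sketch of a call site under the new signature (ctxt and instance are assumed to be a request context and an instance dict already in scope; the host name is illustrative):

    from nova.scheduler import driver

    # instance_uuid in kwargs makes cast_to_compute_host call
    # instance_update_db() before casting; the same kwargs are then
    # delivered to the compute host's queue.
    driver.cast_to_compute_host(ctxt, 'node1', 'run_instance',
                                instance_uuid=instance['uuid'])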
@@ -94,7 +101,7 @@
     LOG.debug(_("Casted '%(method)s' to network '%(host)s'") % locals())
 
 
-def cast_to_host(context, topic, host, method, update_db=True, **kwargs):
+def cast_to_host(context, topic, host, method, **kwargs):
     """Generic cast to host"""
 
     topic_mapping = {
@@ -104,7 +111,7 @@
 
     func = topic_mapping.get(topic)
     if func:
-        func(context, host, method, update_db=update_db, **kwargs)
+        func(context, host, method, **kwargs)
     else:
         rpc.cast(context,
                  rpc.queue_get_for(context, topic, host),
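
cast_to_host keeps its dictionary dispatch: a known topic routes to the matching helper above (now without the update_db pass-through), while an unknown topic falls back to a plain rpc.cast. For example, a 'volume' topic resolves like this (ctxt and vol_id assumed in scope; host name illustrative):

    from nova.scheduler import driver

    # Equivalent to cast_to_volume_host(ctxt, 'vol1', 'create_volume',
    # volume_id=vol_id), which also stamps host/scheduled_at on the volume.
    driver.cast_to_host(ctxt, 'volume', 'vol1', 'create_volume',
                        volume_id=vol_id)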
@@ -140,15 +147,6 @@
         self.compute_api = compute_api.API()
         self.compute_rpcapi = compute_rpcapi.ComputeAPI()
 
-    def get_host_list(self):
-        """Get a list of hosts from the HostManager."""
-        return self.host_manager.get_host_list()
-
-    def get_service_capabilities(self):
-        """Get the normalized set of capabilities for the services.
-        """
-        return self.host_manager.get_service_capabilities()
-
     def update_service_capabilities(self, service_name, host, capabilities):
         """Process a capability update from a service node."""
         self.host_manager.update_service_capabilities(service_name,
@@ -186,23 +184,29 @@
         """Must override schedule method for scheduler to work."""
         raise NotImplementedError(_("Must implement a fallback schedule"))
 
-    def schedule_prep_resize(self, context, request_spec, *_args, **_kwargs):
+    def schedule_prep_resize(self, context, image, request_spec,
+                             filter_properties, instance, instance_type,
+                             reservations=None):
         """Must override schedule_prep_resize method for scheduler to work."""
         msg = _("Driver must implement schedule_prep_resize")
         raise NotImplementedError(msg)
 
-    def schedule_run_instance(self, context, request_spec, *_args, **_kwargs):
+    def schedule_run_instance(self, context, request_spec,
+                              admin_password, injected_files,
+                              requested_networks, is_first_time,
+                              filter_properties, reservations):
         """Must override schedule_run_instance method for scheduler to work."""
         msg = _("Driver must implement schedule_run_instance")
         raise NotImplementedError(msg)
 
-    def schedule_live_migration(self, context, instance_id, dest,
-                                block_migration=False,
-                                disk_over_commit=False):
+    def schedule_live_migration(self, context, dest,
+                                block_migration=False, disk_over_commit=False,
+                                instance=None, instance_id=None):
         """Live migration scheduling method.
 
         :param context:
-        :param instance_id:
+        :param instance_id: (deprecated)
+        :param instance: instance dict
         :param dest: destination host
         :param block_migration: if true, block_migration.
         :param disk_over_commit: if True, consider real(not virtual)
@@ -212,36 +216,31 @@
             The host where instance is running currently.
             Then scheduler send request that host.
         """
-        # Whether instance exists and is running.
-        instance_ref = db.instance_get(context, instance_id)
-
-        # Checking instance.
-        self._live_migration_src_check(context, instance_ref)
-
-        # Checking destination host.
-        self._live_migration_dest_check(context, instance_ref,
-                                        dest, block_migration,
-                                        disk_over_commit)
-        # Common checking.
-        self._live_migration_common_check(context, instance_ref,
-                                          dest, block_migration,
-                                          disk_over_commit)
-
-        # Changing instance_state.
+        # Check we can do live migration
+        if not instance:
+            instance = db.instance_get(context, instance_id)
+
+        self._live_migration_src_check(context, instance)
+        self._live_migration_dest_check(context, instance, dest)
+        self._live_migration_common_check(context, instance, dest)
+        migrate_data = self.compute_rpcapi.check_can_live_migrate_destination(
+                context, instance, dest, block_migration, disk_over_commit)
+
+        # Change instance_state
         values = {"task_state": task_states.MIGRATING}
 
         # update instance state and notify
         (old_ref, new_instance_ref) = db.instance_update_and_get_original(
-                context, instance_id, values)
+                context, instance['uuid'], values)
         notifications.send_update(context, old_ref, new_instance_ref,
                 service="scheduler")
 
-        src = instance_ref['host']
-        cast_to_compute_host(context, src, 'live_migration',
-                update_db=False,
-                instance_id=instance_id,
-                dest=dest,
-                block_migration=block_migration)
+        # Perform migration
+        src = instance['host']
+        self.compute_rpcapi.live_migration(context, host=src,
+                instance=new_instance_ref, dest=dest,
+                block_migration=block_migration,
+                migrate_data=migrate_data)
 
     def _live_migration_src_check(self, context, instance_ref):
         """Live migration check routine (for src host).
@@ -250,7 +249,7 @@
         :param instance_ref: nova.db.sqlalchemy.models.Instance object
 
         """
-
+        # TODO(johngar) why is this not in the API layer?
         # Checking instance is running.
         if instance_ref['power_state'] != power_state.RUNNING:
             raise exception.InstanceNotRunning(
@@ -258,22 +257,21 @@
 
         # Checking src host exists and compute node
         src = instance_ref['host']
-        services = db.service_get_all_compute_by_host(context, src)
+        try:
+            services = db.service_get_all_compute_by_host(context, src)
+        except exception.NotFound:
+            raise exception.ComputeServiceUnavailable(host=src)
 
         # Checking src host is alive.
         if not utils.service_is_up(services[0]):
             raise exception.ComputeServiceUnavailable(host=src)
 
-    def _live_migration_dest_check(self, context, instance_ref, dest,
-                                   block_migration, disk_over_commit):
+    def _live_migration_dest_check(self, context, instance_ref, dest):
         """Live migration check routine (for destination host).
 
         :param context: security context
         :param instance_ref: nova.db.sqlalchemy.models.Instance object
         :param dest: destination host
-        :param block_migration: if true, block_migration.
-        :param disk_over_commit: if True, consider real(not virtual)
-                                 disk size.
         """
 
         # Checking dest exists and compute node.
@@ -291,15 +289,11 @@
             raise exception.UnableToMigrateToSelf(
                     instance_id=instance_ref['uuid'], host=dest)
 
-        # Checking dst host still has enough capacities.
-        self.assert_compute_node_has_enough_resources(context,
-                                                      instance_ref,
-                                                      dest,
-                                                      block_migration,
-                                                      disk_over_commit)
+        # Check memory requirements
+        self._assert_compute_node_has_enough_memory(context,
+                                                   instance_ref, dest)
 
-    def _live_migration_common_check(self, context, instance_ref, dest,
-                                     block_migration, disk_over_commit):
+    def _live_migration_common_check(self, context, instance_ref, dest):
         """Live migration common check routine.
 
         Below checkings are followed by
@@ -308,38 +302,10 @@
         :param context: security context
         :param instance_ref: nova.db.sqlalchemy.models.Instance object
         :param dest: destination host
-        :param block_migration: if true, block_migration.
-        :param disk_over_commit: if True, consider real(not virtual)
-                                 disk size.
-
         """
-
-        # Checking shared storage connectivity
-        # if block migration, instances_paths should not be on shared storage.
-        shared = self.mounted_on_same_shared_storage(context, instance_ref,
-                                                     dest)
-        if block_migration:
-            if shared:
-                reason = _("Block migration can not be used "
-                           "with shared storage.")
-                raise exception.InvalidSharedStorage(reason=reason, path=dest)
-
-        elif not shared:
-            reason = _("Live migration can not be used "
-                       "without shared storage.")
-            raise exception.InvalidSharedStorage(reason=reason, path=dest)
-
-        # Checking destination host exists.
-        dservice_refs = db.service_get_all_compute_by_host(context, dest)
-        dservice_ref = dservice_refs[0]['compute_node'][0]
-
-        # Checking original host( where instance was launched at) exists.
-        try:
-            oservice_refs = db.service_get_all_compute_by_host(context,
-                                           instance_ref['host'])
-        except exception.NotFound:
-            raise exception.SourceHostUnavailable()
-        oservice_ref = oservice_refs[0]['compute_node'][0]
+        dservice_ref = self._get_compute_info(context, dest)
+        src = instance_ref['host']
+        oservice_ref = self._get_compute_info(context, src)
 
         # Checking hypervisor is same.
         orig_hypervisor = oservice_ref['hypervisor_type']
@@ -353,40 +319,7 @@
         if orig_hypervisor > dest_hypervisor:
             raise exception.DestinationHypervisorTooOld()
 
-        # Checking cpuinfo.
-        try:
-            self.compute_rpcapi.compare_cpu(context, oservice_ref['cpu_info'],
-                                            dest)
-
-        except exception.InvalidCPUInfo:
-            src = instance_ref['host']
-            LOG.exception(_("host %(dest)s is not compatible with "
-                                "original host %(src)s.") % locals())
-            raise
-
-    def assert_compute_node_has_enough_resources(self, context, instance_ref,
-                                                 dest, block_migration,
-                                                 disk_over_commit):
-
-        """Checks if destination host has enough resource for live migration.
-
-        :param context: security context
-        :param instance_ref: nova.db.sqlalchemy.models.Instance object
-        :param dest: destination host
-        :param block_migration: if true, block_migration.
-        :param disk_over_commit: if True, consider real(not virtual)
-                                 disk size.
-
-        """
-        self.assert_compute_node_has_enough_memory(context,
-                                                   instance_ref, dest)
-        if not block_migration:
-            return
-        self.assert_compute_node_has_enough_disk(context,
-                                                 instance_ref, dest,
-                                                 disk_over_commit)
-
-    def assert_compute_node_has_enough_memory(self, context,
+    def _assert_compute_node_has_enough_memory(self, context,
                                               instance_ref, dest):
         """Checks if destination host has enough memory for live migration.
 
@@ -397,7 +330,7 @@
 
         """
         # Getting total available memory of host
-        avail = self._get_compute_info(context, dest, 'memory_mb')
+        avail = self._get_compute_info(context, dest)['memory_mb']
 
         # Getting total used memory and disk of host
         # It should be sum of memories that are assigned as max value,
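
Since _get_compute_info now returns the whole compute-node record, the memory check indexes it with ['memory_mb'] rather than passing a key argument. Per the surrounding (partly elided) lines, that total is weighed against the memory already assigned on dest plus the migrating instance's memory_mb, raising MigrationError on a shortfall. Schematically (the 'used' total is computed in the elided lines; this is not the verbatim code):

    avail = self._get_compute_info(context, dest)['memory_mb']
    mem_inst = instance_ref['memory_mb']
    if avail <= used + mem_inst:
        raise exception.MigrationError(
                reason='insufficient memory on %s' % dest)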
@@ -414,54 +347,7 @@
                        "instance:%(mem_inst)s)")
             raise exception.MigrationError(reason=reason % locals())
 
-    def assert_compute_node_has_enough_disk(self, context, instance_ref, dest,
-                                            disk_over_commit):
-        """Checks if destination host has enough disk for block migration.
-
-        :param context: security context
-        :param instance_ref: nova.db.sqlalchemy.models.Instance object
-        :param dest: destination host
-        :param disk_over_commit: if True, consider real(not virtual)
-                                 disk size.
-
-        """
-
-        # Libvirt supports qcow2 disk format,which is usually compressed
-        # on compute nodes.
-        # Real disk image (compressed) may enlarged to "virtual disk size",
-        # that is specified as the maximum disk size.
-        # (See qemu-img -f path-to-disk)
-        # Scheduler recognizes destination host still has enough disk space
-        # if real disk size < available disk size
-        # if disk_over_commit is True,
-        #  otherwise virtual disk size < available disk size.
-
-        # Getting total available disk of host
-        available_gb = self._get_compute_info(context,
-                                              dest, 'disk_available_least')
-        available = available_gb * (1024 ** 3)
-
-        # Getting necessary disk size
-        ret = self.compute_rpcapi.get_instance_disk_info(context, instance_ref)
-        disk_infos = jsonutils.loads(ret)
-
-        necessary = 0
-        if disk_over_commit:
-            for info in disk_infos:
-                necessary += int(info['disk_size'])
-        else:
-            for info in disk_infos:
-                necessary += int(info['virt_disk_size'])
-
-        # Check that available disk > necessary disk
-        if (available - necessary) < 0:
-            instance_uuid = instance_ref['uuid']
-            reason = _("Unable to migrate %(instance_uuid)s to %(dest)s: "
-                       "Lack of disk(host:%(available)s "
-                       "<= instance:%(necessary)s)")
-            raise exception.MigrationError(reason=reason % locals())
-
-    def _get_compute_info(self, context, host, key):
+    def _get_compute_info(self, context, host):
         """get compute node's information specified by key
 
         :param context: security context
@@ -471,33 +357,4 @@
 
         """
         compute_node_ref = db.service_get_all_compute_by_host(context, host)
-        compute_node_ref = compute_node_ref[0]['compute_node'][0]
-        return compute_node_ref[key]
-
-    def mounted_on_same_shared_storage(self, context, instance_ref, dest):
-        """Check if the src and dest host mount same shared storage.
-
-        At first, dest host creates temp file, and src host can see
-        it if they mounts same shared storage. Then src host erase it.
-
-        :param context: security context
-        :param instance_ref: nova.db.sqlalchemy.models.Instance object
-        :param dest: destination host
-
-        """
-
-        src = instance_ref['host']
-
-        filename = self.compute_rpcapi.create_shared_storage_test_file(context,
-                dest)
-
-        try:
-            # make sure existence at src host.
-            ret = self.compute_rpcapi.check_shared_storage_test_file(context,
-                    filename, src)
-
-        finally:
-            self.compute_rpcapi.cleanup_shared_storage_test_file(context,
-                    filename, dest)
-
-        return ret
+        return compute_node_ref[0]['compute_node'][0]
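
After this change _get_compute_info returns the full compute_node record instead of one keyed field, so each caller picks out what it needs; both call sites introduced in this diff follow that pattern. A minimal sketch (ctxt is an assumed request context, the host name illustrative):

    node = self._get_compute_info(ctxt, 'node1')
    avail_mb = node['memory_mb']          # used by the memory check
    hypervisor = node['hypervisor_type']  # used by the hypervisor check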