~ubuntu-branches/ubuntu/raring/nova/raring-proposed

« back to all changes in this revision

Viewing changes to nova/compute/resource_tracker.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short
  • Date: 2012-11-23 09:04:58 UTC
  • mfrom: (1.1.66)
  • Revision ID: package-import@ubuntu.com-20121123090458-91565o7aev1i1h71
Tags: 2013.1~g1-0ubuntu1
[ Adam Gandelman ]
* debian/control: Ensure novaclient is upgraded with nova,
  require python-keystoneclient >= 1:2.9.0. (LP: #1073289)
* debian/patches/{ubuntu/*, rbd-security.patch}: Dropped, applied
  upstream.
* debian/control: Add python-testtools to Build-Depends.

[ Chuck Short ]
* New upstream version.
* Refreshed debian/patches/avoid_setuptools_git_dependency.patch.
* debian/rules: FTBFS if missing binaries.
* debian/nova-scheudler.install: Add missing rabbit-queues and
  nova-rpc-zmq-receiver.
* Remove nova-volume since it doesnt exist anymore, transition to cinder-*.
* debian/rules: install apport hook in the right place.
* debian/patches/ubuntu-show-tests.patch: Display test failures.
* debian/control: Add depends on genisoimage
* debian/control: Suggest guestmount.
* debian/control: Suggest websockify. (LP: #1076442)
* debian/nova.conf: Disable nova-volume service.
* debian/control: Depend on xen-system-* rather than the hypervisor.
* debian/control, debian/mans/nova-conductor.8, debian/nova-conductor.init,
  debian/nova-conductor.install, debian/nova-conductor.logrotate
  debian/nova-conductor.manpages, debian/nova-conductor.postrm
  debian/nova-conductor.upstart.in: Add nova-conductor service.
* debian/control: Add python-fixtures as a build deps.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
model.
20
20
"""
21
21
 
 
22
from nova.compute import claims
 
23
from nova.compute import instance_types
 
24
from nova.compute import task_states
22
25
from nova.compute import vm_states
 
26
from nova import context
23
27
from nova import db
24
28
from nova import exception
25
 
from nova import flags
 
29
from nova import notifications
26
30
from nova.openstack.common import cfg
27
31
from nova.openstack.common import importutils
28
32
from nova.openstack.common import jsonutils
 
33
from nova.openstack.common import lockutils
29
34
from nova.openstack.common import log as logging
30
 
from nova.openstack.common import timeutils
31
 
from nova import utils
32
35
 
33
36
resource_tracker_opts = [
34
37
    cfg.IntOpt('reserved_host_disk_mb', default=0,
35
38
               help='Amount of disk in MB to reserve for the host'),
36
39
    cfg.IntOpt('reserved_host_memory_mb', default=512,
37
40
               help='Amount of memory in MB to reserve for the host'),
38
 
    cfg.IntOpt('claim_timeout_seconds', default=600,
39
 
               help='How long, in seconds, before a resource claim times out'),
40
41
    cfg.StrOpt('compute_stats_class',
41
42
               default='nova.compute.stats.Stats',
42
43
               help='Class that will manage stats for the local compute host')
43
44
]
44
45
 
45
 
FLAGS = flags.FLAGS
46
 
FLAGS.register_opts(resource_tracker_opts)
 
46
CONF = cfg.CONF
 
47
CONF.register_opts(resource_tracker_opts)
47
48
 
48
49
LOG = logging.getLogger(__name__)
49
 
COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"
50
 
 
51
 
 
52
 
class Claim(object):
53
 
    """A declaration that a compute host operation will require free resources.
54
 
 
55
 
    This information will be used to help keep the local compute hosts's
56
 
    ComputeNode model in sync to aid the scheduler in making efficient / more
57
 
    correct decisions with respect to host selection.
58
 
    """
59
 
 
60
 
    def __init__(self, instance, timeout):
61
 
        self.instance = jsonutils.to_primitive(instance)
62
 
        self.timeout = timeout
63
 
        self.expire_ts = timeutils.utcnow_ts() + timeout
64
 
 
65
 
    def is_expired(self):
66
 
        """Determine if this adjustment is old enough that we can assume it's
67
 
        no longer needed.
68
 
        """
69
 
        return timeutils.utcnow_ts() > self.expire_ts
70
 
 
71
 
    @property
72
 
    def claim_id(self):
73
 
        return self.instance['uuid']
74
 
 
75
 
    @property
76
 
    def disk_gb(self):
77
 
        return self.instance['root_gb'] + self.instance['ephemeral_gb']
78
 
 
79
 
    @property
80
 
    def memory_mb(self):
81
 
        return self.instance['memory_mb']
82
 
 
83
 
    @property
84
 
    def vcpus(self):
85
 
        return self.instance['vcpus']
86
 
 
87
 
    def __str__(self):
88
 
        return "[Claim %s: %d MB memory, %d GB disk, %d VCPUS]" % \
89
 
                    (self.claim_id, self.memory_mb, self.disk_gb, self.vcpus)
90
 
 
91
 
 
92
 
class ResourceContextManager(object):
93
 
    def __init__(self, context, claim, tracker):
94
 
        self.context = context
95
 
        self.claim = claim
96
 
        self.tracker = tracker
97
 
 
98
 
    def __enter__(self):
99
 
        if not self.claim and not self.tracker.disabled:
100
 
            # insufficient resources to complete request
101
 
            raise exception.ComputeResourcesUnavailable()
102
 
 
103
 
    def __exit__(self, exc_type, exc_val, exc_tb):
104
 
        if not self.claim:
105
 
            return
106
 
 
107
 
        if exc_type is None:
108
 
            self.tracker.finish_resource_claim(self.claim)
109
 
        else:
110
 
            self.tracker.abort_resource_claim(self.context, self.claim)
 
50
COMPUTE_RESOURCE_SEMAPHORE = claims.COMPUTE_RESOURCE_SEMAPHORE
111
51
 
112
52
 
113
53
class ResourceTracker(object):
115
55
    are built and destroyed.
116
56
    """
117
57
 
118
 
    def __init__(self, host, driver):
 
58
    def __init__(self, host, driver, nodename):
119
59
        self.host = host
120
60
        self.driver = driver
 
61
        self.nodename = nodename
121
62
        self.compute_node = None
122
 
        self.next_claim_id = 1
123
 
        self.claims = {}
124
 
        self.stats = importutils.import_object(FLAGS.compute_stats_class)
 
63
        self.stats = importutils.import_object(CONF.compute_stats_class)
125
64
        self.tracked_instances = {}
126
 
 
127
 
    def resource_claim(self, context, instance_ref, limits=None):
128
 
        claim = self.begin_resource_claim(context, instance_ref, limits)
129
 
        return ResourceContextManager(context, claim, self)
130
 
 
131
 
    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
132
 
    def begin_resource_claim(self, context, instance_ref, limits=None,
133
 
                             timeout=None):
 
65
        self.tracked_migrations = {}
 
66
 
 
67
    @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
 
68
    def instance_claim(self, context, instance_ref, limits=None):
134
69
        """Indicate that some resources are needed for an upcoming compute
135
70
        instance build operation.
136
71
 
141
76
        :param instance_ref: instance to reserve resources for
142
77
        :param limits: Dict of oversubscription limits for memory, disk,
143
78
                       and CPUs.
144
 
        :param timeout: How long, in seconds, the operation that requires
145
 
                        these resources should take to actually allocate what
146
 
                        it needs from the hypervisor.  If the timeout is
147
 
                        exceeded, the new resource claim will assume caller
148
 
                        before releasing the resources.
149
 
        :returns: An integer 'claim ticket'.  This should be turned into
150
 
                  finalize  a resource claim or free resources after the
151
 
                  compute operation is finished. Returns None if the claim
152
 
                  failed.
153
 
        """
154
 
        if self.disabled:
155
 
            return
156
 
 
157
 
        if not limits:
158
 
            limits = {}
159
 
 
160
 
        if not timeout:
161
 
            timeout = FLAGS.claim_timeout_seconds
162
 
 
163
 
        # If an individual limit is None, the resource will be considered
164
 
        # unlimited:
165
 
        memory_mb_limit = limits.get('memory_mb')
166
 
        disk_gb_limit = limits.get('disk_gb')
167
 
        vcpu_limit = limits.get('vcpu')
168
 
 
169
 
        memory_mb = instance_ref['memory_mb']
170
 
        disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb']
171
 
        vcpus = instance_ref['vcpus']
172
 
 
173
 
        msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
174
 
                "GB, VCPUs %(vcpus)d") % locals()
175
 
        LOG.audit(msg)
176
 
 
177
 
        # Test for resources:
178
 
        if not self._can_claim_memory(memory_mb, memory_mb_limit):
179
 
            return
180
 
 
181
 
        if not self._can_claim_disk(disk_gb, disk_gb_limit):
182
 
            return
183
 
 
184
 
        if not self._can_claim_cpu(vcpus, vcpu_limit):
185
 
            return
186
 
 
187
 
        # keep track of this claim until we know whether the compute operation
188
 
        # was successful/completed:
189
 
        claim = Claim(instance_ref, timeout)
190
 
        self.claims[claim.claim_id] = claim
191
 
 
192
 
        # Mark resources in-use and update stats
193
 
        self._update_usage_from_instance(self.compute_node, instance_ref)
194
 
 
195
 
        # persist changes to the compute node:
196
 
        self._update(context, self.compute_node)
197
 
        return claim
198
 
 
199
 
    def _can_claim_memory(self, memory_mb, memory_mb_limit):
200
 
        """Test if memory needed for a claim can be safely allocated"""
201
 
        # Installed memory and usage info:
202
 
        msg = _("Total memory: %(total_mem)d MB, used: %(used_mem)d MB, free: "
203
 
                "%(free_mem)d MB") % dict(
204
 
                        total_mem=self.compute_node['memory_mb'],
205
 
                        used_mem=self.compute_node['memory_mb_used'],
206
 
                        free_mem=self.compute_node['local_gb_used'])
207
 
        LOG.audit(msg)
208
 
 
209
 
        if memory_mb_limit is None:
210
 
            # treat memory as unlimited:
211
 
            LOG.audit(_("Memory limit not specified, defaulting to unlimited"))
212
 
            return True
213
 
 
214
 
        free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used']
215
 
 
216
 
        # Oversubscribed memory policy info:
217
 
        msg = _("Memory limit: %(memory_mb_limit)d MB, free: "
218
 
                "%(free_ram_mb)d MB") % locals()
219
 
        LOG.audit(msg)
220
 
 
221
 
        can_claim_mem = memory_mb <= free_ram_mb
222
 
 
223
 
        if not can_claim_mem:
224
 
            msg = _("Unable to claim resources.  Free memory %(free_ram_mb)d "
225
 
                    "MB < requested memory %(memory_mb)d MB") % locals()
226
 
            LOG.info(msg)
227
 
 
228
 
        return can_claim_mem
229
 
 
230
 
    def _can_claim_disk(self, disk_gb, disk_gb_limit):
231
 
        """Test if disk space needed can be safely allocated"""
232
 
        # Installed disk and usage info:
233
 
        msg = _("Total disk: %(total_disk)d GB, used: %(used_disk)d GB, free: "
234
 
                "%(free_disk)d GB") % dict(
235
 
                        total_disk=self.compute_node['local_gb'],
236
 
                        used_disk=self.compute_node['local_gb_used'],
237
 
                        free_disk=self.compute_node['free_disk_gb'])
238
 
        LOG.audit(msg)
239
 
 
240
 
        if disk_gb_limit is None:
241
 
            # treat disk as unlimited:
242
 
            LOG.audit(_("Disk limit not specified, defaulting to unlimited"))
243
 
            return True
244
 
 
245
 
        free_disk_gb = disk_gb_limit - self.compute_node['local_gb_used']
246
 
 
247
 
        # Oversubscribed disk policy info:
248
 
        msg = _("Disk limit: %(disk_gb_limit)d GB, free: "
249
 
                "%(free_disk_gb)d GB") % locals()
250
 
        LOG.audit(msg)
251
 
 
252
 
        can_claim_disk = disk_gb <= free_disk_gb
253
 
        if not can_claim_disk:
254
 
            msg = _("Unable to claim resources.  Free disk %(free_disk_gb)d GB"
255
 
                    " < requested disk %(disk_gb)d GB") % dict(
256
 
                            free_disk_gb=self.compute_node['free_disk_gb'],
257
 
                            disk_gb=disk_gb)
258
 
            LOG.info(msg)
259
 
 
260
 
        return can_claim_disk
261
 
 
262
 
    def _can_claim_cpu(self, vcpus, vcpu_limit):
263
 
        """Test if CPUs can be safely allocated according to given policy."""
264
 
 
265
 
        msg = _("Total VCPUs: %(total_vcpus)d, used: %(used_vcpus)d") \
266
 
                % dict(total_vcpus=self.compute_node['vcpus'],
267
 
                       used_vcpus=self.compute_node['vcpus_used'])
268
 
        LOG.audit(msg)
269
 
 
270
 
        if vcpu_limit is None:
271
 
            # treat cpu as unlimited:
272
 
            LOG.audit(_("VCPU limit not specified, defaulting to unlimited"))
273
 
            return True
274
 
 
275
 
        # Oversubscribed disk policy info:
276
 
        msg = _("CPU limit: %(vcpu_limit)d") % locals()
277
 
        LOG.audit(msg)
278
 
 
279
 
        free_vcpus = vcpu_limit - self.compute_node['vcpus_used']
280
 
        can_claim_cpu = vcpus <= free_vcpus
281
 
 
282
 
        if not can_claim_cpu:
283
 
            msg = _("Unable to claim resources.  Free CPU %(free_vcpus)d < "
284
 
                    "requested CPU %(vcpus)d") % locals()
285
 
            LOG.info(msg)
286
 
 
287
 
        return can_claim_cpu
288
 
 
289
 
    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
290
 
    def finish_resource_claim(self, claim):
291
 
        """Indicate that the compute operation that previously claimed the
292
 
        resources identified by 'claim' has now completed and the resources
293
 
        have been allocated at the virt layer.
294
 
 
295
 
        Calling this keeps the available resource data more accurate and
296
 
        timely than letting the claim timeout elapse and waiting for
297
 
        update_available_resource to reflect the changed usage data.
298
 
 
299
 
        :param claim: A claim indicating a set of resources that were
300
 
                      previously claimed.
301
 
        """
302
 
        if self.disabled:
303
 
            return
304
 
 
305
 
        if self.claims.pop(claim.claim_id, None):
306
 
            LOG.info(_("Finishing claim: %s") % claim)
307
 
        else:
308
 
            LOG.info(_("Can't find claim %s.  It may have been 'finished' "
309
 
                       "twice, or it has already timed out."), claim.claim_id)
310
 
 
311
 
    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
312
 
    def abort_resource_claim(self, context, claim):
313
 
        """Indicate that the operation that claimed the resources identified by
314
 
        'claim_id' has either failed or been aborted and the resources are no
315
 
        longer needed.
316
 
 
317
 
        :param claim: A claim ticket indicating a set of resources that were
318
 
                      previously claimed.
319
 
        """
320
 
        if self.disabled:
321
 
            return
322
 
 
323
 
        # un-claim the resources:
324
 
        if self.claims.pop(claim.claim_id, None):
325
 
            LOG.info(_("Aborting claim: %s") % claim)
326
 
            # flag the instance as deleted to revert the resource usage
327
 
            # and associated stats:
328
 
            claim.instance['vm_state'] = vm_states.DELETED
329
 
            self._update_usage_from_instance(self.compute_node, claim.instance)
330
 
            self._update(context, self.compute_node)
331
 
 
332
 
        else:
333
 
            # can't find the claim.  this may mean the claim already timed
334
 
            # out or it was already explicitly finished/aborted.
335
 
            LOG.audit(_("Claim %s not found.  It either timed out or was "
336
 
                        "already explicitly finished/aborted"), claim.claim_id)
337
 
 
338
 
    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
 
79
        :returns: A Claim ticket representing the reserved resources.  It can
 
80
                  be used to revert the resource usage if an error occurs
 
81
                  during the instance build.
 
82
        """
 
83
        if self.disabled:
 
84
            # compute_driver doesn't support resource tracking, just
 
85
            # set the 'host' field and continue the build:
 
86
            self._set_instance_host(context, instance_ref)
 
87
            return claims.NopClaim()
 
88
 
 
89
        # sanity check:
 
90
        if instance_ref['host']:
 
91
            LOG.warning(_("Host field should not be set on the instance until "
 
92
                          "resources have been claimed."),
 
93
                          instance=instance_ref)
 
94
 
 
95
        claim = claims.Claim(instance_ref, self)
 
96
 
 
97
        if claim.test(self.compute_node, limits):
 
98
 
 
99
            self._set_instance_host(context, instance_ref)
 
100
 
 
101
            # Mark resources in-use and update stats
 
102
            self._update_usage_from_instance(self.compute_node, instance_ref)
 
103
 
 
104
            # persist changes to the compute node:
 
105
            self._update(context, self.compute_node)
 
106
 
 
107
            return claim
 
108
 
 
109
        else:
 
110
            raise exception.ComputeResourcesUnavailable()
 
111
 
 
112
    @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
 
113
    def resize_claim(self, context, instance_ref, instance_type, limits=None):
 
114
        """Indicate that resources are needed for a resize operation to this
 
115
        compute host.
 
116
        :param context: security context
 
117
        :param instance_ref: instance to reserve resources for
 
118
        :param instance_type: new instance_type being resized to
 
119
        :param limits: Dict of oversubscription limits for memory, disk,
 
120
                       and CPUs.
 
121
        :returns: A Claim ticket representing the reserved resources.  This
 
122
                  should be turned into finalize  a resource claim or free
 
123
                  resources after the compute operation is finished.
 
124
        """
 
125
        if self.disabled:
 
126
            # compute_driver doesn't support resource tracking, just
 
127
            # generate the migration record and continue the resize:
 
128
            migration_ref = self._create_migration(context, instance_ref,
 
129
                    instance_type)
 
130
            return claims.NopClaim(migration=migration_ref)
 
131
 
 
132
        claim = claims.ResizeClaim(instance_ref, instance_type, self)
 
133
 
 
134
        if claim.test(self.compute_node, limits):
 
135
 
 
136
            migration_ref = self._create_migration(context, instance_ref,
 
137
                    instance_type)
 
138
            claim.migration = migration_ref
 
139
 
 
140
            # Mark the resources in-use for the resize landing on this
 
141
            # compute host:
 
142
            self._update_usage_from_migration(self.compute_node, migration_ref)
 
143
            self._update(context, self.compute_node)
 
144
 
 
145
            return claim
 
146
 
 
147
        else:
 
148
            raise exception.ComputeResourcesUnavailable()
 
149
 
 
150
    def _create_migration(self, context, instance, instance_type):
 
151
        """Create a migration record for the upcoming resize.  This should
 
152
        be done while the COMPUTE_RESOURCES_SEMAPHORE is held so the resource
 
153
        claim will not be lost if the audit process starts.
 
154
        """
 
155
        # TODO(russellb): no-db-compute: Send the old instance type
 
156
        # info that is needed via rpc so db access isn't required
 
157
        # here.
 
158
        old_instance_type_id = instance['instance_type_id']
 
159
        old_instance_type = instance_types.get_instance_type(
 
160
                old_instance_type_id)
 
161
 
 
162
        return db.migration_create(context.elevated(),
 
163
                {'instance_uuid': instance['uuid'],
 
164
                 'source_compute': instance['host'],
 
165
                 'dest_compute': self.host,
 
166
                 'dest_host': self.driver.get_host_ip_addr(),
 
167
                 'old_instance_type_id': old_instance_type['id'],
 
168
                 'new_instance_type_id': instance_type['id'],
 
169
                 'status': 'pre-migrating'})
 
170
 
 
171
    def _set_instance_host(self, context, instance_ref):
 
172
        """Tag the instance as belonging to this host.  This should be done
 
173
        while the COMPUTE_RESOURCES_SEMPAHORE is held so the resource claim
 
174
        will not be lost if the audit process starts.
 
175
        """
 
176
        values = {'host': self.host, 'launched_on': self.host}
 
177
        (old_ref, new_ref) = db.instance_update_and_get_original(context,
 
178
                instance_ref['uuid'], values)
 
179
        notifications.send_update(context, old_ref, new_ref)
 
180
        instance_ref['host'] = self.host
 
181
        instance_ref['launched_on'] = self.host
 
182
 
 
183
    def abort_instance_claim(self, instance):
 
184
        """Remove usage from the given instance"""
 
185
        # flag the instance as deleted to revert the resource usage
 
186
        # and associated stats:
 
187
        instance['vm_state'] = vm_states.DELETED
 
188
        self._update_usage_from_instance(self.compute_node, instance)
 
189
 
 
190
        ctxt = context.get_admin_context()
 
191
        self._update(ctxt, self.compute_node)
 
192
 
 
193
    def abort_resize_claim(self, instance_uuid, instance_type):
 
194
        """Remove usage for an incoming migration"""
 
195
        if instance_uuid in self.tracked_migrations:
 
196
            migration, itype = self.tracked_migrations.pop(instance_uuid)
 
197
 
 
198
            if instance_type['id'] == migration['new_instance_type_id']:
 
199
                self.stats.update_stats_for_migration(itype, sign=-1)
 
200
                self._update_usage(self.compute_node, itype, sign=-1)
 
201
 
 
202
                ctxt = context.get_admin_context()
 
203
                self._update(ctxt, self.compute_node)
 
204
 
 
205
    @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
339
206
    def update_usage(self, context, instance):
340
207
        """Update the resource usage and stats after a change in an
341
208
        instance
343
210
        if self.disabled:
344
211
            return
345
212
 
 
213
        uuid = instance['uuid']
 
214
 
346
215
        # don't update usage for this instance unless it submitted a resource
347
216
        # claim first:
348
 
        uuid = instance['uuid']
349
217
        if uuid in self.tracked_instances:
350
218
            self._update_usage_from_instance(self.compute_node, instance)
351
219
            self._update(context.elevated(), self.compute_node)
354
222
    def disabled(self):
355
223
        return self.compute_node is None
356
224
 
357
 
    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
 
225
    @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
358
226
    def update_available_resource(self, context):
359
227
        """Override in-memory calculations of compute node resource usage based
360
228
        on data audited from the hypervisor layer.
363
231
        declared a need for resources, but not necessarily retrieved them from
364
232
        the hypervisor layer yet.
365
233
        """
366
 
        resources = self.driver.get_available_resource()
 
234
        LOG.audit(_("Auditing locally available compute resources"))
 
235
        resources = self.driver.get_available_resource(self.nodename)
367
236
        if not resources:
368
237
            # The virt driver does not support this function
369
238
            LOG.audit(_("Virt driver does not support "
370
239
                "'get_available_resource'  Compute tracking is disabled."))
371
240
            self.compute_node = None
372
 
            self.claims = {}
373
241
            return
374
242
 
375
243
        self._verify_resources(resources)
376
244
 
377
245
        self._report_hypervisor_resource_view(resources)
378
246
 
379
 
        self._purge_expired_claims()
380
 
 
381
 
        # Grab all instances assigned to this host:
382
 
        filters = {'host': self.host, 'deleted': False}
383
 
        instances = db.instance_get_all_by_filters(context, filters)
 
247
        # Grab all instances assigned to this node:
 
248
        instances = db.instance_get_all_by_host_and_node(context, self.host,
 
249
                                                         self.nodename)
384
250
 
385
251
        # Now calculate usage based on instance utilization:
386
252
        self._update_usage_from_instances(resources, instances)
 
253
 
 
254
        # Grab all in-progress migrations:
 
255
        migrations = db.migration_get_in_progress_by_host(context, self.host)
 
256
 
 
257
        self._update_usage_from_migrations(resources, migrations)
 
258
 
387
259
        self._report_final_resource_view(resources)
388
260
 
389
261
        self._sync_compute_node(context, resources)
397
269
                # no service record, disable resource
398
270
                return
399
271
 
400
 
            compute_node_ref = service['compute_node']
401
 
            if compute_node_ref:
402
 
                self.compute_node = compute_node_ref[0]
 
272
            compute_node_refs = service['compute_node']
 
273
            if compute_node_refs:
 
274
                for cn in compute_node_refs:
 
275
                    if cn.get('hypervisor_hostname') == self.nodename:
 
276
                        self.compute_node = cn
 
277
                        break
403
278
 
404
279
        if not self.compute_node:
405
280
            # Need to create the ComputeNode record:
412
287
            self._update(context, resources, prune_stats=True)
413
288
            LOG.info(_('Compute_service record updated for %s ') % self.host)
414
289
 
415
 
    def _purge_expired_claims(self):
416
 
        """Purge expired resource claims"""
417
 
        for claim_id in self.claims.keys():
418
 
            c = self.claims[claim_id]
419
 
            if c.is_expired():
420
 
                # if this claim is expired, just expunge it.
421
 
                # it is assumed that the instance will eventually get built
422
 
                # successfully.
423
 
                LOG.audit(_("Expiring resource claim %s"), claim_id)
424
 
                self.claims.pop(claim_id)
425
 
 
426
290
    def _create(self, context, values):
427
291
        """Create the compute node in the DB"""
428
292
        # initialize load stats from existing instances:
475
339
                self.compute_node['id'], values, prune_stats)
476
340
        self.compute_node = dict(compute_node)
477
341
 
 
342
    def confirm_resize(self, context, migration, status='confirmed'):
 
343
        """Cleanup usage for a confirmed resize"""
 
344
        elevated = context.elevated()
 
345
        db.migration_update(elevated, migration['id'],
 
346
                            {'status': status})
 
347
        self.update_available_resource(elevated)
 
348
 
 
349
    def revert_resize(self, context, migration, status='reverted'):
 
350
        """Cleanup usage for a reverted resize"""
 
351
        self.confirm_resize(context, migration, status)
 
352
 
 
353
    def _update_usage(self, resources, usage, sign=1):
 
354
        resources['memory_mb_used'] += sign * usage['memory_mb']
 
355
        resources['local_gb_used'] += sign * usage['root_gb']
 
356
        resources['local_gb_used'] += sign * usage['ephemeral_gb']
 
357
 
 
358
        # free ram and disk may be negative, depending on policy:
 
359
        resources['free_ram_mb'] = (resources['memory_mb'] -
 
360
                                    resources['memory_mb_used'])
 
361
        resources['free_disk_gb'] = (resources['local_gb'] -
 
362
                                     resources['local_gb_used'])
 
363
 
 
364
        resources['running_vms'] = self.stats.num_instances
 
365
        resources['vcpus_used'] = self.stats.num_vcpus_used
 
366
 
 
367
    def _update_usage_from_migration(self, resources, migration):
 
368
        """Update usage for a single migration.  The record may
 
369
        represent an incoming or outbound migration.
 
370
        """
 
371
        uuid = migration['instance_uuid']
 
372
        LOG.audit("Updating from migration %s" % uuid)
 
373
 
 
374
        incoming = (migration['dest_compute'] == self.host)
 
375
        outbound = (migration['source_compute'] == self.host)
 
376
        same_host = (incoming and outbound)
 
377
 
 
378
        instance = self.tracked_instances.get(uuid, None)
 
379
        itype = None
 
380
 
 
381
        if same_host:
 
382
            # same host resize. record usage for whichever instance type the
 
383
            # instance is *not* in:
 
384
            if (instance['instance_type_id'] ==
 
385
                migration['old_instance_type_id']):
 
386
 
 
387
                itype = migration['new_instance_type_id']
 
388
            else:
 
389
                # instance record already has new flavor, hold space for a
 
390
                # possible revert to the old instance type:
 
391
                itype = migration['old_instance_type_id']
 
392
 
 
393
        elif incoming and not instance:
 
394
            # instance has not yet migrated here:
 
395
            itype = migration['new_instance_type_id']
 
396
 
 
397
        elif outbound and not instance:
 
398
            # instance migrated, but record usage for a possible revert:
 
399
            itype = migration['old_instance_type_id']
 
400
 
 
401
        if itype:
 
402
            instance_type = instance_types.get_instance_type(itype)
 
403
            self.stats.update_stats_for_migration(instance_type)
 
404
            self._update_usage(resources, instance_type)
 
405
            resources['stats'] = self.stats
 
406
            self.tracked_migrations[uuid] = (migration, instance_type)
 
407
 
 
408
    def _update_usage_from_migrations(self, resources, migrations):
 
409
 
 
410
        self.tracked_migrations.clear()
 
411
 
 
412
        filtered = {}
 
413
 
 
414
        # do some defensive filtering against bad migrations records in the
 
415
        # database:
 
416
        for migration in migrations:
 
417
 
 
418
            instance = migration['instance']
 
419
 
 
420
            if not instance:
 
421
                # migration referencing deleted instance
 
422
                continue
 
423
 
 
424
            uuid = instance['uuid']
 
425
 
 
426
            # skip migration if instance isn't in a resize state:
 
427
            if not self._instance_in_resize_state(instance):
 
428
                LOG.warn(_("Instance not resizing, skipping migration."),
 
429
                         instance_uuid=uuid)
 
430
                continue
 
431
 
 
432
            # filter to most recently updated migration for each instance:
 
433
            m = filtered.get(uuid, None)
 
434
            if not m or migration['updated_at'] >= m['updated_at']:
 
435
                filtered[uuid] = migration
 
436
 
 
437
        for migration in filtered.values():
 
438
            self._update_usage_from_migration(resources, migration)
 
439
 
478
440
    def _update_usage_from_instance(self, resources, instance):
479
441
        """Update usage for a single instance."""
480
442
 
483
445
        is_deleted_instance = instance['vm_state'] == vm_states.DELETED
484
446
 
485
447
        if is_new_instance:
486
 
            self.tracked_instances[uuid] = 1
 
448
            self.tracked_instances[uuid] = jsonutils.to_primitive(instance)
487
449
            sign = 1
488
450
 
489
 
        if instance['vm_state'] == vm_states.DELETED:
 
451
        if is_deleted_instance:
490
452
            self.tracked_instances.pop(uuid)
491
453
            sign = -1
492
454
 
495
457
        # if it's a new or deleted instance:
496
458
        if is_new_instance or is_deleted_instance:
497
459
            # new instance, update compute node resource usage:
498
 
            resources['memory_mb_used'] += sign * instance['memory_mb']
499
 
            resources['local_gb_used'] += sign * instance['root_gb']
500
 
            resources['local_gb_used'] += sign * instance['ephemeral_gb']
501
 
 
502
 
            # free ram and disk may be negative, depending on policy:
503
 
            resources['free_ram_mb'] = (resources['memory_mb'] -
504
 
                                        resources['memory_mb_used'])
505
 
            resources['free_disk_gb'] = (resources['local_gb'] -
506
 
                                         resources['local_gb_used'])
507
 
 
508
 
            resources['running_vms'] = self.stats.num_instances
509
 
            resources['vcpus_used'] = self.stats.num_vcpus_used
 
460
            self._update_usage(resources, instance, sign=sign)
510
461
 
511
462
        resources['current_workload'] = self.stats.calculate_workload()
512
463
        resources['stats'] = self.stats
523
474
        self.stats.clear()
524
475
 
525
476
        # set some intiial values, reserve room for host/hypervisor:
526
 
        resources['local_gb_used'] = FLAGS.reserved_host_disk_mb / 1024
527
 
        resources['memory_mb_used'] = FLAGS.reserved_host_memory_mb
 
477
        resources['local_gb_used'] = CONF.reserved_host_disk_mb / 1024
 
478
        resources['memory_mb_used'] = CONF.reserved_host_memory_mb
528
479
        resources['vcpus_used'] = 0
529
480
        resources['free_ram_mb'] = (resources['memory_mb'] -
530
481
                                    resources['memory_mb_used'])
544
495
        if missing_keys:
545
496
            reason = _("Missing keys: %s") % missing_keys
546
497
            raise exception.InvalidInput(reason=reason)
 
498
 
 
499
    def _instance_in_resize_state(self, instance):
 
500
        vm = instance['vm_state']
 
501
        task = instance['task_state']
 
502
 
 
503
        if vm == vm_states.RESIZED:
 
504
            return True
 
505
 
 
506
        if (vm == vm_states.ACTIVE and task in [task_states.RESIZE_PREP,
 
507
                task_states.RESIZE_MIGRATING, task_states.RESIZE_MIGRATED,
 
508
                task_states.RESIZE_FINISH]):
 
509
            return True
 
510
 
 
511
        return False