from nova.compute import claims
from nova.compute import instance_types
from nova.compute import task_states
from nova.compute import vm_states
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import notifications
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova.openstack.common import timeutils
from nova import utils

resource_tracker_opts = [
    cfg.IntOpt('reserved_host_disk_mb', default=0,
               help='Amount of disk in MB to reserve for the host'),
    cfg.IntOpt('reserved_host_memory_mb', default=512,
               help='Amount of memory in MB to reserve for the host'),
    cfg.IntOpt('claim_timeout_seconds', default=600,
               help='How long, in seconds, before a resource claim times '
                    'out'),
    cfg.StrOpt('compute_stats_class',
               default='nova.compute.stats.Stats',
               help='Class that will manage stats for the local compute '
                    'host'),
]
FLAGS = flags.FLAGS
FLAGS.register_opts(resource_tracker_opts)
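
# Illustrative nova.conf overrides for the options registered above (the
# values are examples only, not recommendations):
#
#     [DEFAULT]
#     reserved_host_disk_mb = 10240
#     reserved_host_memory_mb = 1024
#     claim_timeout_seconds = 300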

LOG = logging.getLogger(__name__)

COMPUTE_RESOURCE_SEMAPHORE = "compute_resources"


class Claim(object):
"""A declaration that a compute host operation will require free resources.
55
This information will be used to help keep the local compute hosts's
56
ComputeNode model in sync to aid the scheduler in making efficient / more
57
correct decisions with respect to host selection.
60
def __init__(self, instance, timeout):
61
self.instance = jsonutils.to_primitive(instance)
62
self.timeout = timeout
63
self.expire_ts = timeutils.utcnow_ts() + timeout

    def is_expired(self):
        """Determine if this adjustment is old enough that we can assume it's
        no longer needed.
        """
        return timeutils.utcnow_ts() > self.expire_ts

    @property
    def claim_id(self):
        return self.instance['uuid']

    @property
    def disk_gb(self):
        return self.instance['root_gb'] + self.instance['ephemeral_gb']

    @property
    def memory_mb(self):
        return self.instance['memory_mb']

    @property
    def vcpus(self):
        return self.instance['vcpus']

    def __str__(self):
        return "[Claim %s: %d MB memory, %d GB disk, %d VCPUS]" % \
                (self.claim_id, self.memory_mb, self.disk_gb, self.vcpus)
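
# A minimal sketch of how a Claim summarizes instance resources (the field
# values below are illustrative, not from a real instance record):
#
#     instance = {'uuid': 'abc', 'memory_mb': 2048, 'root_gb': 10,
#                 'ephemeral_gb': 10, 'vcpus': 2}
#     claim = Claim(instance, timeout=600)
#     str(claim)  # "[Claim abc: 2048 MB memory, 20 GB disk, 2 VCPUS]"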


class ResourceContextManager(object):
    def __init__(self, context, claim, tracker):
        self.context = context
        self.claim = claim
        self.tracker = tracker

    def __enter__(self):
        if not self.claim and not self.tracker.disabled:
            # insufficient resources to complete request
            raise exception.ComputeResourcesUnavailable()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self.claim:
            return

        if exc_type is None:
            self.tracker.finish_resource_claim(self.claim)
        else:
            self.tracker.abort_resource_claim(self.context, self.claim)
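
# A hedged usage sketch for the context manager above (variable names are
# illustrative): the claim is finished on success and aborted on error.
#
#     claim = tracker.begin_resource_claim(ctxt, instance_ref)
#     with ResourceContextManager(ctxt, claim, tracker):
#         ...  # perform the operation that consumes the claimed resources
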
class ResourceTracker(object):
    """Compute helper class for keeping track of resource usage as instances
    are built and destroyed.
    """

    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
    def begin_resource_claim(self, context, instance_ref, limits=None,
            timeout=None):
        """Indicate that some resources are needed for an upcoming compute
        host operation.

        :param context: security context
        :param instance_ref: instance to reserve resources for
        :param limits: Dict of oversubscription limits for memory, disk,
                       and CPUs.
        :param timeout: How long, in seconds, the operation that requires
                        these resources should take to actually allocate what
                        it needs from the hypervisor. If the timeout is
                        exceeded, the claim is assumed to be abandoned and the
                        resources are released.
        :returns: A Claim object on success. Pass it to
                  finish_resource_claim() to finalize the claim, or to
                  abort_resource_claim() to free the resources after the
                  compute operation is finished. Returns None if the claim
                  failed.
        """
        if not limits:
            limits = {}

        if not timeout:
            timeout = FLAGS.claim_timeout_seconds

        # If an individual limit is None, the resource will be considered
        # unlimited:
        memory_mb_limit = limits.get('memory_mb')
        disk_gb_limit = limits.get('disk_gb')
        vcpu_limit = limits.get('vcpu')

        memory_mb = instance_ref['memory_mb']
        disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb']
        vcpus = instance_ref['vcpus']

        msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
                "GB, VCPUs %(vcpus)d") % locals()
        LOG.audit(msg)

        # Test for resources:
        if not self._can_claim_memory(memory_mb, memory_mb_limit):
            return

        if not self._can_claim_disk(disk_gb, disk_gb_limit):
            return

        if not self._can_claim_cpu(vcpus, vcpu_limit):
            return

        # keep track of this claim until we know whether the compute operation
        # was successful/completed:
        claim = Claim(instance_ref, timeout)
        self.claims[claim.claim_id] = claim

        # Mark resources in-use and update stats
        self._update_usage_from_instance(self.compute_node, instance_ref)

        # persist changes to the compute node:
        self._update(context, self.compute_node)

        return claim
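
    # A hedged lifecycle sketch for the claim API above (variable names are
    # illustrative):
    #
    #     claim = tracker.begin_resource_claim(ctxt, instance_ref,
    #                                          limits={'memory_mb': 8192})
    #     if claim:
    #         try:
    #             ...  # allocate the resources at the virt layer
    #             tracker.finish_resource_claim(claim)
    #         except Exception:
    #             tracker.abort_resource_claim(ctxt, claim)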

    def _can_claim_memory(self, memory_mb, memory_mb_limit):
        """Test if memory needed for a claim can be safely allocated"""
        # Installed memory and usage info:
        msg = _("Total memory: %(total_mem)d MB, used: %(used_mem)d MB, free: "
                "%(free_mem)d MB") % dict(
                        total_mem=self.compute_node['memory_mb'],
                        used_mem=self.compute_node['memory_mb_used'],
                        free_mem=self.compute_node['free_ram_mb'])
        LOG.audit(msg)

        if memory_mb_limit is None:
            # treat memory as unlimited:
            LOG.audit(_("Memory limit not specified, defaulting to unlimited"))
            return True

        free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used']

        # Oversubscribed memory policy info:
        msg = _("Memory limit: %(memory_mb_limit)d MB, free: "
                "%(free_ram_mb)d MB") % locals()
        LOG.audit(msg)

        can_claim_mem = memory_mb <= free_ram_mb

        if not can_claim_mem:
            msg = _("Unable to claim resources. Free memory %(free_ram_mb)d "
                    "MB < requested memory %(memory_mb)d MB") % locals()
            LOG.info(msg)

        return can_claim_mem

    def _can_claim_disk(self, disk_gb, disk_gb_limit):
        """Test if disk space needed can be safely allocated"""
        # Installed disk and usage info:
        msg = _("Total disk: %(total_disk)d GB, used: %(used_disk)d GB, free: "
                "%(free_disk)d GB") % dict(
                        total_disk=self.compute_node['local_gb'],
                        used_disk=self.compute_node['local_gb_used'],
                        free_disk=self.compute_node['free_disk_gb'])
        LOG.audit(msg)

        if disk_gb_limit is None:
            # treat disk as unlimited:
            LOG.audit(_("Disk limit not specified, defaulting to unlimited"))
            return True

        free_disk_gb = disk_gb_limit - self.compute_node['local_gb_used']

        # Oversubscribed disk policy info:
        msg = _("Disk limit: %(disk_gb_limit)d GB, free: "
                "%(free_disk_gb)d GB") % locals()
        LOG.audit(msg)

        can_claim_disk = disk_gb <= free_disk_gb

        if not can_claim_disk:
            msg = _("Unable to claim resources. Free disk %(free_disk_gb)d GB"
                    " < requested disk %(disk_gb)d GB") % locals()
            LOG.info(msg)

        return can_claim_disk

    def _can_claim_cpu(self, vcpus, vcpu_limit):
        """Test if CPUs can be safely allocated according to given policy."""

        msg = _("Total VCPUs: %(total_vcpus)d, used: %(used_vcpus)d") \
                % dict(total_vcpus=self.compute_node['vcpus'],
                       used_vcpus=self.compute_node['vcpus_used'])
        LOG.audit(msg)

        if vcpu_limit is None:
            # treat cpu as unlimited:
            LOG.audit(_("VCPU limit not specified, defaulting to unlimited"))
            return True

        # Oversubscribed CPU policy info:
        msg = _("CPU limit: %(vcpu_limit)d") % locals()
        LOG.audit(msg)

        free_vcpus = vcpu_limit - self.compute_node['vcpus_used']
        can_claim_cpu = vcpus <= free_vcpus

        if not can_claim_cpu:
            msg = _("Unable to claim resources. Free CPU %(free_vcpus)d < "
                    "requested CPU %(vcpus)d") % locals()
            LOG.info(msg)

        return can_claim_cpu

    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
    def finish_resource_claim(self, claim):
        """Indicate that the compute operation that previously claimed the
        resources identified by 'claim' has now completed and the resources
        have been allocated at the virt layer.

        Calling this keeps the available resource data more accurate and
        timely than letting the claim time out and waiting for
        update_available_resource to reflect the changed usage data.

        :param claim: A claim indicating a set of resources that were
                      previously claimed.
        """
        if self.claims.pop(claim.claim_id, None):
            LOG.info(_("Finishing claim: %s") % claim)
        else:
            LOG.info(_("Can't find claim %s. It may have been 'finished' "
                       "twice, or it has already timed out."), claim.claim_id)

    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
    def abort_resource_claim(self, context, claim):
        """Indicate that the operation that claimed the resources identified
        by 'claim' has either failed or been aborted, and the resources are
        no longer needed.

        :param claim: A claim ticket indicating a set of resources that were
                      previously claimed.
        """
        # un-claim the resources:
        if self.claims.pop(claim.claim_id, None):
            LOG.info(_("Aborting claim: %s") % claim)
            # flag the instance as deleted to revert the resource usage
            # and associated stats:
            claim.instance['vm_state'] = vm_states.DELETED
            self._update_usage_from_instance(self.compute_node, claim.instance)
            self._update(context, self.compute_node)
        else:
            # can't find the claim. this may mean the claim already timed
            # out or it was already explicitly finished/aborted.
            LOG.audit(_("Claim %s not found. It either timed out or was "
                        "already explicitly finished/aborted"), claim.claim_id)

    @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
    def instance_claim(self, context, instance_ref, limits=None):
        """Indicate that some resources are needed for an upcoming compute
        instance build operation.

        :param context: security context
        :param instance_ref: instance to reserve resources for
        :param limits: Dict of oversubscription limits for memory, disk,
                       and CPUs.
        :returns: A Claim ticket representing the reserved resources. It can
                  be used to revert the resource usage if an error occurs
                  during the instance build.
        """
        if self.disabled:
            # compute_driver doesn't support resource tracking, just
            # set the 'host' field and continue the build:
            self._set_instance_host(context, instance_ref)
            return claims.NopClaim()

        # sanity check:
        if instance_ref['host']:
            LOG.warning(_("Host field should not be set on the instance until "
                          "resources have been claimed."),
                        instance=instance_ref)

        claim = claims.Claim(instance_ref, self)

        if claim.test(self.compute_node, limits):
            self._set_instance_host(context, instance_ref)

            # Mark resources in-use and update stats
            self._update_usage_from_instance(self.compute_node, instance_ref)

            # persist changes to the compute node:
            self._update(context, self.compute_node)

            return claim
        else:
            raise exception.ComputeResourcesUnavailable()
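
    # A hedged usage sketch for the claims-based API (assumes claims.Claim
    # exposes an abort() that reverts the recorded usage; names illustrative):
    #
    #     claim = tracker.instance_claim(ctxt, instance_ref, limits)
    #     try:
    #         ...  # spawn the instance at the virt layer
    #     except Exception:
    #         claim.abort()  # hand the claimed resources back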

    @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
    def resize_claim(self, context, instance_ref, instance_type, limits=None):
        """Indicate that resources are needed for a resize operation to this
        compute host.

        :param context: security context
        :param instance_ref: instance to reserve resources for
        :param instance_type: new instance_type being resized to
        :param limits: Dict of oversubscription limits for memory, disk,
                       and CPUs.
        :returns: A Claim ticket representing the reserved resources. It can
                  be used to revert the resource usage if an error occurs
                  during the resize.
        """
        if self.disabled:
            # compute_driver doesn't support resource tracking, just
            # generate the migration record and continue the resize:
            migration_ref = self._create_migration(context, instance_ref,
                    instance_type)
            return claims.NopClaim(migration=migration_ref)

        claim = claims.ResizeClaim(instance_ref, instance_type, self)

        if claim.test(self.compute_node, limits):
            migration_ref = self._create_migration(context, instance_ref,
                    instance_type)
            claim.migration = migration_ref

            # Mark the resources in-use for the resize landing on this
            # compute host:
            self._update_usage_from_migration(self.compute_node, migration_ref)
            self._update(context, self.compute_node)

            return claim
        else:
            raise exception.ComputeResourcesUnavailable()
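
    # Hedged sketch of the resize path (names illustrative): the returned
    # claim carries the freshly created migration record, so a failed resize
    # can revert the usage and inspect the migration together:
    #
    #     claim = tracker.resize_claim(ctxt, instance_ref, new_type, limits)
    #     migration_ref = claim.migration  # status 'pre-migrating'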

    def _create_migration(self, context, instance, instance_type):
        """Create a migration record for the upcoming resize. This should
        be done while the COMPUTE_RESOURCE_SEMAPHORE is held so the resource
        claim will not be lost if the audit process starts.
        """
        # TODO(russellb): no-db-compute: Send the old instance type
        # info that is needed via rpc so db access isn't required
        old_instance_type_id = instance['instance_type_id']
        old_instance_type = instance_types.get_instance_type(
                old_instance_type_id)

        return db.migration_create(context.elevated(),
                {'instance_uuid': instance['uuid'],
                 'source_compute': instance['host'],
                 'dest_compute': self.host,
                 'dest_host': self.driver.get_host_ip_addr(),
                 'old_instance_type_id': old_instance_type['id'],
                 'new_instance_type_id': instance_type['id'],
                 'status': 'pre-migrating'})

    def _set_instance_host(self, context, instance_ref):
        """Tag the instance as belonging to this host. This should be done
        while the COMPUTE_RESOURCE_SEMAPHORE is held so the resource claim
        will not be lost if the audit process starts.
        """
        values = {'host': self.host, 'launched_on': self.host}
        (old_ref, new_ref) = db.instance_update_and_get_original(context,
                instance_ref['uuid'], values)
        notifications.send_update(context, old_ref, new_ref)
        instance_ref['host'] = self.host
        instance_ref['launched_on'] = self.host

    def abort_instance_claim(self, instance):
        """Remove usage from the given instance"""
        # flag the instance as deleted to revert the resource usage
        # and associated stats:
        instance['vm_state'] = vm_states.DELETED
        self._update_usage_from_instance(self.compute_node, instance)

        ctxt = context.get_admin_context()
        self._update(ctxt, self.compute_node)

    def abort_resize_claim(self, instance_uuid, instance_type):
        """Remove usage for an incoming migration"""
        if instance_uuid in self.tracked_migrations:
            migration, itype = self.tracked_migrations.pop(instance_uuid)

            if instance_type['id'] == migration['new_instance_type_id']:
                self.stats.update_stats_for_migration(itype, sign=-1)
                self._update_usage(self.compute_node, itype, sign=-1)

                ctxt = context.get_admin_context()
                self._update(ctxt, self.compute_node)

    @lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
    def update_usage(self, context, instance):
        """Update the resource usage and stats after a change in an
        instance's lifecycle.
        """
        if self.disabled:
            return

        # only update usage for instances that previously claimed resources:
        if instance['uuid'] in self.tracked_instances:
            self._update_usage_from_instance(self.compute_node, instance)
            self._update(context.elevated(), self.compute_node)

    def _update(self, context, values, prune_stats=False):
        """Persist the compute node updates to the database."""
        compute_node = db.compute_node_update(context,
                self.compute_node['id'], values, prune_stats)
        self.compute_node = dict(compute_node)

    def confirm_resize(self, context, migration, status='confirmed'):
        """Cleanup usage for a confirmed resize"""
        elevated = context.elevated()
        db.migration_update(elevated, migration['id'],
                            {'status': status})
        self.update_available_resource(elevated)

    def revert_resize(self, context, migration, status='reverted'):
        """Cleanup usage for a reverted resize"""
        self.confirm_resize(context, migration, status)

    def _update_usage(self, resources, usage, sign=1):
        resources['memory_mb_used'] += sign * usage['memory_mb']
        resources['local_gb_used'] += sign * usage['root_gb']
        resources['local_gb_used'] += sign * usage['ephemeral_gb']

        # free ram and disk may be negative, depending on policy:
        resources['free_ram_mb'] = (resources['memory_mb'] -
                                    resources['memory_mb_used'])
        resources['free_disk_gb'] = (resources['local_gb'] -
                                     resources['local_gb_used'])

        resources['running_vms'] = self.stats.num_instances
        resources['vcpus_used'] = self.stats.num_vcpus_used
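
    # Worked example (illustrative numbers): applying a usage of
    # {'memory_mb': 2048, 'root_gb': 10, 'ephemeral_gb': 10} with sign=-1
    # subtracts 2048 MB from memory_mb_used and 20 GB from local_gb_used,
    # which is how a reverted claim hands its resources back.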

    def _update_usage_from_migration(self, resources, migration):
        """Update usage for a single migration. The record may
        represent an incoming or outbound migration.
        """
        uuid = migration['instance_uuid']
        LOG.audit(_("Updating from migration %s") % uuid)

        incoming = (migration['dest_compute'] == self.host)
        outbound = (migration['source_compute'] == self.host)
        same_host = (incoming and outbound)

        instance = self.tracked_instances.get(uuid, None)
        itype = None

        if same_host:
            # same host resize. record usage for whichever instance type the
            # instance is *not* in:
            if (instance['instance_type_id'] ==
                    migration['old_instance_type_id']):
                itype = migration['new_instance_type_id']
            else:
                # instance record already has new flavor, hold space for a
                # possible revert to the old instance type:
                itype = migration['old_instance_type_id']

        elif incoming and not instance:
            # instance has not yet migrated here:
            itype = migration['new_instance_type_id']

        elif outbound and not instance:
            # instance migrated, but record usage for a possible revert:
            itype = migration['old_instance_type_id']

        if itype:
            instance_type = instance_types.get_instance_type(itype)
            self.stats.update_stats_for_migration(instance_type)
            self._update_usage(resources, instance_type)
            resources['stats'] = self.stats

            self.tracked_migrations[uuid] = (migration, instance_type)
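
    # Example of the same-host branch (illustrative IDs): for a resize from
    # flavor 1 to flavor 2 on one host, if the instance record still shows
    # instance_type_id == 1 (the old flavor), usage is held for flavor 2 so
    # the new footprint fits; once the record flips to 2, usage is held for
    # flavor 1 instead, reserving room for a possible revert.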

    def _update_usage_from_migrations(self, resources, migrations):

        self.tracked_migrations.clear()

        filtered = {}

        # do some defensive filtering against bad migration records in the
        # database:
        for migration in migrations:

            instance = migration['instance']
            if not instance:
                # migration referencing deleted instance
                continue

            uuid = instance['uuid']

            # skip migration if instance isn't in a resize state:
            if not self._instance_in_resize_state(instance):
                LOG.warn(_("Instance not resizing, skipping migration."),
                         instance_uuid=uuid)
                continue

            # filter to most recently updated migration for each instance:
            m = filtered.get(uuid, None)
            if not m or migration['updated_at'] >= m['updated_at']:
                filtered[uuid] = migration

        for migration in filtered.values():
            self._update_usage_from_migration(resources, migration)
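
    # Illustrative effect of the filtering above: given two records for one
    # instance, {'updated_at': t1} and {'updated_at': t2} with t2 >= t1, only
    # the t2 record is replayed into usage, so stale or duplicated migration
    # rows cannot double-count resources.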

    def _update_usage_from_instance(self, resources, instance):
        """Update usage for a single instance."""