22
from nova.compute import task_states
23
from nova.compute import vm_states
22
24
from nova import db
23
25
from nova import exception
24
from nova import flags
25
26
from nova.openstack.common import cfg
26
27
from nova.openstack.common import log as logging
27
28
from nova.openstack.common import timeutils
28
29
from nova.scheduler import filters
30
from nova.scheduler import weights
30
32
host_manager_opts = [
31
33
cfg.MultiStrOpt('scheduler_available_filters',
32
default=['nova.scheduler.filters.standard_filters'],
34
default=['nova.scheduler.filters.all_filters'],
33
35
help='Filter classes available to the scheduler which may '
34
36
'be specified more than once. An entry of '
35
37
'"nova.scheduler.filters.standard_filters" '
91
96
previously used and lock down access.
94
def __init__(self, host, topic, capabilities=None, service=None):
99
def __init__(self, host, node, capabilities=None, service=None):
98
# Read-only capability dicts
100
if capabilities is None:
102
self.capabilities = ReadOnlyDict(capabilities.get(topic, None))
105
self.service = ReadOnlyDict(service)
102
self.update_capabilities(capabilities, service)
106
104
# Mutable available resources.
107
105
# These will change as resources are virtually "consumed".
108
106
self.total_usable_disk_gb = 0
111
109
self.free_disk_mb = 0
112
110
self.vcpus_total = 0
113
111
self.vcpus_used = 0
112
# Valid vm types on this host: 'pv', 'hvm' or 'all'
113
if 'allowed_vm_type' in self.capabilities:
114
self.allowed_vm_type = self.capabilities['allowed_vm_type']
116
self.allowed_vm_type = 'all'
118
# Additional host information from the compute node stats:
120
self.task_states = {}
121
self.num_instances = 0
122
self.num_instances_by_project = {}
123
self.num_instances_by_os_type = {}
115
126
# Resource oversubscription values for the compute host:
131
def update_capabilities(self, capabilities=None, service=None):
132
# Read-only capability dicts
134
if capabilities is None:
136
self.capabilities = ReadOnlyDict(capabilities)
139
self.service = ReadOnlyDict(service)
118
141
def update_from_compute_node(self, compute):
119
142
"""Update information about a host from its compute_node info."""
143
if self.updated and self.updated > compute['updated_at']:
120
145
all_ram_mb = compute['memory_mb']
122
147
# Assume the virtual size is fully consumed by instances when
# qcow2 disks are used.
133
158
self.free_disk_mb = free_disk_mb
134
159
self.vcpus_total = compute['vcpus']
135
160
self.vcpus_used = compute['vcpus_used']
161
self.updated = compute['updated_at']
163
stats = compute.get('stats', [])
164
statmap = self._statmap(stats)
166
# Track number of instances on host
167
self.num_instances = int(statmap.get('num_instances', 0))
169
# Track number of instances by project_id
170
project_id_keys = [k for k in statmap.keys() if
171
k.startswith("num_proj_")]
172
for key in project_id_keys:
174
self.num_instances_by_project[project_id] = int(statmap[key])
176
# Track number of instances in certain vm_states
177
vm_state_keys = [k for k in statmap.keys() if k.startswith("num_vm_")]
178
for key in vm_state_keys:
180
self.vm_states[vm_state] = int(statmap[key])
182
# Track number of instances in certain task_states
183
task_state_keys = [k for k in statmap.keys() if
184
k.startswith("num_task_")]
185
for key in task_state_keys:
187
self.task_states[task_state] = int(statmap[key])
189
# Track number of instances by os_type
190
os_keys = [k for k in statmap.keys() if k.startswith("num_os_type_")]
193
self.num_instances_by_os_type[os] = int(statmap[key])
195
self.num_io_ops = int(statmap.get('io_workload', 0))
137
197
def consume_from_instance(self, instance):
138
198
"""Incrementally update host state from an instance"""
142
202
self.free_ram_mb -= ram_mb
143
203
self.free_disk_mb -= disk_mb
144
204
self.vcpus_used += vcpus
146
def passes_filters(self, filter_fns, filter_properties):
147
"""Return whether or not this host passes filters."""
149
if self.host in filter_properties.get('ignore_hosts', []):
150
LOG.debug(_('Host filter fails for ignored host %(host)s'),
154
force_hosts = filter_properties.get('force_hosts', [])
156
if not self.host in force_hosts:
157
LOG.debug(_('Host filter fails for non-forced host %(host)s'),
159
return self.host in force_hosts
161
for filter_fn in filter_fns:
162
if not filter_fn(self, filter_properties):
163
LOG.debug(_('Host filter function %(func)s failed for '
165
{'func': repr(filter_fn),
169
LOG.debug(_('Host filter passes for %(host)s'), {'host': self.host})
205
self.updated = timeutils.utcnow()
207
# Track number of instances on host
208
self.num_instances += 1
210
# Track number of instances by project_id
211
project_id = instance.get('project_id')
212
if project_id not in self.num_instances_by_project:
213
self.num_instances_by_project[project_id] = 0
214
self.num_instances_by_project[project_id] += 1
216
# Track number of instances in certain vm_states
217
vm_state = instance.get('vm_state', vm_states.BUILDING)
218
if vm_state not in self.vm_states:
219
self.vm_states[vm_state] = 0
220
self.vm_states[vm_state] += 1
222
# Track number of instances in certain task_states
223
task_state = instance.get('task_state')
224
if task_state not in self.task_states:
225
self.task_states[task_state] = 0
226
self.task_states[task_state] += 1
228
# Track number of instances by os_type
229
os_type = instance.get('os_type')
230
if os_type not in self.num_instances_by_os_type:
231
self.num_instances_by_os_type[os_type] = 0
232
self.num_instances_by_os_type[os_type] += 1
234
vm_state = instance.get('vm_state', vm_states.BUILDING)
235
task_state = instance.get('task_state')
236
if vm_state == vm_states.BUILDING or task_state in [
237
task_states.RESIZE_MIGRATING, task_states.REBUILDING,
238
task_states.RESIZE_PREP, task_states.IMAGE_SNAPSHOT,
239
task_states.IMAGE_BACKUP]:
242
def _statmap(self, stats):
243
return dict((st['key'], st['value']) for st in stats)
172
245
def __repr__(self):
173
return ("host '%s': free_ram_mb:%s free_disk_mb:%s" %
174
(self.host, self.free_ram_mb, self.free_disk_mb))
246
return ("(%s, %s) ram:%s disk:%s io_ops:%s instances:%s vm_type:%s" %
247
(self.host, self.nodename, self.free_ram_mb, self.free_disk_mb,
248
self.num_io_ops, self.num_instances, self.allowed_vm_type))
177
251
class HostManager(object):
181
255
host_state_cls = HostState
183
257
def __init__(self):
184
self.service_states = {} # { <host> : { <service> : { cap k : v }}}
185
self.filter_classes = filters.get_filter_classes(
186
FLAGS.scheduler_available_filters)
258
# { (host, hypervisor_hostname) : { cap k : v } }
259
self.service_states = {}
260
self.host_state_map = {}
261
self.filter_handler = filters.HostFilterHandler()
262
self.filter_classes = self.filter_handler.get_matching_classes(
263
CONF.scheduler_available_filters)
264
self.weight_handler = weights.HostWeightHandler()
265
self.weight_classes = self.weight_handler.get_matching_classes(
266
CONF.scheduler_weight_classes)
188
def _choose_host_filters(self, filters):
268
def _choose_host_filters(self, filter_cls_names):
189
269
"""Since the caller may specify which filters to use we need
190
270
to have an authoritative list of what is permissible. This
191
271
function checks the filter names against a predefined set
192
272
of acceptable filters.
195
filters = FLAGS.scheduler_default_filters
196
if not isinstance(filters, (list, tuple)):
274
if filter_cls_names is None:
275
filter_cls_names = CONF.scheduler_default_filters
276
if not isinstance(filter_cls_names, (list, tuple)):
277
filter_cls_names = [filter_cls_names]
198
278
good_filters = []
200
for filter_name in filters:
280
for filter_name in filter_cls_names:
201
281
found_class = False
202
282
for cls in self.filter_classes:
203
283
if cls.__name__ == filter_name:
284
good_filters.append(cls)
204
285
found_class = True
205
filter_instance = cls()
206
# Get the filter function
207
filter_func = getattr(filter_instance,
210
good_filters.append(filter_func)
212
287
if not found_class:
213
288
bad_filters.append(filter_name)
216
291
raise exception.SchedulerHostFilterNotFound(filter_name=msg)
217
292
return good_filters
219
def filter_hosts(self, hosts, filter_properties, filters=None):
294
def get_filtered_hosts(self, hosts, filter_properties,
295
filter_class_names=None):
220
296
"""Filter hosts and return only ones passing all filters"""
222
filter_fns = self._choose_host_filters(filters)
224
if host.passes_filters(filter_fns, filter_properties):
225
filtered_hosts.append(host)
226
return filtered_hosts
297
filter_classes = self._choose_host_filters(filter_class_names)
300
ignore_hosts = set(filter_properties.get('ignore_hosts', []))
301
ignore_hosts = hosts & ignore_hosts
303
ignored_hosts = ', '.join(ignore_hosts)
304
msg = _('Host filter ignoring hosts: %(ignored_hosts)s')
305
LOG.debug(msg, locals())
306
hosts = hosts - ignore_hosts
308
force_hosts = set(filter_properties.get('force_hosts', []))
310
matching_force_hosts = hosts & force_hosts
311
if not matching_force_hosts:
312
forced_hosts = ', '.join(force_hosts)
313
msg = _("No hosts matched due to not matching 'force_hosts'"
314
"value of '%(forced_hosts)s'")
315
LOG.debug(msg, locals())
317
forced_hosts = ', '.join(matching_force_hosts)
318
msg = _('Host filter forcing available hosts to %(forced_hosts)s')
319
LOG.debug(msg, locals())
320
hosts = matching_force_hosts
322
return self.filter_handler.get_filtered_objects(filter_classes,
323
hosts, filter_properties)
325
def get_weighed_hosts(self, hosts, weight_properties):
    """Weigh the hosts.

    Delegates to ``self.weight_handler.get_weighed_objects``, supplying
    ``self.weight_classes`` as the weigher classes.

    :param hosts: the host objects to weigh; passed through unchanged.
    :param weight_properties: passed through unchanged to the handler.
    :returns: whatever the weight handler returns.
    """
    return self.weight_handler.get_weighed_objects(
        self.weight_classes, hosts, weight_properties)
228
330
def update_service_capabilities(self, service_name, host, capabilities):
229
331
"""Update the per-service capabilities based on this notification."""
333
if service_name != 'compute':
334
LOG.debug(_('Ignoring %(service_name)s service update '
335
'from %(host)s'), locals())
338
state_key = (host, capabilities.get('hypervisor_hostname'))
230
339
LOG.debug(_("Received %(service_name)s service update from "
231
"%(host)s.") % locals())
232
service_caps = self.service_states.get(host, {})
340
"%(state_key)s.") % locals())
233
341
# Copy the capabilities, so we don't modify the original dict
234
342
capab_copy = dict(capabilities)
235
343
capab_copy["timestamp"] = timeutils.utcnow() # Reported time
236
service_caps[service_name] = capab_copy
237
self.service_states[host] = service_caps
239
def get_all_host_states(self, context, topic):
240
"""Returns a dict of all the hosts the HostManager
241
knows about. Also, each of the consumable resources in HostState
242
are pre-populated and adjusted based on data in the db.
245
{'192.168.1.100': HostState(), ...}
247
Note: this can be very slow with a lot of instances.
248
InstanceType table isn't required since a copy is stored
249
with the instance (in case the InstanceType changed since the
250
instance was created)."""
252
if topic != 'compute':
253
raise NotImplementedError(_(
254
"host_manager only implemented for 'compute'"))
344
self.service_states[state_key] = capab_copy
346
def get_all_host_states(self, context):
347
"""Returns a list of HostStates that represents all the hosts
348
the HostManager knows about. Also, each of the consumable resources
349
in HostState are pre-populated and adjusted based on data in the db.
258
352
# Get resource usage across the available compute nodes:
259
353
compute_nodes = db.compute_node_get_all(context)
263
357
LOG.warn(_("No service for compute ID %s") % compute['id'])
265
359
host = service['host']
266
capabilities = self.service_states.get(host, None)
267
host_state = self.host_state_cls(host, topic,
268
capabilities=capabilities,
269
service=dict(service.iteritems()))
360
node = compute.get('hypervisor_hostname')
361
state_key = (host, node)
362
capabilities = self.service_states.get(state_key, None)
363
host_state = self.host_state_map.get(state_key)
365
host_state.update_capabilities(capabilities,
366
dict(service.iteritems()))
368
host_state = self.host_state_cls(host, node,
369
capabilities=capabilities,
370
service=dict(service.iteritems()))
371
self.host_state_map[state_key] = host_state
270
372
host_state.update_from_compute_node(compute)
271
host_state_map[host] = host_state
273
return host_state_map
374
return self.host_state_map.itervalues()