39
34
"""Scheduler that can be used for filtering and weighing."""
40
35
def __init__(self, *args, **kwargs):
41
36
super(FilterScheduler, self).__init__(*args, **kwargs)
42
self.cost_function_cache = {}
37
self.cost_function_cache = None
43
38
self.options = scheduler_options.SchedulerOptions()
45
def schedule_create_volume(self, context, volume_id, snapshot_id, image_id,
47
# NOTE: We're only focused on compute instances right now,
48
# so this method will always raise NoValidHost().
49
msg = _("No host selection for %s defined.") % FLAGS.volume_topic
50
raise exception.NoValidHost(reason=msg)
52
40
def schedule_run_instance(self, context, request_spec,
53
41
admin_password, injected_files,
54
42
requested_networks, is_first_time,
69
56
notifier.notify(context, notifier.publisher_id("scheduler"),
70
57
'scheduler.run_instance.start', notifier.INFO, payload)
72
weighted_hosts = self._schedule(context, "compute", request_spec,
73
filter_properties, instance_uuids)
59
weighed_hosts = self._schedule(context, request_spec,
60
filter_properties, instance_uuids)
75
62
# NOTE(comstud): Make sure we do not pass this through. It
76
63
# contains an instance of RpcContext that cannot be serialized.
115
102
the prep_resize operation to it.
118
hosts = self._schedule(context, 'compute', request_spec,
119
filter_properties, [instance['uuid']])
105
weighed_hosts = self._schedule(context, request_spec,
106
filter_properties, [instance['uuid']])
107
if not weighed_hosts:
121
108
raise exception.NoValidHost(reason="")
109
weighed_host = weighed_hosts.pop(0)
111
self._post_select_populate_filter_properties(filter_properties,
114
# context is not serializable
115
filter_properties.pop('context', None)
124
117
# Forward off to the host
125
118
self.compute_rpcapi.prep_resize(context, image, instance,
126
instance_type, host.host_state.host, reservations)
119
instance_type, weighed_host.obj.host, reservations,
120
request_spec=request_spec, filter_properties=filter_properties)
128
def _provision_resource(self, context, weighted_host, request_spec,
122
def _provision_resource(self, context, weighed_host, request_spec,
129
123
filter_properties, requested_networks, injected_files,
130
124
admin_password, is_first_time, instance_uuid=None):
131
125
"""Create the requested resource in this Zone."""
132
# Add a retry entry for the selected compute host:
133
self._add_retry_host(filter_properties, weighted_host.host_state.host)
135
self._add_oversubscription_policy(filter_properties,
136
weighted_host.host_state)
138
126
payload = dict(request_spec=request_spec,
139
weighted_host=weighted_host.to_dict(),
127
weighted_host=weighed_host.to_dict(),
140
128
instance_id=instance_uuid)
141
129
notifier.notify(context, notifier.publisher_id("scheduler"),
142
130
'scheduler.run_instance.scheduled', notifier.INFO,
133
# TODO(NTTdocomo): Combine the next two updates into one
134
driver.db_instance_node_set(context,
135
instance_uuid, weighed_host.obj.nodename)
145
136
updated_instance = driver.instance_update_db(context,
146
instance_uuid, weighted_host.host_state.host)
139
self._post_select_populate_filter_properties(filter_properties,
148
142
self.compute_rpcapi.run_instance(context, instance=updated_instance,
149
host=weighted_host.host_state.host,
143
host=weighed_host.obj.host,
150
144
request_spec=request_spec, filter_properties=filter_properties,
151
145
requested_networks=requested_networks,
152
146
injected_files=injected_files,
153
147
admin_password=admin_password, is_first_time=is_first_time)
149
def _post_select_populate_filter_properties(self, filter_properties,
151
"""Add additional information to the filter properties after a host has
152
been selected by the scheduling process.
154
# Add a retry entry for the selected compute host:
155
self._add_retry_host(filter_properties, host_state.host)
157
self._add_oversubscription_policy(filter_properties, host_state)
155
159
def _add_retry_host(self, filter_properties, host):
156
160
"""Add a retry entry for the selected compute host. In the event that
157
161
the request gets re-scheduled, this entry will signal that the given
210
218
"instance %(instance_uuid)s") % locals()
211
219
raise exception.NoValidHost(reason=msg)
213
def _schedule(self, context, topic, request_spec, filter_properties,
221
def _schedule(self, context, request_spec, filter_properties,
214
222
instance_uuids=None):
215
223
"""Returns a list of hosts that meet the required specs,
216
224
ordered by their fitness.
218
226
elevated = context.elevated()
219
if topic != "compute":
220
msg = _("Scheduler only understands Compute nodes (for now)")
221
raise NotImplementedError(msg)
223
227
instance_properties = request_spec['instance_properties']
224
228
instance_type = request_spec.get("instance_type", None)
226
cost_functions = self.get_cost_functions()
227
230
config_options = self._get_configuration_options()
229
232
# check retry policy. Rather ugly use of instance_uuids[0]...
272
271
LOG.debug(_("Filtered %(hosts)s") % locals())
274
# weighted_host = WeightedHost() ... the best
276
# TODO(comstud): filter_properties will also be used for
277
# weighing and I plan fold weighing into the host manager
278
# in a future patch. I'll address the naming of this
279
# variable at that time.
280
weighted_host = least_cost.weighted_sum(cost_functions,
281
hosts, filter_properties)
282
LOG.debug(_("Weighted %(weighted_host)s") % locals())
283
selected_hosts.append(weighted_host)
273
weighed_hosts = self.host_manager.get_weighed_hosts(hosts,
275
best_host = weighed_hosts[0]
276
LOG.debug(_("Choosing host %(best_host)s") % locals())
277
selected_hosts.append(best_host)
285
278
# Now consume the resources so the filter/weights
286
279
# will change for the next instance.
287
weighted_host.host_state.consume_from_instance(
290
selected_hosts.sort(key=operator.attrgetter('weight'))
280
best_host.obj.consume_from_instance(instance_properties)
291
281
return selected_hosts
293
def get_cost_functions(self, topic=None):
294
"""Returns a list of tuples containing weights and cost functions to
295
use for weighing hosts
298
# Schedulers only support compute right now.
300
if topic in self.cost_function_cache:
301
return self.cost_function_cache[topic]
304
for cost_fn_str in FLAGS.least_cost_functions:
305
if '.' in cost_fn_str:
306
short_name = cost_fn_str.split('.')[-1]
308
short_name = cost_fn_str
309
cost_fn_str = "%s.%s.%s" % (
310
__name__, self.__class__.__name__, short_name)
311
if not (short_name.startswith('%s_' % topic) or
312
short_name.startswith('noop')):
316
# NOTE: import_class is somewhat misnamed since
317
# the weighing function can be any non-class callable
319
cost_fn = importutils.import_class(cost_fn_str)
321
raise exception.SchedulerCostFunctionNotFound(
322
cost_fn_str=cost_fn_str)
325
flag_name = "%s_weight" % cost_fn.__name__
326
weight = getattr(FLAGS, flag_name)
327
except AttributeError:
328
raise exception.SchedulerWeightFlagNotFound(
330
cost_fns.append((weight, cost_fn))
332
self.cost_function_cache[topic] = cost_fns