~ubuntu-branches/ubuntu/raring/nova/raring-proposed

« back to all changes in this revision

Viewing changes to nova/scheduler/filter_scheduler.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short
  • Date: 2012-11-23 09:04:58 UTC
  • mfrom: (1.1.66)
  • Revision ID: package-import@ubuntu.com-20121123090458-91565o7aev1i1h71
Tags: 2013.1~g1-0ubuntu1
[ Adam Gandelman ]
* debian/control: Ensure novaclient is upgraded with nova,
  require python-keystoneclient >= 1:2.9.0. (LP: #1073289)
* debian/patches/{ubuntu/*, rbd-security.patch}: Dropped, applied
  upstream.
* debian/control: Add python-testtools to Build-Depends.

[ Chuck Short ]
* New upstream version.
* Refreshed debian/patches/avoid_setuptools_git_dependency.patch.
* debian/rules: FTBFS if missing binaries.
* debian/nova-scheduler.install: Add missing rabbit-queues and
  nova-rpc-zmq-receiver.
* Remove nova-volume since it doesn't exist anymore, transition to cinder-*.
* debian/rules: install apport hook in the right place.
* debian/patches/ubuntu-show-tests.patch: Display test failures.
* debian/control: Add depends on genisoimage.
* debian/control: Suggest guestmount.
* debian/control: Suggest websockify. (LP: #1076442)
* debian/nova.conf: Disable nova-volume service.
* debian/control: Depend on xen-system-* rather than the hypervisor.
* debian/control, debian/mans/nova-conductor.8, debian/nova-conductor.init,
  debian/nova-conductor.install, debian/nova-conductor.logrotate,
  debian/nova-conductor.manpages, debian/nova-conductor.postrm,
  debian/nova-conductor.upstart.in: Add nova-conductor service.
* debian/control: Add python-fixtures as a build dependency.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
Weighing Functions.
20
20
"""
21
21
 
22
 
import operator
23
 
 
24
22
from nova import exception
25
 
from nova import flags
26
 
from nova.openstack.common import importutils
 
23
from nova.openstack.common import cfg
27
24
from nova.openstack.common import log as logging
28
25
from nova.openstack.common.notifier import api as notifier
29
26
from nova.scheduler import driver
30
 
from nova.scheduler import least_cost
31
27
from nova.scheduler import scheduler_options
32
28
 
33
 
 
34
 
FLAGS = flags.FLAGS
 
29
CONF = cfg.CONF
35
30
LOG = logging.getLogger(__name__)
36
31
 
37
32
 
39
34
    """Scheduler that can be used for filtering and weighing."""
40
35
    def __init__(self, *args, **kwargs):
41
36
        super(FilterScheduler, self).__init__(*args, **kwargs)
42
 
        self.cost_function_cache = {}
 
37
        self.cost_function_cache = None
43
38
        self.options = scheduler_options.SchedulerOptions()
44
39
 
45
 
    def schedule_create_volume(self, context, volume_id, snapshot_id, image_id,
46
 
                               reservations):
47
 
        # NOTE: We're only focused on compute instances right now,
48
 
        # so this method will always raise NoValidHost().
49
 
        msg = _("No host selection for %s defined.") % FLAGS.volume_topic
50
 
        raise exception.NoValidHost(reason=msg)
51
 
 
52
40
    def schedule_run_instance(self, context, request_spec,
53
41
                              admin_password, injected_files,
54
42
                              requested_networks, is_first_time,
59
47
 
60
48
        Returns a list of the instances created.
61
49
        """
62
 
        elevated = context.elevated()
63
50
        instance_uuids = request_spec.get('instance_uuids')
64
51
        num_instances = len(instance_uuids)
65
52
        LOG.debug(_("Attempting to build %(num_instances)d instance(s)") %
69
56
        notifier.notify(context, notifier.publisher_id("scheduler"),
70
57
                        'scheduler.run_instance.start', notifier.INFO, payload)
71
58
 
72
 
        weighted_hosts = self._schedule(context, "compute", request_spec,
73
 
                                        filter_properties, instance_uuids)
 
59
        weighed_hosts = self._schedule(context, request_spec,
 
60
                filter_properties, instance_uuids)
74
61
 
75
62
        # NOTE(comstud): Make sure we do not pass this through.  It
76
63
        # contains an instance of RpcContext that cannot be serialized.
81
68
 
82
69
            try:
83
70
                try:
84
 
                    weighted_host = weighted_hosts.pop(0)
 
71
                    weighed_host = weighed_hosts.pop(0)
85
72
                except IndexError:
86
73
                    raise exception.NoValidHost(reason="")
87
74
 
88
 
                self._provision_resource(elevated, weighted_host,
 
75
                self._provision_resource(context, weighed_host,
89
76
                                         request_spec,
90
77
                                         filter_properties,
91
78
                                         requested_networks,
115
102
        the prep_resize operation to it.
116
103
        """
117
104
 
118
 
        hosts = self._schedule(context, 'compute', request_spec,
119
 
                               filter_properties, [instance['uuid']])
120
 
        if not hosts:
 
105
        weighed_hosts = self._schedule(context, request_spec,
 
106
                filter_properties, [instance['uuid']])
 
107
        if not weighed_hosts:
121
108
            raise exception.NoValidHost(reason="")
122
 
        host = hosts.pop(0)
 
109
        weighed_host = weighed_hosts.pop(0)
 
110
 
 
111
        self._post_select_populate_filter_properties(filter_properties,
 
112
                weighed_host.obj)
 
113
 
 
114
        # context is not serializable
 
115
        filter_properties.pop('context', None)
123
116
 
124
117
        # Forward off to the host
125
118
        self.compute_rpcapi.prep_resize(context, image, instance,
126
 
                instance_type, host.host_state.host, reservations)
 
119
                instance_type, weighed_host.obj.host, reservations,
 
120
                request_spec=request_spec, filter_properties=filter_properties)
127
121
 
128
 
    def _provision_resource(self, context, weighted_host, request_spec,
 
122
    def _provision_resource(self, context, weighed_host, request_spec,
129
123
            filter_properties, requested_networks, injected_files,
130
124
            admin_password, is_first_time, instance_uuid=None):
131
125
        """Create the requested resource in this Zone."""
132
 
        # Add a retry entry for the selected compute host:
133
 
        self._add_retry_host(filter_properties, weighted_host.host_state.host)
134
 
 
135
 
        self._add_oversubscription_policy(filter_properties,
136
 
                weighted_host.host_state)
137
 
 
138
126
        payload = dict(request_spec=request_spec,
139
 
                       weighted_host=weighted_host.to_dict(),
 
127
                       weighted_host=weighed_host.to_dict(),
140
128
                       instance_id=instance_uuid)
141
129
        notifier.notify(context, notifier.publisher_id("scheduler"),
142
130
                        'scheduler.run_instance.scheduled', notifier.INFO,
143
131
                        payload)
144
132
 
 
133
        # TODO(NTTdocomo): Combine the next two updates into one
 
134
        driver.db_instance_node_set(context,
 
135
                instance_uuid, weighed_host.obj.nodename)
145
136
        updated_instance = driver.instance_update_db(context,
146
 
                instance_uuid, weighted_host.host_state.host)
 
137
                instance_uuid)
 
138
 
 
139
        self._post_select_populate_filter_properties(filter_properties,
 
140
                weighed_host.obj)
147
141
 
148
142
        self.compute_rpcapi.run_instance(context, instance=updated_instance,
149
 
                host=weighted_host.host_state.host,
 
143
                host=weighed_host.obj.host,
150
144
                request_spec=request_spec, filter_properties=filter_properties,
151
145
                requested_networks=requested_networks,
152
146
                injected_files=injected_files,
153
147
                admin_password=admin_password, is_first_time=is_first_time)
154
148
 
 
149
    def _post_select_populate_filter_properties(self, filter_properties,
 
150
            host_state):
 
151
        """Add additional information to the filter properties after a host has
 
152
        been selected by the scheduling process.
 
153
        """
 
154
        # Add a retry entry for the selected compute host:
 
155
        self._add_retry_host(filter_properties, host_state.host)
 
156
 
 
157
        self._add_oversubscription_policy(filter_properties, host_state)
 
158
 
155
159
    def _add_retry_host(self, filter_properties, host):
156
160
        """Add a retry entry for the selected compute host.  In the event that
157
161
        the request gets re-scheduled, this entry will signal that the given
174
178
        """Stuff things into filter_properties.  Can be overridden in a
175
179
        subclass to add more data.
176
180
        """
177
 
        pass
 
181
        # Save useful information from the request spec for filter processing:
 
182
        project_id = request_spec['instance_properties']['project_id']
 
183
        os_type = request_spec['instance_properties']['os_type']
 
184
        filter_properties['project_id'] = project_id
 
185
        filter_properties['os_type'] = os_type
178
186
 
179
187
    def _max_attempts(self):
180
 
        max_attempts = FLAGS.scheduler_max_attempts
 
188
        max_attempts = CONF.scheduler_max_attempts
181
189
        if max_attempts < 1:
182
190
            raise exception.NovaException(_("Invalid value for "
183
191
                "'scheduler_max_attempts', must be >= 1"))
210
218
                    "instance %(instance_uuid)s") % locals()
211
219
            raise exception.NoValidHost(reason=msg)
212
220
 
213
 
    def _schedule(self, context, topic, request_spec, filter_properties,
 
221
    def _schedule(self, context, request_spec, filter_properties,
214
222
                  instance_uuids=None):
215
223
        """Returns a list of hosts that meet the required specs,
216
224
        ordered by their fitness.
217
225
        """
218
226
        elevated = context.elevated()
219
 
        if topic != "compute":
220
 
            msg = _("Scheduler only understands Compute nodes (for now)")
221
 
            raise NotImplementedError(msg)
222
 
 
223
227
        instance_properties = request_spec['instance_properties']
224
228
        instance_type = request_spec.get("instance_type", None)
225
229
 
226
 
        cost_functions = self.get_cost_functions()
227
230
        config_options = self._get_configuration_options()
228
231
 
229
232
        # check retry policy.  Rather ugly use of instance_uuids[0]...
247
250
        # host, we virtually consume resources on it so subsequent
248
251
        # selections can adjust accordingly.
249
252
 
250
 
        # unfiltered_hosts_dict is {host : ZoneManager.HostInfo()}
251
 
        unfiltered_hosts_dict = self.host_manager.get_all_host_states(
252
 
                elevated, topic)
253
 
 
254
253
        # Note: remember, we are using an iterator here. So only
255
254
        # traverse this list once. This can bite you if the hosts
256
255
        # are being scanned in a filter or weighing function.
257
 
        hosts = unfiltered_hosts_dict.itervalues()
 
256
        hosts = self.host_manager.get_all_host_states(elevated)
258
257
 
259
258
        selected_hosts = []
260
259
        if instance_uuids:
263
262
            num_instances = request_spec.get('num_instances', 1)
264
263
        for num in xrange(num_instances):
265
264
            # Filter local hosts based on requirements ...
266
 
            hosts = self.host_manager.filter_hosts(hosts,
 
265
            hosts = self.host_manager.get_filtered_hosts(hosts,
267
266
                    filter_properties)
268
267
            if not hosts:
269
268
                # Can't get any more locally.
271
270
 
272
271
            LOG.debug(_("Filtered %(hosts)s") % locals())
273
272
 
274
 
            # weighted_host = WeightedHost() ... the best
275
 
            # host for the job.
276
 
            # TODO(comstud): filter_properties will also be used for
277
 
            # weighing and I plan fold weighing into the host manager
278
 
            # in a future patch.  I'll address the naming of this
279
 
            # variable at that time.
280
 
            weighted_host = least_cost.weighted_sum(cost_functions,
281
 
                    hosts, filter_properties)
282
 
            LOG.debug(_("Weighted %(weighted_host)s") % locals())
283
 
            selected_hosts.append(weighted_host)
284
 
 
 
273
            weighed_hosts = self.host_manager.get_weighed_hosts(hosts,
 
274
                    filter_properties)
 
275
            best_host = weighed_hosts[0]
 
276
            LOG.debug(_("Choosing host %(best_host)s") % locals())
 
277
            selected_hosts.append(best_host)
285
278
            # Now consume the resources so the filter/weights
286
279
            # will change for the next instance.
287
 
            weighted_host.host_state.consume_from_instance(
288
 
                    instance_properties)
289
 
 
290
 
        selected_hosts.sort(key=operator.attrgetter('weight'))
 
280
            best_host.obj.consume_from_instance(instance_properties)
291
281
        return selected_hosts
292
 
 
293
 
    def get_cost_functions(self, topic=None):
294
 
        """Returns a list of tuples containing weights and cost functions to
295
 
        use for weighing hosts
296
 
        """
297
 
        if topic is None:
298
 
            # Schedulers only support compute right now.
299
 
            topic = "compute"
300
 
        if topic in self.cost_function_cache:
301
 
            return self.cost_function_cache[topic]
302
 
 
303
 
        cost_fns = []
304
 
        for cost_fn_str in FLAGS.least_cost_functions:
305
 
            if '.' in cost_fn_str:
306
 
                short_name = cost_fn_str.split('.')[-1]
307
 
            else:
308
 
                short_name = cost_fn_str
309
 
                cost_fn_str = "%s.%s.%s" % (
310
 
                        __name__, self.__class__.__name__, short_name)
311
 
            if not (short_name.startswith('%s_' % topic) or
312
 
                    short_name.startswith('noop')):
313
 
                continue
314
 
 
315
 
            try:
316
 
                # NOTE: import_class is somewhat misnamed since
317
 
                # the weighing function can be any non-class callable
318
 
                # (i.e., no 'self')
319
 
                cost_fn = importutils.import_class(cost_fn_str)
320
 
            except ImportError:
321
 
                raise exception.SchedulerCostFunctionNotFound(
322
 
                        cost_fn_str=cost_fn_str)
323
 
 
324
 
            try:
325
 
                flag_name = "%s_weight" % cost_fn.__name__
326
 
                weight = getattr(FLAGS, flag_name)
327
 
            except AttributeError:
328
 
                raise exception.SchedulerWeightFlagNotFound(
329
 
                        flag_name=flag_name)
330
 
            cost_fns.append((weight, cost_fn))
331
 
 
332
 
        self.cost_function_cache[topic] = cost_fns
333
 
        return cost_fns