~ubuntu-branches/ubuntu/raring/nova/raring-proposed


Viewing changes to nova/scheduler/host_manager.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short
  • Date: 2012-11-23 09:04:58 UTC
  • mfrom: (1.1.66)
  • Revision ID: package-import@ubuntu.com-20121123090458-91565o7aev1i1h71
Tags: 2013.1~g1-0ubuntu1
[ Adam Gandelman ]
* debian/control: Ensure novaclient is upgraded with nova,
  require python-keystoneclient >= 1:2.9.0. (LP: #1073289)
* debian/patches/{ubuntu/*, rbd-security.patch}: Dropped, applied
  upstream.
* debian/control: Add python-testtools to Build-Depends.

[ Chuck Short ]
* New upstream version.
* Refreshed debian/patches/avoid_setuptools_git_dependency.patch.
* debian/rules: FTBFS if missing binaries.
* debian/nova-scheduler.install: Add missing rabbit-queues and
  nova-rpc-zmq-receiver.
* Remove nova-volume since it doesn't exist anymore, transition to cinder-*.
* debian/rules: install apport hook in the right place.
* debian/patches/ubuntu-show-tests.patch: Display test failures.
* debian/control: Add dependency on genisoimage.
* debian/control: Suggest guestmount.
* debian/control: Suggest websockify. (LP: #1076442)
* debian/nova.conf: Disable nova-volume service.
* debian/control: Depend on xen-system-* rather than the hypervisor.
* debian/control, debian/mans/nova-conductor.8, debian/nova-conductor.init,
  debian/nova-conductor.install, debian/nova-conductor.logrotate,
  debian/nova-conductor.manpages, debian/nova-conductor.postrm,
  debian/nova-conductor.upstart.in: Add nova-conductor service.
* debian/control: Add python-fixtures as a build dependency.


@@ -19,17 +19,19 @@
 
 import UserDict
 
+from nova.compute import task_states
+from nova.compute import vm_states
 from nova import db
 from nova import exception
-from nova import flags
 from nova.openstack.common import cfg
 from nova.openstack.common import log as logging
 from nova.openstack.common import timeutils
 from nova.scheduler import filters
+from nova.scheduler import weights
 
 host_manager_opts = [
     cfg.MultiStrOpt('scheduler_available_filters',
-            default=['nova.scheduler.filters.standard_filters'],
+            default=['nova.scheduler.filters.all_filters'],
             help='Filter classes available to the scheduler which may '
                     'be specified more than once.  An entry of '
                     '"nova.scheduler.filters.standard_filters" '
@@ -45,10 +47,13 @@
                   ],
                 help='Which filter class names to use for filtering hosts '
                       'when not specified in the request.'),
+    cfg.ListOpt('scheduler_weight_classes',
+                default=['nova.scheduler.weights.all_weighers'],
+                help='Which weight class names to use for weighing hosts'),
     ]
 
-FLAGS = flags.FLAGS
-FLAGS.register_opts(host_manager_opts)
+CONF = cfg.CONF
+CONF.register_opts(host_manager_opts)
 
 LOG = logging.getLogger(__name__)
 
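A quick illustration of what the registration change means in practice: once this module is imported, the options live on the shared CONF object rather than on FLAGS. The snippet below is not part of the patch; it assumes a nova tree of this vintage is importable and only reads back the defaults shown above.

# Illustrative only: options registered via CONF.register_opts() become
# attributes of the shared config object, with the defaults defined above
# unless they are overridden in nova.conf.
from nova.openstack.common import cfg
from nova.scheduler import host_manager  # importing the module registers the opts

CONF = cfg.CONF
print(CONF.scheduler_available_filters)  # ['nova.scheduler.filters.all_filters']
print(CONF.scheduler_weight_classes)     # ['nova.scheduler.weights.all_weighers']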
@@ -91,18 +96,11 @@
     previously used and lock down access.
     """
 
-    def __init__(self, host, topic, capabilities=None, service=None):
+    def __init__(self, host, node, capabilities=None, service=None):
         self.host = host
-        self.topic = topic
-
-        # Read-only capability dicts
-
-        if capabilities is None:
-            capabilities = {}
-        self.capabilities = ReadOnlyDict(capabilities.get(topic, None))
-        if service is None:
-            service = {}
-        self.service = ReadOnlyDict(service)
+        self.nodename = node
+        self.update_capabilities(capabilities, service)
+
         # Mutable available resources.
         # These will change as resources are virtually "consumed".
         self.total_usable_disk_gb = 0
@@ -111,12 +109,39 @@
         self.free_disk_mb = 0
         self.vcpus_total = 0
         self.vcpus_used = 0
+        # Valid vm types on this host: 'pv', 'hvm' or 'all'
+        if 'allowed_vm_type' in self.capabilities:
+            self.allowed_vm_type = self.capabilities['allowed_vm_type']
+        else:
+            self.allowed_vm_type = 'all'
+
+        # Additional host information from the compute node stats:
+        self.vm_states = {}
+        self.task_states = {}
+        self.num_instances = 0
+        self.num_instances_by_project = {}
+        self.num_instances_by_os_type = {}
+        self.num_io_ops = 0
 
         # Resource oversubscription values for the compute host:
         self.limits = {}
 
+        self.updated = None
+
+    def update_capabilities(self, capabilities=None, service=None):
+        # Read-only capability dicts
+
+        if capabilities is None:
+            capabilities = {}
+        self.capabilities = ReadOnlyDict(capabilities)
+        if service is None:
+            service = {}
+        self.service = ReadOnlyDict(service)
+
     def update_from_compute_node(self, compute):
         """Update information about a host from its compute_node info."""
+        if self.updated and self.updated > compute['updated_at']:
+            return
         all_ram_mb = compute['memory_mb']
 
         # Assume virtual size is all consumed by instances if use qcow2 disk.
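The new self.updated timestamp gives HostState a notion of freshness: update_from_compute_node() records the compute node's updated_at and skips any database row that is older than what the host state already reflects, while consume_from_instance() (further down) stamps utcnow() after each claim. A minimal standalone sketch of that guard, using plain datetimes and made-up field values rather than nova objects:

from datetime import datetime, timedelta

class HostStateSketch(object):
    """Toy stand-in for HostState, just enough to show the staleness guard."""

    def __init__(self):
        self.updated = None
        self.free_ram_mb = 0

    def update_from_compute_node(self, compute):
        # Ignore DB rows that are older than the state we already hold.
        if self.updated and self.updated > compute['updated_at']:
            return
        self.updated = compute['updated_at']
        self.free_ram_mb = compute['free_ram_mb']

    def consume_from_instance(self, ram_mb):
        self.free_ram_mb -= ram_mb
        self.updated = datetime.utcnow()

state = HostStateSketch()
state.update_from_compute_node(
    {'updated_at': datetime.utcnow(), 'free_ram_mb': 2048})
state.consume_from_instance(512)   # local view is now newer than the DB row
stale_row = {'updated_at': datetime.utcnow() - timedelta(minutes=5),
             'free_ram_mb': 2048}
state.update_from_compute_node(stale_row)   # skipped: would roll back the claim
assert state.free_ram_mb == 1536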
@@ -133,6 +158,41 @@
         self.free_disk_mb = free_disk_mb
         self.vcpus_total = compute['vcpus']
         self.vcpus_used = compute['vcpus_used']
+        self.updated = compute['updated_at']
+
+        stats = compute.get('stats', [])
+        statmap = self._statmap(stats)
+
+        # Track number of instances on host
+        self.num_instances = int(statmap.get('num_instances', 0))
+
+        # Track number of instances by project_id
+        project_id_keys = [k for k in statmap.keys() if
+                k.startswith("num_proj_")]
+        for key in project_id_keys:
+            project_id = key[9:]
+            self.num_instances_by_project[project_id] = int(statmap[key])
+
+        # Track number of instances in certain vm_states
+        vm_state_keys = [k for k in statmap.keys() if k.startswith("num_vm_")]
+        for key in vm_state_keys:
+            vm_state = key[7:]
+            self.vm_states[vm_state] = int(statmap[key])
+
+        # Track number of instances in certain task_states
+        task_state_keys = [k for k in statmap.keys() if
+                k.startswith("num_task_")]
+        for key in task_state_keys:
+            task_state = key[9:]
+            self.task_states[task_state] = int(statmap[key])
+
+        # Track number of instances by host_type
+        os_keys = [k for k in statmap.keys() if k.startswith("num_os_type_")]
+        for key in os_keys:
+            os = key[12:]
+            self.num_instances_by_os_type[os] = int(statmap[key])
+
+        self.num_io_ops = int(statmap.get('io_workload', 0))
 
     def consume_from_instance(self, instance):
         """Incrementally update host state from an instance"""
@@ -142,36 +202,50 @@
         self.free_ram_mb -= ram_mb
         self.free_disk_mb -= disk_mb
         self.vcpus_used += vcpus
-
-    def passes_filters(self, filter_fns, filter_properties):
-        """Return whether or not this host passes filters."""
-
-        if self.host in filter_properties.get('ignore_hosts', []):
-            LOG.debug(_('Host filter fails for ignored host %(host)s'),
-                      {'host': self.host})
-            return False
-
-        force_hosts = filter_properties.get('force_hosts', [])
-        if force_hosts:
-            if not self.host in force_hosts:
-                LOG.debug(_('Host filter fails for non-forced host %(host)s'),
-                          {'host': self.host})
-            return self.host in force_hosts
-
-        for filter_fn in filter_fns:
-            if not filter_fn(self, filter_properties):
-                LOG.debug(_('Host filter function %(func)s failed for '
-                            '%(host)s'),
-                          {'func': repr(filter_fn),
-                           'host': self.host})
-                return False
-
-        LOG.debug(_('Host filter passes for %(host)s'), {'host': self.host})
-        return True
+        self.updated = timeutils.utcnow()
+
+        # Track number of instances on host
+        self.num_instances += 1
+
+        # Track number of instances by project_id
+        project_id = instance.get('project_id')
+        if project_id not in self.num_instances_by_project:
+            self.num_instances_by_project[project_id] = 0
+        self.num_instances_by_project[project_id] += 1
+
+        # Track number of instances in certain vm_states
+        vm_state = instance.get('vm_state', vm_states.BUILDING)
+        if vm_state not in self.vm_states:
+            self.vm_states[vm_state] = 0
+        self.vm_states[vm_state] += 1
+
+        # Track number of instances in certain task_states
+        task_state = instance.get('task_state')
+        if task_state not in self.task_states:
+            self.task_states[task_state] = 0
+        self.task_states[task_state] += 1
+
+        # Track number of instances by host_type
+        os_type = instance.get('os_type')
+        if os_type not in self.num_instances_by_os_type:
+            self.num_instances_by_os_type[os_type] = 0
+        self.num_instances_by_os_type[os_type] += 1
+
+        vm_state = instance.get('vm_state', vm_states.BUILDING)
+        task_state = instance.get('task_state')
+        if vm_state == vm_states.BUILDING or task_state in [
+                task_states.RESIZE_MIGRATING, task_states.REBUILDING,
+                task_states.RESIZE_PREP, task_states.IMAGE_SNAPSHOT,
+                task_states.IMAGE_BACKUP]:
+            self.num_io_ops += 1
+
+    def _statmap(self, stats):
+        return dict((st['key'], st['value']) for st in stats)
 
     def __repr__(self):
-        return ("host '%s': free_ram_mb:%s free_disk_mb:%s" %
-                (self.host, self.free_ram_mb, self.free_disk_mb))
+        return ("(%s, %s) ram:%s disk:%s io_ops:%s instances:%s vm_type:%s" %
+                (self.host, self.nodename, self.free_ram_mb, self.free_disk_mb,
+                 self.num_io_ops, self.num_instances, self.allowed_vm_type))
 
 
 class HostManager(object):
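Host-level counters are now derived from the compute node's 'stats' rows: _statmap() flattens the list of key/value pairs into a dict, and the prefixed keys are decoded into per-project, per-vm_state, per-task_state and per-OS counts. A standalone sketch of that decoding with made-up stats data (not a real compute node record):

def statmap(stats):
    # Same shape as HostState._statmap(): [{'key': ..., 'value': ...}] -> dict
    return dict((st['key'], st['value']) for st in stats)

stats = [
    {'key': 'num_instances', 'value': '3'},
    {'key': 'num_proj_7f1b2d', 'value': '2'},      # len('num_proj_') == 9
    {'key': 'num_vm_active', 'value': '2'},        # len('num_vm_') == 7
    {'key': 'num_task_rebuilding', 'value': '1'},  # len('num_task_') == 9
    {'key': 'num_os_type_linux', 'value': '3'},    # len('num_os_type_') == 12
    {'key': 'io_workload', 'value': '1'},
]

smap = statmap(stats)
num_instances_by_project = dict(
    (k[9:], int(v)) for k, v in smap.items() if k.startswith('num_proj_'))
vm_state_counts = dict(
    (k[7:], int(v)) for k, v in smap.items() if k.startswith('num_vm_'))

assert num_instances_by_project == {'7f1b2d': 2}
assert vm_state_counts == {'active': 2}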
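consume_from_instance() also keeps a rough I/O workload counter: an instance counts toward num_io_ops while it is still building, or while its task_state is one of the migrate, rebuild, resize, snapshot or backup states listed in the diff. A standalone sketch of that heuristic; the string constants stand in for the nova.compute vm_states/task_states values and the instances are made up:

BUILDING = 'building'   # stand-in for vm_states.BUILDING
IO_HEAVY_TASK_STATES = ('resize_migrating', 'rebuilding', 'resize_prep',
                        'image_snapshot', 'image_backup')

def is_io_op(instance):
    vm_state = instance.get('vm_state', BUILDING)
    task_state = instance.get('task_state')
    return vm_state == BUILDING or task_state in IO_HEAVY_TASK_STATES

instances = [
    {'vm_state': 'active', 'task_state': None},
    {'vm_state': 'active', 'task_state': 'image_snapshot'},
    {},   # no vm_state reported yet, treated as still building
]
num_io_ops = sum(1 for inst in instances if is_io_op(inst))
assert num_io_ops == 2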
@@ -181,33 +255,34 @@
     host_state_cls = HostState
 
     def __init__(self):
-        self.service_states = {}  # { <host> : { <service> : { cap k : v }}}
-        self.filter_classes = filters.get_filter_classes(
-                FLAGS.scheduler_available_filters)
+        # { (host, hypervisor_hostname) : { <service> : { cap k : v }}}
+        self.service_states = {}
+        self.host_state_map = {}
+        self.filter_handler = filters.HostFilterHandler()
+        self.filter_classes = self.filter_handler.get_matching_classes(
+                CONF.scheduler_available_filters)
+        self.weight_handler = weights.HostWeightHandler()
+        self.weight_classes = self.weight_handler.get_matching_classes(
+                CONF.scheduler_weight_classes)
 
-    def _choose_host_filters(self, filters):
+    def _choose_host_filters(self, filter_cls_names):
         """Since the caller may specify which filters to use we need
         to have an authoritative list of what is permissible. This
         function checks the filter names against a predefined set
         of acceptable filters.
         """
-        if filters is None:
-            filters = FLAGS.scheduler_default_filters
-        if not isinstance(filters, (list, tuple)):
-            filters = [filters]
+        if filter_cls_names is None:
+            filter_cls_names = CONF.scheduler_default_filters
+        if not isinstance(filter_cls_names, (list, tuple)):
+            filter_cls_names = [filter_cls_names]
         good_filters = []
         bad_filters = []
-        for filter_name in filters:
+        for filter_name in filter_cls_names:
             found_class = False
             for cls in self.filter_classes:
                 if cls.__name__ == filter_name:
+                    good_filters.append(cls)
                     found_class = True
-                    filter_instance = cls()
-                    # Get the filter function
-                    filter_func = getattr(filter_instance,
-                            'host_passes', None)
-                    if filter_func:
-                        good_filters.append(filter_func)
                     break
             if not found_class:
                 bad_filters.append(filter_name)
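With the new HostFilterHandler, _choose_host_filters() returns filter classes matched by name instead of bound host_passes functions, and any unknown names are collected so a SchedulerHostFilterNotFound error can report them. A standalone sketch of the name-to-class matching; the filter classes here are placeholders, not real nova filters:

class RamFilter(object):
    """Placeholder for a real scheduler filter class."""

class ComputeFilter(object):
    """Placeholder for a real scheduler filter class."""

AVAILABLE_FILTER_CLASSES = [RamFilter, ComputeFilter]

def choose_host_filters(filter_cls_names):
    if not isinstance(filter_cls_names, (list, tuple)):
        filter_cls_names = [filter_cls_names]
    good_filters, bad_filters = [], []
    for name in filter_cls_names:
        for cls in AVAILABLE_FILTER_CLASSES:
            if cls.__name__ == name:
                good_filters.append(cls)
                break
        else:
            bad_filters.append(name)
    if bad_filters:
        raise ValueError('unknown filter(s): %s' % ', '.join(bad_filters))
    return good_filters

assert choose_host_filters('RamFilter') == [RamFilter]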
@@ -216,44 +291,63 @@
             raise exception.SchedulerHostFilterNotFound(filter_name=msg)
         return good_filters
 
-    def filter_hosts(self, hosts, filter_properties, filters=None):
+    def get_filtered_hosts(self, hosts, filter_properties,
+            filter_class_names=None):
         """Filter hosts and return only ones passing all filters"""
-        filtered_hosts = []
-        filter_fns = self._choose_host_filters(filters)
-        for host in hosts:
-            if host.passes_filters(filter_fns, filter_properties):
-                filtered_hosts.append(host)
-        return filtered_hosts
+        filter_classes = self._choose_host_filters(filter_class_names)
+
+        hosts = set(hosts)
+        ignore_hosts = set(filter_properties.get('ignore_hosts', []))
+        ignore_hosts = hosts & ignore_hosts
+        if ignore_hosts:
+            ignored_hosts = ', '.join(ignore_hosts)
+            msg = _('Host filter ignoring hosts: %(ignored_hosts)s')
+            LOG.debug(msg, locals())
+            hosts = hosts - ignore_hosts
+
+        force_hosts = set(filter_properties.get('force_hosts', []))
+        if force_hosts:
+            matching_force_hosts = hosts & force_hosts
+            if not matching_force_hosts:
+                forced_hosts = ', '.join(force_hosts)
+                msg = _("No hosts matched due to not matching 'force_hosts'"
+                        "value of '%(forced_hosts)s'")
+                LOG.debug(msg, locals())
+                return []
+            forced_hosts = ', '.join(matching_force_hosts)
+            msg = _('Host filter forcing available hosts to %(forced_hosts)s')
+            LOG.debug(msg, locals())
+            hosts = matching_force_hosts
+
+        return self.filter_handler.get_filtered_objects(filter_classes,
+                hosts, filter_properties)
+
+    def get_weighed_hosts(self, hosts, weight_properties):
+        """Weigh the hosts"""
+        return self.weight_handler.get_weighed_objects(self.weight_classes,
+                hosts, weight_properties)
 
     def update_service_capabilities(self, service_name, host, capabilities):
         """Update the per-service capabilities based on this notification."""
+
+        if service_name != 'compute':
+            LOG.debug(_('Ignoring %(service_name)s service update '
+                    'from %(host)s'), locals())
+            return
+
+        state_key = (host, capabilities.get('hypervisor_hostname'))
         LOG.debug(_("Received %(service_name)s service update from "
-                    "%(host)s.") % locals())
-        service_caps = self.service_states.get(host, {})
+                    "%(state_key)s.") % locals())
         # Copy the capabilities, so we don't modify the original dict
         capab_copy = dict(capabilities)
        capab_copy["timestamp"] = timeutils.utcnow()  # Reported time
-        service_caps[service_name] = capab_copy
-        self.service_states[host] = service_caps
-
-    def get_all_host_states(self, context, topic):
-        """Returns a dict of all the hosts the HostManager
-        knows about. Also, each of the consumable resources in HostState
-        are pre-populated and adjusted based on data in the db.
-
-        For example:
-        {'192.168.1.100': HostState(), ...}
-
-        Note: this can be very slow with a lot of instances.
-        InstanceType table isn't required since a copy is stored
-        with the instance (in case the InstanceType changed since the
-        instance was created)."""
-
-        if topic != 'compute':
-            raise NotImplementedError(_(
-                "host_manager only implemented for 'compute'"))
-
-        host_state_map = {}
+        self.service_states[state_key] = capab_copy
+
+    def get_all_host_states(self, context):
+        """Returns a list of HostStates that represents all the hosts
+        the HostManager knows about. Also, each of the consumable resources
+        in HostState are pre-populated and adjusted based on data in the db.
+        """
 
         # Get resource usage across the available compute nodes:
         compute_nodes = db.compute_node_get_all(context)
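get_filtered_hosts() now reduces the candidate set with plain set arithmetic before running the filter classes: hosts listed in 'ignore_hosts' are dropped, and if 'force_hosts' is given only the intersection survives (an empty intersection means no hosts at all). The survivors then go through the filter handler and, separately, get_weighed_hosts(). A standalone sketch of just the set logic, with made-up host names:

def prefilter_hosts(hosts, filter_properties):
    hosts = set(hosts)
    ignore_hosts = hosts & set(filter_properties.get('ignore_hosts', []))
    if ignore_hosts:
        hosts = hosts - ignore_hosts
    force_hosts = set(filter_properties.get('force_hosts', []))
    if force_hosts:
        matching = hosts & force_hosts
        if not matching:
            return set()   # nothing matched the forced hosts
        hosts = matching
    return hosts

all_hosts = {'node1', 'node2', 'node3'}
assert prefilter_hosts(all_hosts, {'ignore_hosts': ['node2']}) == {'node1', 'node3'}
assert prefilter_hosts(all_hosts, {'force_hosts': ['node3']}) == {'node3'}
assert prefilter_hosts(all_hosts, {'force_hosts': ['node9']}) == set()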
@@ -263,11 +357,18 @@
                 LOG.warn(_("No service for compute ID %s") % compute['id'])
                 continue
             host = service['host']
-            capabilities = self.service_states.get(host, None)
-            host_state = self.host_state_cls(host, topic,
-                    capabilities=capabilities,
-                    service=dict(service.iteritems()))
+            node = compute.get('hypervisor_hostname')
+            state_key = (host, node)
+            capabilities = self.service_states.get(state_key, None)
+            host_state = self.host_state_map.get(state_key)
+            if host_state:
+                host_state.update_capabilities(capabilities,
+                                               dict(service.iteritems()))
+            else:
+                host_state = self.host_state_cls(host, node,
+                        capabilities=capabilities,
+                        service=dict(service.iteritems()))
+                self.host_state_map[state_key] = host_state
             host_state.update_from_compute_node(compute)
-            host_state_map[host] = host_state
 
-        return host_state_map
+        return self.host_state_map.itervalues()
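Both service_states and the new host_state_map are now keyed by a (host, hypervisor_hostname) tuple instead of the host name alone, so a single compute service can expose several hypervisor nodes, and get_all_host_states() can return the cached HostState objects rather than rebuilding a dict on every call. A standalone sketch of the keying; host and node names are invented:

host_state_map = {}

def ensure_host_state(host, node):
    # Mirrors the state_key introduced in this patch: one entry per
    # (service host, hypervisor node) pair, reused across scheduler runs.
    state_key = (host, node)
    if state_key not in host_state_map:
        host_state_map[state_key] = {'host': host, 'nodename': node}
    return host_state_map[state_key]

# A single nova-compute service host reporting two hypervisor nodes:
ensure_host_state('compute-1', 'node-a')
ensure_host_state('compute-1', 'node-b')
assert sorted(host_state_map) == [('compute-1', 'node-a'), ('compute-1', 'node-b')]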