~ubuntu-branches/ubuntu/saucy/nova/saucy-proposed

« back to all changes in this revision

Viewing changes to nova/cells/messaging.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2013-02-22 09:27:29 UTC
  • mfrom: (1.1.68)
  • Revision ID: package-import@ubuntu.com-20130222092729-nn3gt8rf97uvts77
Tags: 2013.1.g3-0ubuntu1
[ Chuck Short ]
* New usptream release. 
* debian/patches/debian/patches/fix-ubuntu-tests.patch: Refreshed.
* debian/nova-baremetal.logrotate: Fix logfile path.
* debian/control, debian/nova-spiceproxy.{install, logrotate, upstart}:
  Add spice html5 proxy support.
* debian/nova-novncproxy.upstart: Start on runlevel [2345]
* debian/rules: Call testr directly since run_tests.sh -N gives weird return
  value when tests pass.
* debian/pyddist-overrides: Add websockify.
* debian/nova-common.postinst: Removed config file conversion, since
  the option is no longer available. (LP: #1110567)
* debian/control: Add python-pyasn1 as a dependency.
* debian/control: Add python-oslo-config as a dependency.
* debian/control: Suggest sysfsutils, sg3-utils, multipath-tools for fibre
  channel support.

[ Adam Gandelman ]
* debian/control: Fix typo (websocikfy -> websockify).

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
import sys
26
26
 
27
27
from eventlet import queue
 
28
from oslo.config import cfg
28
29
 
29
30
from nova.cells import state as cells_state
 
31
from nova.cells import utils as cells_utils
30
32
from nova import compute
31
33
from nova import context
32
34
from nova.db import base
33
35
from nova import exception
34
 
from nova.openstack.common import cfg
35
36
from nova.openstack.common import excutils
36
37
from nova.openstack.common import importutils
37
38
from nova.openstack.common import jsonutils
38
39
from nova.openstack.common import log as logging
 
40
from nova.openstack.common import rpc
39
41
from nova.openstack.common.rpc import common as rpc_common
 
42
from nova.openstack.common import timeutils
40
43
from nova.openstack.common import uuidutils
41
44
from nova import utils
42
45
 
58
61
 
59
62
# Separator used between cell names for the 'full cell name' and routing
60
63
# path.
61
 
_PATH_CELL_SEP = '!'
 
64
_PATH_CELL_SEP = cells_utils._PATH_CELL_SEP
62
65
 
63
66
 
64
67
def _reverse_path(path):
597
600
        self.state_manager = msg_runner.state_manager
598
601
        self.compute_api = compute.API()
599
602
 
 
603
    def task_log_get_all(self, message, task_name, period_beginning,
 
604
                         period_ending, host, state):
 
605
        """Get task logs from the DB.  The message could have
 
606
        directly targeted this cell, or it could have been a broadcast
 
607
        message.
 
608
 
 
609
        If 'host' is not None, filter by host.
 
610
        If 'state' is not None, filter by state.
 
611
        """
 
612
        task_logs = self.db.task_log_get_all(message.ctxt, task_name,
 
613
                                             period_beginning,
 
614
                                             period_ending,
 
615
                                             host=host,
 
616
                                             state=state)
 
617
        return jsonutils.to_primitive(task_logs)
 
618
 
600
619
 
601
620
class _ResponseMessageMethods(_BaseMessageMethods):
602
621
    """Methods that are called from a ResponseMessage.  There's only
676
695
        """
677
696
        self.msg_runner.tell_parents_our_capacities(message.ctxt)
678
697
 
 
698
    def service_get_by_compute_host(self, message, host_name):
 
699
        """Return the service entry for a compute host."""
 
700
        service = self.db.service_get_by_compute_host(message.ctxt,
 
701
                                                      host_name)
 
702
        return jsonutils.to_primitive(service)
 
703
 
 
704
    def proxy_rpc_to_manager(self, message, host_name, rpc_message,
 
705
                             topic, timeout):
 
706
        """Proxy RPC to the given compute topic."""
 
707
        # Check that the host exists.
 
708
        self.db.service_get_by_compute_host(message.ctxt, host_name)
 
709
        if message.need_response:
 
710
            return rpc.call(message.ctxt, topic, rpc_message,
 
711
                    timeout=timeout)
 
712
        rpc.cast(message.ctxt, topic, rpc_message)
 
713
 
 
714
    def compute_node_get(self, message, compute_id):
 
715
        """Get compute node by ID."""
 
716
        compute_node = self.db.compute_node_get(message.ctxt,
 
717
                                                compute_id)
 
718
        return jsonutils.to_primitive(compute_node)
 
719
 
679
720
 
680
721
class _BroadcastMessageMethods(_BaseMessageMethods):
681
722
    """These are the methods that can be called as a part of a broadcast
778
819
            return
779
820
        self.db.bw_usage_update(message.ctxt, **bw_update_info)
780
821
 
 
822
    def _sync_instance(self, ctxt, instance):
 
823
        if instance['deleted']:
 
824
            self.msg_runner.instance_destroy_at_top(ctxt, instance)
 
825
        else:
 
826
            self.msg_runner.instance_update_at_top(ctxt, instance)
 
827
 
 
828
    def sync_instances(self, message, project_id, updated_since, deleted,
 
829
                       **kwargs):
 
830
        projid_str = project_id is None and "<all>" or project_id
 
831
        since_str = updated_since is None and "<all>" or updated_since
 
832
        LOG.info(_("Forcing a sync of instances, project_id="
 
833
                   "%(projid_str)s, updated_since=%(since_str)s"), locals())
 
834
        if updated_since is not None:
 
835
            updated_since = timeutils.parse_isotime(updated_since)
 
836
        instances = cells_utils.get_instances_to_sync(message.ctxt,
 
837
                updated_since=updated_since, project_id=project_id,
 
838
                deleted=deleted)
 
839
        for instance in instances:
 
840
            self._sync_instance(message.ctxt, instance)
 
841
 
 
842
    def service_get_all(self, message, filters):
 
843
        if filters is None:
 
844
            filters = {}
 
845
        disabled = filters.pop('disabled', None)
 
846
        services = self.db.service_get_all(message.ctxt, disabled=disabled)
 
847
        ret_services = []
 
848
        for service in services:
 
849
            service = jsonutils.to_primitive(service)
 
850
            for key, val in filters.iteritems():
 
851
                if service[key] != val:
 
852
                    break
 
853
            else:
 
854
                ret_services.append(service)
 
855
        return ret_services
 
856
 
 
857
    def compute_node_get_all(self, message, hypervisor_match):
 
858
        """Return compute nodes in this cell."""
 
859
        if hypervisor_match is not None:
 
860
            nodes = self.db.compute_node_search_by_hypervisor(message.ctxt,
 
861
                    hypervisor_match)
 
862
        else:
 
863
            nodes = self.db.compute_node_get_all(message.ctxt)
 
864
        return jsonutils.to_primitive(nodes)
 
865
 
 
866
    def compute_node_stats(self, message):
 
867
        """Return compute node stats from this cell."""
 
868
        return self.db.compute_node_statistics(message.ctxt)
 
869
 
781
870
 
782
871
_CELL_MESSAGE_TYPE_TO_MESSAGE_CLS = {'targeted': _TargetedMessage,
783
872
                                     'broadcast': _BroadcastMessage,
1004
1093
                                    'up', run_locally=False)
1005
1094
        message.process()
1006
1095
 
 
1096
    def sync_instances(self, ctxt, project_id, updated_since, deleted):
 
1097
        """Force a sync of all instances, potentially by project_id,
 
1098
        and potentially since a certain date/time.
 
1099
        """
 
1100
        method_kwargs = dict(project_id=project_id,
 
1101
                             updated_since=updated_since,
 
1102
                             deleted=deleted)
 
1103
        message = _BroadcastMessage(self, ctxt, 'sync_instances',
 
1104
                                    method_kwargs, 'down',
 
1105
                                    run_locally=False)
 
1106
        message.process()
 
1107
 
 
1108
    def service_get_all(self, ctxt, filters=None):
 
1109
        method_kwargs = dict(filters=filters)
 
1110
        message = _BroadcastMessage(self, ctxt, 'service_get_all',
 
1111
                                    method_kwargs, 'down',
 
1112
                                    run_locally=True, need_response=True)
 
1113
        return message.process()
 
1114
 
 
1115
    def service_get_by_compute_host(self, ctxt, cell_name, host_name):
 
1116
        method_kwargs = dict(host_name=host_name)
 
1117
        message = _TargetedMessage(self, ctxt,
 
1118
                                  'service_get_by_compute_host',
 
1119
                                  method_kwargs, 'down', cell_name,
 
1120
                                  need_response=True)
 
1121
        return message.process()
 
1122
 
 
1123
    def proxy_rpc_to_manager(self, ctxt, cell_name, host_name, topic,
 
1124
                             rpc_message, call, timeout):
 
1125
        method_kwargs = {'host_name': host_name,
 
1126
                         'topic': topic,
 
1127
                         'rpc_message': rpc_message,
 
1128
                         'timeout': timeout}
 
1129
        message = _TargetedMessage(self, ctxt,
 
1130
                                   'proxy_rpc_to_manager',
 
1131
                                   method_kwargs, 'down', cell_name,
 
1132
                                   need_response=call)
 
1133
        return message.process()
 
1134
 
 
1135
    def task_log_get_all(self, ctxt, cell_name, task_name,
 
1136
                         period_beginning, period_ending,
 
1137
                         host=None, state=None):
 
1138
        """Get task logs from the DB from all cells or a particular
 
1139
        cell.
 
1140
 
 
1141
        If 'cell_name' is None or '', get responses from all cells.
 
1142
        If 'host' is not None, filter by host.
 
1143
        If 'state' is not None, filter by state.
 
1144
 
 
1145
        Return a list of Response objects.
 
1146
        """
 
1147
        method_kwargs = dict(task_name=task_name,
 
1148
                             period_beginning=period_beginning,
 
1149
                             period_ending=period_ending,
 
1150
                             host=host, state=state)
 
1151
        if cell_name:
 
1152
            message = _TargetedMessage(self, ctxt, 'task_log_get_all',
 
1153
                                    method_kwargs, 'down',
 
1154
                                    cell_name, need_response=True)
 
1155
            # Caller should get a list of Responses.
 
1156
            return [message.process()]
 
1157
        message = _BroadcastMessage(self, ctxt, 'task_log_get_all',
 
1158
                                    method_kwargs, 'down',
 
1159
                                    run_locally=True, need_response=True)
 
1160
        return message.process()
 
1161
 
 
1162
    def compute_node_get_all(self, ctxt, hypervisor_match=None):
 
1163
        """Return list of compute nodes in all child cells."""
 
1164
        method_kwargs = dict(hypervisor_match=hypervisor_match)
 
1165
        message = _BroadcastMessage(self, ctxt, 'compute_node_get_all',
 
1166
                                    method_kwargs, 'down',
 
1167
                                    run_locally=True, need_response=True)
 
1168
        return message.process()
 
1169
 
 
1170
    def compute_node_stats(self, ctxt):
 
1171
        """Return compute node stats from all child cells."""
 
1172
        method_kwargs = dict()
 
1173
        message = _BroadcastMessage(self, ctxt, 'compute_node_stats',
 
1174
                                    method_kwargs, 'down',
 
1175
                                    run_locally=True, need_response=True)
 
1176
        return message.process()
 
1177
 
 
1178
    def compute_node_get(self, ctxt, cell_name, compute_id):
 
1179
        """Return compute node entry from a specific cell by ID."""
 
1180
        method_kwargs = dict(compute_id=compute_id)
 
1181
        message = _TargetedMessage(self, ctxt, 'compute_node_get',
 
1182
                                    method_kwargs, 'down',
 
1183
                                    cell_name, need_response=True)
 
1184
        return message.process()
 
1185
 
1007
1186
    @staticmethod
1008
1187
    def get_message_types():
1009
1188
        return _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS.keys()