27
27
from eventlet import queue
28
from oslo.config import cfg
29
30
from nova.cells import state as cells_state
31
from nova.cells import utils as cells_utils
30
32
from nova import compute
31
33
from nova import context
32
34
from nova.db import base
33
35
from nova import exception
34
from nova.openstack.common import cfg
35
36
from nova.openstack.common import excutils
36
37
from nova.openstack.common import importutils
37
38
from nova.openstack.common import jsonutils
38
39
from nova.openstack.common import log as logging
40
from nova.openstack.common import rpc
39
41
from nova.openstack.common.rpc import common as rpc_common
42
from nova.openstack.common import timeutils
40
43
from nova.openstack.common import uuidutils
41
44
from nova import utils
597
600
self.state_manager = msg_runner.state_manager
598
601
self.compute_api = compute.API()
603
def task_log_get_all(self, message, task_name, period_beginning,
604
period_ending, host, state):
605
"""Get task logs from the DB. The message could have
606
directly targeted this cell, or it could have been a broadcast
609
If 'host' is not None, filter by host.
610
If 'state' is not None, filter by state.
612
task_logs = self.db.task_log_get_all(message.ctxt, task_name,
617
return jsonutils.to_primitive(task_logs)
601
620
class _ResponseMessageMethods(_BaseMessageMethods):
602
621
"""Methods that are called from a ResponseMessage. There's only
677
696
self.msg_runner.tell_parents_our_capacities(message.ctxt)
698
def service_get_by_compute_host(self, message, host_name):
699
"""Return the service entry for a compute host."""
700
service = self.db.service_get_by_compute_host(message.ctxt,
702
return jsonutils.to_primitive(service)
704
def proxy_rpc_to_manager(self, message, host_name, rpc_message,
706
"""Proxy RPC to the given compute topic."""
707
# Check that the host exists.
708
self.db.service_get_by_compute_host(message.ctxt, host_name)
709
if message.need_response:
710
return rpc.call(message.ctxt, topic, rpc_message,
712
rpc.cast(message.ctxt, topic, rpc_message)
714
def compute_node_get(self, message, compute_id):
715
"""Get compute node by ID."""
716
compute_node = self.db.compute_node_get(message.ctxt,
718
return jsonutils.to_primitive(compute_node)
680
721
class _BroadcastMessageMethods(_BaseMessageMethods):
681
722
"""These are the methods that can be called as a part of a broadcast
779
820
self.db.bw_usage_update(message.ctxt, **bw_update_info)
822
def _sync_instance(self, ctxt, instance):
823
if instance['deleted']:
824
self.msg_runner.instance_destroy_at_top(ctxt, instance)
826
self.msg_runner.instance_update_at_top(ctxt, instance)
828
def sync_instances(self, message, project_id, updated_since, deleted,
830
projid_str = project_id is None and "<all>" or project_id
831
since_str = updated_since is None and "<all>" or updated_since
832
LOG.info(_("Forcing a sync of instances, project_id="
833
"%(projid_str)s, updated_since=%(since_str)s"), locals())
834
if updated_since is not None:
835
updated_since = timeutils.parse_isotime(updated_since)
836
instances = cells_utils.get_instances_to_sync(message.ctxt,
837
updated_since=updated_since, project_id=project_id,
839
for instance in instances:
840
self._sync_instance(message.ctxt, instance)
842
def service_get_all(self, message, filters):
845
disabled = filters.pop('disabled', None)
846
services = self.db.service_get_all(message.ctxt, disabled=disabled)
848
for service in services:
849
service = jsonutils.to_primitive(service)
850
for key, val in filters.iteritems():
851
if service[key] != val:
854
ret_services.append(service)
857
def compute_node_get_all(self, message, hypervisor_match):
858
"""Return compute nodes in this cell."""
859
if hypervisor_match is not None:
860
nodes = self.db.compute_node_search_by_hypervisor(message.ctxt,
863
nodes = self.db.compute_node_get_all(message.ctxt)
864
return jsonutils.to_primitive(nodes)
866
def compute_node_stats(self, message):
    """Look up compute node statistics for this cell.

    The lookup is delegated to ``self.db.compute_node_statistics`` using
    the context carried by ``message``; its result is returned unchanged.
    """
    fetch_statistics = self.db.compute_node_statistics
    return fetch_statistics(message.ctxt)
782
871
_CELL_MESSAGE_TYPE_TO_MESSAGE_CLS = {'targeted': _TargetedMessage,
783
872
'broadcast': _BroadcastMessage,
1004
1093
'up', run_locally=False)
1005
1094
message.process()
1096
def sync_instances(self, ctxt, project_id, updated_since, deleted):
1097
"""Force a sync of all instances, potentially by project_id,
1098
and potentially since a certain date/time.
1100
method_kwargs = dict(project_id=project_id,
1101
updated_since=updated_since,
1103
message = _BroadcastMessage(self, ctxt, 'sync_instances',
1104
method_kwargs, 'down',
1108
def service_get_all(self, ctxt, filters=None):
    """Broadcast a 'service_get_all' request in the 'down' direction.

    The message is also run locally and asks for responses. ``filters``
    is forwarded untouched as the only method keyword argument. Returns
    whatever the broadcast message's ``process()`` yields.
    """
    broadcast = _BroadcastMessage(
        self, ctxt, 'service_get_all', {'filters': filters}, 'down',
        run_locally=True, need_response=True)
    return broadcast.process()
1115
def service_get_by_compute_host(self, ctxt, cell_name, host_name):
1116
method_kwargs = dict(host_name=host_name)
1117
message = _TargetedMessage(self, ctxt,
1118
'service_get_by_compute_host',
1119
method_kwargs, 'down', cell_name,
1121
return message.process()
1123
def proxy_rpc_to_manager(self, ctxt, cell_name, host_name, topic,
1124
rpc_message, call, timeout):
1125
method_kwargs = {'host_name': host_name,
1127
'rpc_message': rpc_message,
1129
message = _TargetedMessage(self, ctxt,
1130
'proxy_rpc_to_manager',
1131
method_kwargs, 'down', cell_name,
1133
return message.process()
1135
def task_log_get_all(self, ctxt, cell_name, task_name,
1136
period_beginning, period_ending,
1137
host=None, state=None):
1138
"""Get task logs from the DB from all cells or a particular
1141
If 'cell_name' is None or '', get responses from all cells.
1142
If 'host' is not None, filter by host.
1143
If 'state' is not None, filter by state.
1145
Return a list of Response objects.
1147
method_kwargs = dict(task_name=task_name,
1148
period_beginning=period_beginning,
1149
period_ending=period_ending,
1150
host=host, state=state)
1152
message = _TargetedMessage(self, ctxt, 'task_log_get_all',
1153
method_kwargs, 'down',
1154
cell_name, need_response=True)
1155
# Caller should get a list of Responses.
1156
return [message.process()]
1157
message = _BroadcastMessage(self, ctxt, 'task_log_get_all',
1158
method_kwargs, 'down',
1159
run_locally=True, need_response=True)
1160
return message.process()
1162
def compute_node_get_all(self, ctxt, hypervisor_match=None):
    """Ask every child cell (and this one) for its compute nodes.

    A 'compute_node_get_all' broadcast is sent in the 'down' direction
    with ``hypervisor_match`` passed through as the only method keyword
    argument. Returns the result of processing that broadcast.
    """
    broadcast = _BroadcastMessage(
        self, ctxt, 'compute_node_get_all',
        {'hypervisor_match': hypervisor_match}, 'down',
        run_locally=True, need_response=True)
    return broadcast.process()
1170
def compute_node_stats(self, ctxt):
    """Ask every child cell (and this one) for compute node stats.

    A 'compute_node_stats' broadcast carrying no method keyword
    arguments is sent in the 'down' direction. Returns the result of
    processing that broadcast.
    """
    broadcast = _BroadcastMessage(
        self, ctxt, 'compute_node_stats', {}, 'down',
        run_locally=True, need_response=True)
    return broadcast.process()
1178
def compute_node_get(self, ctxt, cell_name, compute_id):
    """Fetch one compute node entry, by ID, from a specific cell.

    A 'compute_node_get' message targeted at ``cell_name`` is sent in
    the 'down' direction and asks for a response. Returns the result of
    processing that targeted message.
    """
    targeted = _TargetedMessage(
        self, ctxt, 'compute_node_get', {'compute_id': compute_id},
        'down', cell_name, need_response=True)
    return targeted.process()
1008
1187
def get_message_types():
    """Return the keys of the message-type-to-message-class mapping."""
    known_types = _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS.keys()
    return known_types