2
# Copyright 2012 New Dream Network, LLC (DreamHost)
4
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
7
# not use this file except in compliance with the License. You may obtain
8
# a copy of the License at
10
# http://www.apache.org/licenses/LICENSE-2.0
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
# License for the specific language governing permissions and limitations
17
"""Publish a sample using the preferred RPC mechanism.
24
from oslo.config import cfg
26
import oslo.messaging._drivers.common
28
import six.moves.urllib.parse as urlparse
30
from ceilometer import messaging
31
from ceilometer.openstack.common.gettextutils import _
32
from ceilometer.openstack.common import log
33
from ceilometer import publisher
34
from ceilometer.publisher import utils
37
LOG = log.getLogger(__name__)
39
METER_PUBLISH_RPC_OPTS = [
40
cfg.StrOpt('metering_topic',
42
help='The topic that ceilometer uses for metering messages.',
43
deprecated_group="DEFAULT",
47
METER_PUBLISH_NOTIFIER_OPTS = [
48
cfg.StrOpt('metering_topic',
50
help='The topic that ceilometer uses for metering '
53
cfg.StrOpt('metering_driver',
54
default='messagingv2',
55
help='The driver that ceilometer uses for metering '
60
cfg.CONF.register_opts(METER_PUBLISH_RPC_OPTS,
61
group="publisher_rpc")
62
cfg.CONF.register_opts(METER_PUBLISH_NOTIFIER_OPTS,
63
group="publisher_notifier")
64
cfg.CONF.import_opt('host', 'ceilometer.service')
67
def oslo_messaging_is_rabbit():
68
kombu = ['ceilometer.openstack.common.rpc.impl_kombu',
69
'oslo.messaging._drivers.impl_rabbit:RabbitDriver'
71
return cfg.CONF.rpc_backend in kombu or (
72
cfg.CONF.transport_url and
73
cfg.CONF.transport_url.startswith('rabbit://'))
76
def override_backend_retry_config(value):
77
"""Override the retry config option native to the configured rpc backend.
79
It is done if such a native config option exists.
80
:param value: the value to override
82
# TODO(sileht): ultimately we should add to olso a more generic concept
83
# of retry config (i.e. not specific to an individual AMQP provider)
84
# see: https://bugs.launchpad.net/ceilometer/+bug/1244698
85
# and: https://bugs.launchpad.net/oslo.messaging/+bug/1282639
86
if oslo_messaging_is_rabbit():
87
if 'rabbit_max_retries' in cfg.CONF:
88
cfg.CONF.set_override('rabbit_max_retries', value)
91
@six.add_metaclass(abc.ABCMeta)
92
class MessagingPublisher(publisher.PublisherBase):
94
def __init__(self, parsed_url):
95
options = urlparse.parse_qs(parsed_url.query)
96
# the values of the option is a list of url params values
97
# only take care of the latest one if the option
98
# is provided more than once
99
self.per_meter_topic = bool(int(
100
options.get('per_meter_topic', [0])[-1]))
102
self.policy = options.get('policy', ['default'])[-1]
103
self.max_queue_length = int(options.get(
104
'max_queue_length', [1024])[-1])
106
self.local_queue = []
108
if self.policy in ['queue', 'drop']:
109
LOG.info(_('Publishing policy set to %s, '
110
'override backend retry config to 1') % self.policy)
111
override_backend_retry_config(1)
112
elif self.policy == 'default':
113
LOG.info(_('Publishing policy set to %s') % self.policy)
115
LOG.warn(_('Publishing policy is unknown (%s) force to default')
117
self.policy = 'default'
119
def publish_samples(self, context, samples):
120
"""Publish samples on RPC.
122
:param context: Execution context from the service or RPC call.
123
:param samples: Samples from pipeline after transformation.
128
utils.meter_message_from_counter(
130
cfg.CONF.publisher.metering_secret)
131
for sample in samples
134
topic = cfg.CONF.publisher_rpc.metering_topic
135
self.local_queue.append((context, topic, meters))
137
if self.per_meter_topic:
138
for meter_name, meter_list in itertools.groupby(
139
sorted(meters, key=operator.itemgetter('counter_name')),
140
operator.itemgetter('counter_name')):
141
meter_list = list(meter_list)
142
topic_name = topic + '.' + meter_name
143
LOG.debug('Publishing %(m)d samples on %(n)s',
144
{'m': len(meter_list), 'n': topic_name})
145
self.local_queue.append((context, topic_name, meter_list))
151
# IO of the rpc stuff in handled by eventlet,
152
# this is why the self.local_queue, is emptied before processing the
153
# queue and the remaining messages in the queue are added to
154
# self.local_queue after in case of a other call have already added
155
# something in the self.local_queue
156
queue = self.local_queue
157
self.local_queue = []
158
self.local_queue = (self._process_queue(queue, self.policy) +
160
if self.policy == 'queue':
161
self._check_queue_length()
163
def _check_queue_length(self):
164
queue_length = len(self.local_queue)
165
if queue_length > self.max_queue_length > 0:
166
count = queue_length - self.max_queue_length
167
self.local_queue = self.local_queue[count:]
168
LOG.warn(_("Publisher max local_queue length is exceeded, "
169
"dropping %d oldest samples") % count)
171
def _process_queue(self, queue, policy):
173
# the behavior of rpc.cast call depends of rabbit_max_retries
174
# if rabbit_max_retries <= 0:
175
# it returns only if the msg has been sent on the amqp queue
176
# if rabbit_max_retries > 0:
177
# it raises an exception if rabbitmq is unreachable
179
# the default policy just respect the rabbitmq configuration
180
# nothing special is done if rabbit_max_retries <= 0
181
# and exception is reraised if rabbit_max_retries > 0
183
context, topic, meters = queue[0]
185
self._send(context, topic, meters)
186
except oslo.messaging._drivers.common.RPCException:
187
samples = sum([len(m) for __, __, m in queue])
188
if policy == 'queue':
189
LOG.warn(_("Failed to publish %d samples, queue them"),
192
elif policy == 'drop':
193
LOG.warn(_("Failed to publish %d samples, dropping them"),
196
# default, occur only if rabbit_max_retries > 0
203
def _send(self, context, topic, meters):
204
"""Send the meters to the messaging topic."""
207
class RPCPublisher(MessagingPublisher):
208
def __init__(self, parsed_url):
209
super(RPCPublisher, self).__init__(parsed_url)
211
options = urlparse.parse_qs(parsed_url.query)
212
self.target = options.get('target', ['record_metering_data'])[0]
214
self.rpc_client = messaging.get_rpc_client(
215
messaging.get_transport(),
219
def _send(self, context, topic, meters):
220
self.rpc_client.prepare(topic=topic).cast(context, self.target,
224
class NotifierPublisher(MessagingPublisher):
225
def __init__(self, parsed_url):
226
super(NotifierPublisher, self).__init__(parsed_url)
227
self.notifier = oslo.messaging.Notifier(
228
messaging.get_transport(),
229
driver=cfg.CONF.publisher_notifier.metering_driver,
230
publisher_id='metering.publisher.%s' % cfg.CONF.host,
231
topic=cfg.CONF.publisher_notifier.metering_topic
234
def _send(self, context, event_type, meters):
235
self.notifier.sample(context.to_dict(), event_type=event_type,