~ubuntu-branches/ubuntu/utopic/ceilometer/utopic-proposed

« back to all changes in this revision

Viewing changes to ceilometer/publisher/messaging.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2014-10-16 14:07:11 UTC
  • mfrom: (1.2.1) (28.1.5 utopic-proposed)
  • Revision ID: package-import@ubuntu.com-20141016140711-95mki6bdkivvfr2x
Tags: 2014.2-0ubuntu1
New upstream release. 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Copyright 2012 New Dream Network, LLC (DreamHost)
 
3
#
 
4
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
 
5
#
 
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
7
# not use this file except in compliance with the License. You may obtain
 
8
# a copy of the License at
 
9
#
 
10
#      http://www.apache.org/licenses/LICENSE-2.0
 
11
#
 
12
# Unless required by applicable law or agreed to in writing, software
 
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
15
# License for the specific language governing permissions and limitations
 
16
# under the License.
 
17
"""Publish a sample using the preferred RPC mechanism.
 
18
"""
 
19
 
 
20
import abc
 
21
import itertools
 
22
import operator
 
23
 
 
24
from oslo.config import cfg
 
25
import oslo.messaging
 
26
import oslo.messaging._drivers.common
 
27
import six
 
28
import six.moves.urllib.parse as urlparse
 
29
 
 
30
from ceilometer import messaging
 
31
from ceilometer.openstack.common.gettextutils import _
 
32
from ceilometer.openstack.common import log
 
33
from ceilometer import publisher
 
34
from ceilometer.publisher import utils
 
35
 
 
36
 
 
37
LOG = log.getLogger(__name__)
 
38
 
 
39
METER_PUBLISH_RPC_OPTS = [
 
40
    cfg.StrOpt('metering_topic',
 
41
               default='metering',
 
42
               help='The topic that ceilometer uses for metering messages.',
 
43
               deprecated_group="DEFAULT",
 
44
               ),
 
45
]
 
46
 
 
47
METER_PUBLISH_NOTIFIER_OPTS = [
 
48
    cfg.StrOpt('metering_topic',
 
49
               default='metering',
 
50
               help='The topic that ceilometer uses for metering '
 
51
               'notifications.',
 
52
               ),
 
53
    cfg.StrOpt('metering_driver',
 
54
               default='messagingv2',
 
55
               help='The driver that ceilometer uses for metering '
 
56
               'notifications.',
 
57
               )
 
58
]
 
59
 
 
60
cfg.CONF.register_opts(METER_PUBLISH_RPC_OPTS,
 
61
                       group="publisher_rpc")
 
62
cfg.CONF.register_opts(METER_PUBLISH_NOTIFIER_OPTS,
 
63
                       group="publisher_notifier")
 
64
cfg.CONF.import_opt('host', 'ceilometer.service')
 
65
 
 
66
 
 
67
def oslo_messaging_is_rabbit():
 
68
    kombu = ['ceilometer.openstack.common.rpc.impl_kombu',
 
69
             'oslo.messaging._drivers.impl_rabbit:RabbitDriver'
 
70
             'rabbit']
 
71
    return cfg.CONF.rpc_backend in kombu or (
 
72
        cfg.CONF.transport_url and
 
73
        cfg.CONF.transport_url.startswith('rabbit://'))
 
74
 
 
75
 
 
76
def override_backend_retry_config(value):
 
77
    """Override the retry config option native to the configured rpc backend.
 
78
 
 
79
    It is done if such a native config option exists.
 
80
    :param value: the value to override
 
81
    """
 
82
    # TODO(sileht): ultimately we should add to olso a more generic concept
 
83
    # of retry config (i.e. not specific to an individual AMQP provider)
 
84
    # see: https://bugs.launchpad.net/ceilometer/+bug/1244698
 
85
    # and: https://bugs.launchpad.net/oslo.messaging/+bug/1282639
 
86
    if oslo_messaging_is_rabbit():
 
87
        if 'rabbit_max_retries' in cfg.CONF:
 
88
            cfg.CONF.set_override('rabbit_max_retries', value)
 
89
 
 
90
 
 
91
@six.add_metaclass(abc.ABCMeta)
 
92
class MessagingPublisher(publisher.PublisherBase):
 
93
 
 
94
    def __init__(self, parsed_url):
 
95
        options = urlparse.parse_qs(parsed_url.query)
 
96
        # the values of the option is a list of url params values
 
97
        # only take care of the latest one if the option
 
98
        # is provided more than once
 
99
        self.per_meter_topic = bool(int(
 
100
            options.get('per_meter_topic', [0])[-1]))
 
101
 
 
102
        self.policy = options.get('policy', ['default'])[-1]
 
103
        self.max_queue_length = int(options.get(
 
104
            'max_queue_length', [1024])[-1])
 
105
 
 
106
        self.local_queue = []
 
107
 
 
108
        if self.policy in ['queue', 'drop']:
 
109
            LOG.info(_('Publishing policy set to %s, '
 
110
                       'override backend retry config to 1') % self.policy)
 
111
            override_backend_retry_config(1)
 
112
        elif self.policy == 'default':
 
113
            LOG.info(_('Publishing policy set to %s') % self.policy)
 
114
        else:
 
115
            LOG.warn(_('Publishing policy is unknown (%s) force to default')
 
116
                     % self.policy)
 
117
            self.policy = 'default'
 
118
 
 
119
    def publish_samples(self, context, samples):
 
120
        """Publish samples on RPC.
 
121
 
 
122
        :param context: Execution context from the service or RPC call.
 
123
        :param samples: Samples from pipeline after transformation.
 
124
 
 
125
        """
 
126
 
 
127
        meters = [
 
128
            utils.meter_message_from_counter(
 
129
                sample,
 
130
                cfg.CONF.publisher.metering_secret)
 
131
            for sample in samples
 
132
        ]
 
133
 
 
134
        topic = cfg.CONF.publisher_rpc.metering_topic
 
135
        self.local_queue.append((context, topic, meters))
 
136
 
 
137
        if self.per_meter_topic:
 
138
            for meter_name, meter_list in itertools.groupby(
 
139
                    sorted(meters, key=operator.itemgetter('counter_name')),
 
140
                    operator.itemgetter('counter_name')):
 
141
                meter_list = list(meter_list)
 
142
                topic_name = topic + '.' + meter_name
 
143
                LOG.debug('Publishing %(m)d samples on %(n)s',
 
144
                          {'m': len(meter_list), 'n': topic_name})
 
145
                self.local_queue.append((context, topic_name, meter_list))
 
146
 
 
147
        self.flush()
 
148
 
 
149
    def flush(self):
 
150
        # NOTE(sileht):
 
151
        # IO of the rpc stuff in handled by eventlet,
 
152
        # this is why the self.local_queue, is emptied before processing the
 
153
        # queue and the remaining messages in the queue are added to
 
154
        # self.local_queue after in case of a other call have already added
 
155
        # something in the self.local_queue
 
156
        queue = self.local_queue
 
157
        self.local_queue = []
 
158
        self.local_queue = (self._process_queue(queue, self.policy) +
 
159
                            self.local_queue)
 
160
        if self.policy == 'queue':
 
161
            self._check_queue_length()
 
162
 
 
163
    def _check_queue_length(self):
 
164
        queue_length = len(self.local_queue)
 
165
        if queue_length > self.max_queue_length > 0:
 
166
            count = queue_length - self.max_queue_length
 
167
            self.local_queue = self.local_queue[count:]
 
168
            LOG.warn(_("Publisher max local_queue length is exceeded, "
 
169
                     "dropping %d oldest samples") % count)
 
170
 
 
171
    def _process_queue(self, queue, policy):
 
172
        # NOTE(sileht):
 
173
        # the behavior of rpc.cast call depends of rabbit_max_retries
 
174
        # if rabbit_max_retries <= 0:
 
175
        #   it returns only if the msg has been sent on the amqp queue
 
176
        # if rabbit_max_retries > 0:
 
177
        #   it raises an exception if rabbitmq is unreachable
 
178
        #
 
179
        # the default policy just respect the rabbitmq configuration
 
180
        # nothing special is done if rabbit_max_retries <= 0
 
181
        # and exception is reraised if rabbit_max_retries > 0
 
182
        while queue:
 
183
            context, topic, meters = queue[0]
 
184
            try:
 
185
                self._send(context, topic, meters)
 
186
            except oslo.messaging._drivers.common.RPCException:
 
187
                samples = sum([len(m) for __, __, m in queue])
 
188
                if policy == 'queue':
 
189
                    LOG.warn(_("Failed to publish %d samples, queue them"),
 
190
                             samples)
 
191
                    return queue
 
192
                elif policy == 'drop':
 
193
                    LOG.warn(_("Failed to publish %d samples, dropping them"),
 
194
                             samples)
 
195
                    return []
 
196
                # default, occur only if rabbit_max_retries > 0
 
197
                raise
 
198
            else:
 
199
                queue.pop(0)
 
200
        return []
 
201
 
 
202
    @abc.abstractmethod
 
203
    def _send(self, context, topic, meters):
 
204
        """Send the meters to the messaging topic."""
 
205
 
 
206
 
 
207
class RPCPublisher(MessagingPublisher):
 
208
    def __init__(self, parsed_url):
 
209
        super(RPCPublisher, self).__init__(parsed_url)
 
210
 
 
211
        options = urlparse.parse_qs(parsed_url.query)
 
212
        self.target = options.get('target', ['record_metering_data'])[0]
 
213
 
 
214
        self.rpc_client = messaging.get_rpc_client(
 
215
            messaging.get_transport(),
 
216
            version='1.0'
 
217
        )
 
218
 
 
219
    def _send(self, context, topic, meters):
 
220
        self.rpc_client.prepare(topic=topic).cast(context, self.target,
 
221
                                                  data=meters)
 
222
 
 
223
 
 
224
class NotifierPublisher(MessagingPublisher):
 
225
    def __init__(self, parsed_url):
 
226
        super(NotifierPublisher, self).__init__(parsed_url)
 
227
        self.notifier = oslo.messaging.Notifier(
 
228
            messaging.get_transport(),
 
229
            driver=cfg.CONF.publisher_notifier.metering_driver,
 
230
            publisher_id='metering.publisher.%s' % cfg.CONF.host,
 
231
            topic=cfg.CONF.publisher_notifier.metering_topic
 
232
        )
 
233
 
 
234
    def _send(self, context, event_type, meters):
 
235
        self.notifier.sample(context.to_dict(), event_type=event_type,
 
236
                             payload=meters)