~ubuntu-branches/ubuntu/vivid/ceilometer/vivid

« back to all changes in this revision

Viewing changes to ceilometer/pipeline.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2014-03-06 14:44:28 UTC
  • mto: (28.1.1 utopic-proposed) (1.2.1)
  • mto: This revision was merged to the branch mainline in revision 19.
  • Revision ID: package-import@ubuntu.com-20140306144428-rvphsh4igwyulzf0
Tags: upstream-2014.1~b3
ImportĀ upstreamĀ versionĀ 2014.1~b3

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# -*- encoding: utf-8 -*-
2
2
#
3
3
# Copyright Ā© 2013 Intel Corp.
 
4
# Copyright Ā© 2014 Red Hat, Inc
4
5
#
5
 
# Author: Yunhong Jiang <yunhong.jiang@intel.com>
 
6
# Authors: Yunhong Jiang <yunhong.jiang@intel.com>
 
7
#          Eoghan Glynn <eglynn@redhat.com>
6
8
#
7
9
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8
10
# not use this file except in compliance with the License. You may obtain
27
29
from ceilometer.openstack.common.gettextutils import _  # noqa
28
30
from ceilometer.openstack.common import log
29
31
from ceilometer import publisher
 
32
from ceilometer import transformer as xformer
30
33
 
31
34
 
32
35
OPTS = [
33
36
    cfg.StrOpt('pipeline_cfg_file',
34
37
               default="pipeline.yaml",
35
 
               help="Configuration file for pipeline definition"
 
38
               help="Configuration file for pipeline definition."
36
39
               ),
37
40
]
38
41
 
71
74
            p.flush(self.context)
72
75
 
73
76
 
74
 
class Pipeline(object):
75
 
    """Sample handling pipeline
76
 
 
77
 
    Pipeline describes a chain of handlers. The chain starts with
78
 
    tranformer and ends with one or more publishers.
79
 
 
80
 
    The first transformer in the chain gets sample from data collector, i.e.
81
 
    pollster or notification handler, takes some action like dropping,
82
 
    aggregation, changing field etc, then passes the updated sample
83
 
    to next step.
84
 
 
85
 
    The subsequent transformers, if any, handle the data similarly.
86
 
 
87
 
    In the end of the chain, publishers publish the data. The exact publishing
88
 
    method depends on publisher type, for example, pushing into data storage
89
 
    through message bus, sending to external CW software through CW API call.
90
 
 
91
 
    If no transformer is included in the chain, the publishers get samples
92
 
    from data collector and publish them directly.
 
77
class Source(object):
 
78
    """Represents a source of samples, in effect a set of pollsters
 
79
    and/or notification handlers emitting samples for a set of matching
 
80
    meters.
 
81
 
 
82
    Each source encapsulates meter name matching, polling interval
 
83
    determination, optional resource enumeration or discovery, and
 
84
    mapping to one or more sinks for publication.
93
85
 
94
86
    """
95
87
 
96
 
    def __init__(self, cfg, transformer_manager):
 
88
    def __init__(self, cfg):
97
89
        self.cfg = cfg
98
90
 
99
91
        try:
104
96
                raise PipelineException("Invalid interval value", cfg)
105
97
            # Support 'counters' for backward compatibility
106
98
            self.meters = cfg.get('meters', cfg.get('counters'))
107
 
            # It's legal to have no transformer specified
108
 
            self.transformer_cfg = cfg['transformers'] or []
 
99
            self.sinks = cfg.get('sinks')
109
100
        except KeyError as err:
110
101
            raise PipelineException(
111
102
                "Required field %s not specified" % err.args[0], cfg)
113
104
        if self.interval <= 0:
114
105
            raise PipelineException("Interval value should > 0", cfg)
115
106
 
116
 
        self._check_meters()
117
 
 
118
 
        if not cfg.get('publishers'):
119
 
            raise PipelineException("No publisher specified", cfg)
120
 
 
121
 
        self.publishers = []
122
 
        for p in cfg['publishers']:
123
 
            if '://' not in p:
124
 
                # Support old format without URL
125
 
                p = p + "://"
126
 
            try:
127
 
                self.publishers.append(publisher.get_publisher(p))
128
 
            except Exception:
129
 
                LOG.exception(_("Unable to load publisher %s"), p)
130
 
 
131
 
        self.transformers = self._setup_transformers(cfg, transformer_manager)
132
 
 
133
107
        self.resources = cfg.get('resources') or []
134
108
        if not isinstance(self.resources, list):
135
109
            raise PipelineException("Resources should be a list", cfg)
136
110
 
 
111
        self.discovery = cfg.get('discovery') or []
 
112
        if not isinstance(self.discovery, list):
 
113
            raise PipelineException("Discovery should be a list", cfg)
 
114
 
 
115
        self._check_meters()
 
116
 
137
117
    def __str__(self):
138
118
        return self.name
139
119
 
161
141
                "Included meters specified with wildcard",
162
142
                self.cfg)
163
143
 
 
144
    # (yjiang5) To support meters like instance:m1.tiny,
 
145
    # which include variable part at the end starting with ':'.
 
146
    # Hope we will not add such meters in future.
 
147
    @staticmethod
 
148
    def _variable_meter_name(name):
 
149
        m = name.partition(':')
 
150
        if m[1] == ':':
 
151
            return m[1].join((m[0], '*'))
 
152
        else:
 
153
            return name
 
154
 
 
155
    def support_meter(self, meter_name):
 
156
        meter_name = self._variable_meter_name(meter_name)
 
157
 
 
158
        # Special case: if we only have negation, we suppose the default is
 
159
        # allow
 
160
        default = all(meter.startswith('!') for meter in self.meters)
 
161
 
 
162
        # Support wildcard like storage.* and !disk.*
 
163
        # Start with negation, we consider that the order is deny, allow
 
164
        if any(fnmatch.fnmatch(meter_name, meter[1:])
 
165
               for meter in self.meters
 
166
               if meter[0] == '!'):
 
167
            return False
 
168
 
 
169
        if any(fnmatch.fnmatch(meter_name, meter)
 
170
               for meter in self.meters
 
171
               if meter[0] != '!'):
 
172
            return True
 
173
 
 
174
        return default
 
175
 
 
176
    def check_sinks(self, sinks):
 
177
        if not self.sinks:
 
178
            raise PipelineException(
 
179
                "No sink defined in source %s" % self,
 
180
                self.cfg)
 
181
        for sink in self.sinks:
 
182
            if sink not in sinks:
 
183
                raise PipelineException(
 
184
                    "Dangling sink %s from source %s" % (sink, self),
 
185
                    self.cfg)
 
186
 
 
187
 
 
188
class Sink(object):
 
189
    """Represents a sink for the transformation and publication of
 
190
    samples emitted from a related source.
 
191
 
 
192
    Each sink config is concerned *only* with the transformation rules
 
193
    and publication conduits for samples.
 
194
 
 
195
    In effect, a sink describes a chain of handlers. The chain starts
 
196
    with zero or more transformers and ends with one or more publishers.
 
197
 
 
198
    The first transformer in the chain is passed samples from the
 
199
    corresponding source, takes some action such as deriving rate of
 
200
    change, performing unit conversion, or aggregating, before passing
 
201
    the modified sample to next step.
 
202
 
 
203
    The subsequent transformers, if any, handle the data similarly.
 
204
 
 
205
    At the end of the chain, publishers publish the data. The exact
 
206
    publishing method depends on publisher type, for example, pushing
 
207
    into data storage via the message bus providing guaranteed delivery,
 
208
    or for loss-tolerant samples UDP may be used.
 
209
 
 
210
    If no transformers are included in the chain, the publishers are
 
211
    passed samples directly from the sink which are published unchanged.
 
212
 
 
213
    """
 
214
 
 
215
    def __init__(self, cfg, transformer_manager):
 
216
        self.cfg = cfg
 
217
 
 
218
        try:
 
219
            self.name = cfg['name']
 
220
            # It's legal to have no transformer specified
 
221
            self.transformer_cfg = cfg['transformers'] or []
 
222
        except KeyError as err:
 
223
            raise PipelineException(
 
224
                "Required field %s not specified" % err.args[0], cfg)
 
225
 
 
226
        if not cfg.get('publishers'):
 
227
            raise PipelineException("No publisher specified", cfg)
 
228
 
 
229
        self.publishers = []
 
230
        for p in cfg['publishers']:
 
231
            if '://' not in p:
 
232
                # Support old format without URL
 
233
                p = p + "://"
 
234
            try:
 
235
                self.publishers.append(publisher.get_publisher(p))
 
236
            except Exception:
 
237
                LOG.exception(_("Unable to load publisher %s"), p)
 
238
 
 
239
        self.transformers = self._setup_transformers(cfg, transformer_manager)
 
240
 
 
241
    def __str__(self):
 
242
        return self.name
 
243
 
164
244
    def _setup_transformers(self, cfg, transformer_manager):
165
245
        transformer_cfg = cfg['transformers'] or []
166
246
        transformers = []
234
314
                                                      'pub': p}))
235
315
            LOG.audit(_("Pipeline %s: Published samples") % self)
236
316
 
237
 
    def publish_sample(self, ctxt, sample):
238
 
        self.publish_samples(ctxt, [sample])
239
 
 
240
317
    def publish_samples(self, ctxt, samples):
241
318
        for meter_name, samples in itertools.groupby(
242
319
                sorted(samples, key=operator.attrgetter('name')),
243
320
                operator.attrgetter('name')):
244
 
            if self.support_meter(meter_name):
245
 
                self._publish_samples(0, ctxt, samples)
246
 
 
247
 
    # (yjiang5) To support meters like instance:m1.tiny,
248
 
    # which include variable part at the end starting with ':'.
249
 
    # Hope we will not add such meters in future.
250
 
    def _variable_meter_name(self, name):
251
 
        m = name.partition(':')
252
 
        if m[1] == ':':
253
 
            return m[1].join((m[0], '*'))
254
 
        else:
255
 
            return name
256
 
 
257
 
    def support_meter(self, meter_name):
258
 
        meter_name = self._variable_meter_name(meter_name)
259
 
 
260
 
        # Special case: if we only have negation, we suppose the default it
261
 
        # allow
262
 
        if all(meter.startswith('!') for meter in self.meters):
263
 
            default = True
264
 
        else:
265
 
            default = False
266
 
 
267
 
        # Support wildcard like storage.* and !disk.*
268
 
        # Start with negation, we consider that the order is deny, allow
269
 
        if any(fnmatch.fnmatch(meter_name, meter[1:])
270
 
               for meter in self.meters
271
 
               if meter[0] == '!'):
272
 
            return False
273
 
 
274
 
        if any(fnmatch.fnmatch(meter_name, meter)
275
 
               for meter in self.meters
276
 
               if meter[0] != '!'):
277
 
            return True
278
 
 
279
 
        return default
 
321
            self._publish_samples(0, ctxt, samples)
280
322
 
281
323
    def flush(self, ctxt):
282
324
        """Flush data after all samples have been injected to pipeline."""
283
325
 
284
 
        LOG.audit(_("Flush pipeline %s"), self)
285
326
        for (i, transformer) in enumerate(self.transformers):
286
327
            try:
287
328
                self._publish_samples(i + 1, ctxt,
293
334
                                                 'trans': transformer}))
294
335
                LOG.exception(err)
295
336
 
 
337
 
 
338
class Pipeline(object):
 
339
    """Represents a coupling between a sink and a corresponding source.
 
340
    """
 
341
 
 
342
    def __init__(self, source, sink):
 
343
        self.source = source
 
344
        self.sink = sink
 
345
        self.name = str(self)
 
346
 
 
347
    def __str__(self):
 
348
        return (self.source.name if self.source.name == self.sink.name
 
349
                else '%s:%s' % (self.source.name, self.sink.name))
 
350
 
296
351
    def get_interval(self):
297
 
        return self.interval
 
352
        return self.source.interval
 
353
 
 
354
    @property
 
355
    def resources(self):
 
356
        return self.source.resources
 
357
 
 
358
    @property
 
359
    def discovery(self):
 
360
        return self.source.discovery
 
361
 
 
362
    def support_meter(self, meter_name):
 
363
        return self.source.support_meter(meter_name)
 
364
 
 
365
    @property
 
366
    def publishers(self):
 
367
        return self.sink.publishers
 
368
 
 
369
    def publish_sample(self, ctxt, sample):
 
370
        self.publish_samples(ctxt, [sample])
 
371
 
 
372
    def publish_samples(self, ctxt, samples):
 
373
        supported = [s for s in samples if self.source.support_meter(s.name)]
 
374
        self.sink.publish_samples(ctxt, supported)
 
375
 
 
376
    def flush(self, ctxt):
 
377
        self.sink.flush(ctxt)
298
378
 
299
379
 
300
380
class PipelineManager(object):
306
386
 
307
387
    """
308
388
 
309
 
    def __init__(self, cfg,
310
 
                 transformer_manager):
 
389
    def __init__(self, cfg, transformer_manager):
311
390
        """Setup the pipelines according to config.
312
391
 
313
 
        The top of the cfg is a list of pipeline definitions.
314
 
 
315
 
        Pipeline definition is an dictionary specifying the target samples,
316
 
        the tranformers involved, and the target publishers:
317
 
        {
318
 
            "name": pipeline_name
319
 
            "interval": interval_time
320
 
            "meters" :  ["meter_1", "meter_2"],
321
 
            "resources": ["resource_uri1", "resource_uri2"],
322
 
            "tranformers":[
323
 
                              {"name": "Transformer_1",
324
 
                               "parameters": {"p1": "value"}},
325
 
 
326
 
                               {"name": "Transformer_2",
327
 
                               "parameters": {"p1": "value"}},
328
 
                           ]
329
 
            "publishers": ["publisher_1", "publisher_2"]
330
 
        }
331
 
 
332
 
        Interval is how many seconds should the samples be injected to
333
 
        the pipeline.
 
392
        The configuration is supported in one of two forms:
 
393
 
 
394
        1. Deprecated: the source and sink configuration are conflated
 
395
           as a list of consolidated pipelines.
 
396
 
 
397
           The pipelines are defined as a list of dictionaries each
 
398
           specifying the target samples, the transformers involved,
 
399
           and the target publishers, for example:
 
400
 
 
401
           [{"name": pipeline_1,
 
402
             "interval": interval_time,
 
403
             "meters" : ["meter_1", "meter_2"],
 
404
             "resources": ["resource_uri1", "resource_uri2"],
 
405
             "transformers": [
 
406
                              {"name": "Transformer_1",
 
407
                               "parameters": {"p1": "value"}},
 
408
 
 
409
                              {"name": "Transformer_2",
 
410
                               "parameters": {"p1": "value"}},
 
411
                              ],
 
412
             "publishers": ["publisher_1", "publisher_2"]
 
413
            },
 
414
            {"name": pipeline_2,
 
415
             "interval": interval_time,
 
416
             "meters" : ["meter_3"],
 
417
             "publishers": ["publisher_3"]
 
418
            },
 
419
           ]
 
420
 
 
421
        2. Decoupled: the source and sink configuration are separately
 
422
           specified before being linked together. This allows source-
 
423
           specific configuration, such as resource discovery, to be
 
424
           kept focused only on the fine-grained source while avoiding
 
425
           the necessity for wide duplication of sink-related config.
 
426
 
 
427
           The configuration is provided in the form of separate lists
 
428
           of dictionaries defining sources and sinks, for example:
 
429
 
 
430
           {"sources": [{"name": source_1,
 
431
                         "interval": interval_time,
 
432
                         "meters" : ["meter_1", "meter_2"],
 
433
                         "resources": ["resource_uri1", "resource_uri2"],
 
434
                         "sinks" : ["sink_1", "sink_2"]
 
435
                        },
 
436
                        {"name": source_2,
 
437
                         "interval": interval_time,
 
438
                         "meters" : ["meter_3"],
 
439
                         "sinks" : ["sink_2"]
 
440
                        },
 
441
                       ],
 
442
            "sinks": [{"name": sink_1,
 
443
                       "transformers": [
 
444
                              {"name": "Transformer_1",
 
445
                               "parameters": {"p1": "value"}},
 
446
 
 
447
                              {"name": "Transformer_2",
 
448
                               "parameters": {"p1": "value"}},
 
449
                             ],
 
450
                        "publishers": ["publisher_1", "publisher_2"]
 
451
                       },
 
452
                       {"name": sink_2,
 
453
                        "publishers": ["publisher_3"]
 
454
                       },
 
455
                      ]
 
456
           }
 
457
 
 
458
        The semantics of the common individual configuration elements
 
459
        are identical in the deprecated and decoupled version.
 
460
 
 
461
        The interval determines the cadence of sample injection into
 
462
        the pipeline where samples are produced under the direct control
 
463
        of an agent, i.e. via a polling cycle as opposed to incoming
 
464
        notifications.
334
465
 
335
466
        Valid meter format is '*', '!meter_name', or 'meter_name'.
336
467
        '*' is wildcard symbol means any meters; '!meter_name' means
348
479
        the meters should be polled. It's optional and it's up to the
349
480
        specific pollster to decide how to use it.
350
481
 
351
 
        Transformer's name is plugin name in setup.py.
 
482
        Transformer's name is plugin name in setup.cfg.
352
483
 
353
 
        Publisher's name is plugin name in setup.py
 
484
        Publisher's name is plugin name in setup.cfg
354
485
 
355
486
        """
356
 
        self.pipelines = [Pipeline(pipedef, transformer_manager)
357
 
                          for pipedef in cfg]
 
487
        self.pipelines = []
 
488
        if 'sources' in cfg or 'sinks' in cfg:
 
489
            if not ('sources' in cfg and 'sinks' in cfg):
 
490
                raise PipelineException("Both sources & sinks are required",
 
491
                                        cfg)
 
492
            LOG.info(_('detected decoupled pipeline config format'))
 
493
            sources = [Source(s) for s in cfg.get('sources', [])]
 
494
            sinks = dict((s['name'], Sink(s, transformer_manager))
 
495
                         for s in cfg.get('sinks', []))
 
496
            for source in sources:
 
497
                source.check_sinks(sinks)
 
498
                for target in source.sinks:
 
499
                    self.pipelines.append(Pipeline(source,
 
500
                                                   sinks[target]))
 
501
        else:
 
502
            LOG.warning(_('detected deprecated pipeline config format'))
 
503
            for pipedef in cfg:
 
504
                source = Source(pipedef)
 
505
                sink = Sink(pipedef, transformer_manager)
 
506
                self.pipelines.append(Pipeline(source, sink))
358
507
 
359
508
    def publisher(self, context):
360
509
        """Build a new Publisher for these manager pipelines.
364
513
        return PublishContext(context, self.pipelines)
365
514
 
366
515
 
367
 
def setup_pipeline(transformer_manager):
 
516
def setup_pipeline(transformer_manager=None):
368
517
    """Setup pipeline manager according to yaml config file."""
369
518
    cfg_file = cfg.CONF.pipeline_cfg_file
370
519
    if not os.path.exists(cfg_file):
379
528
    LOG.info(_("Pipeline config: %s"), pipeline_cfg)
380
529
 
381
530
    return PipelineManager(pipeline_cfg,
382
 
                           transformer_manager)
 
531
                           transformer_manager or
 
532
                           xformer.TransformerExtensionManager(
 
533
                               'ceilometer.transformer',
 
534
                           ))