71
74
p.flush(self.context)
74
class Pipeline(object):
75
"""Sample handling pipeline
77
Pipeline describes a chain of handlers. The chain starts with
78
transformer and ends with one or more publishers.
80
The first transformer in the chain gets sample from data collector, i.e.
81
pollster or notification handler, takes some action like dropping,
82
aggregation, changing field etc, then passes the updated sample
85
The subsequent transformers, if any, handle the data similarly.
87
At the end of the chain, publishers publish the data. The exact publishing
88
method depends on publisher type, for example, pushing into data storage
89
through message bus, sending to external CW software through CW API call.
91
If no transformer is included in the chain, the publishers get samples
92
from data collector and publish them directly.
78
"""Represents a source of samples, in effect a set of pollsters
79
and/or notification handlers emitting samples for a set of matching
82
Each source encapsulates meter name matching, polling interval
83
determination, optional resource enumeration or discovery, and
84
mapping to one or more sinks for publication.
96
def __init__(self, cfg, transformer_manager):
88
def __init__(self, cfg):
113
104
if self.interval <= 0:
114
105
raise PipelineException("Interval value should > 0", cfg)
118
if not cfg.get('publishers'):
119
raise PipelineException("No publisher specified", cfg)
122
for p in cfg['publishers']:
124
# Support old format without URL
127
self.publishers.append(publisher.get_publisher(p))
129
LOG.exception(_("Unable to load publisher %s"), p)
131
self.transformers = self._setup_transformers(cfg, transformer_manager)
133
107
self.resources = cfg.get('resources') or []
134
108
if not isinstance(self.resources, list):
135
109
raise PipelineException("Resources should be a list", cfg)
111
self.discovery = cfg.get('discovery') or []
112
if not isinstance(self.discovery, list):
113
raise PipelineException("Discovery should be a list", cfg)
137
117
def __str__(self):
161
141
"Included meters specified with wildcard",
144
# (yjiang5) To support meters like instance:m1.tiny,
145
# which include variable part at the end starting with ':'.
146
# Hope we will not add such meters in future.
148
def _variable_meter_name(name):
149
m = name.partition(':')
151
return m[1].join((m[0], '*'))
155
def support_meter(self, meter_name):
156
meter_name = self._variable_meter_name(meter_name)
158
# Special case: if we only have negation, we suppose the default is
160
default = all(meter.startswith('!') for meter in self.meters)
162
# Support wildcard like storage.* and !disk.*
163
# Start with negation, we consider that the order is deny, allow
164
if any(fnmatch.fnmatch(meter_name, meter[1:])
165
for meter in self.meters
169
if any(fnmatch.fnmatch(meter_name, meter)
170
for meter in self.meters
176
def check_sinks(self, sinks):
178
raise PipelineException(
179
"No sink defined in source %s" % self,
181
for sink in self.sinks:
182
if sink not in sinks:
183
raise PipelineException(
184
"Dangling sink %s from source %s" % (sink, self),
189
"""Represents a sink for the transformation and publication of
190
samples emitted from a related source.
192
Each sink config is concerned *only* with the transformation rules
193
and publication conduits for samples.
195
In effect, a sink describes a chain of handlers. The chain starts
196
with zero or more transformers and ends with one or more publishers.
198
The first transformer in the chain is passed samples from the
199
corresponding source, takes some action such as deriving rate of
200
change, performing unit conversion, or aggregating, before passing
201
the modified sample to next step.
203
The subsequent transformers, if any, handle the data similarly.
205
At the end of the chain, publishers publish the data. The exact
206
publishing method depends on publisher type, for example, pushing
207
into data storage via the message bus providing guaranteed delivery,
208
or for loss-tolerant samples UDP may be used.
210
If no transformers are included in the chain, the publishers are
211
passed samples directly from the sink which are published unchanged.
215
def __init__(self, cfg, transformer_manager):
219
self.name = cfg['name']
220
# It's legal to have no transformer specified
221
self.transformer_cfg = cfg['transformers'] or []
222
except KeyError as err:
223
raise PipelineException(
224
"Required field %s not specified" % err.args[0], cfg)
226
if not cfg.get('publishers'):
227
raise PipelineException("No publisher specified", cfg)
230
for p in cfg['publishers']:
232
# Support old format without URL
235
self.publishers.append(publisher.get_publisher(p))
237
LOG.exception(_("Unable to load publisher %s"), p)
239
self.transformers = self._setup_transformers(cfg, transformer_manager)
164
244
def _setup_transformers(self, cfg, transformer_manager):
165
245
transformer_cfg = cfg['transformers'] or []
166
246
transformers = []
235
315
LOG.audit(_("Pipeline %s: Published samples") % self)
237
def publish_sample(self, ctxt, sample):
238
self.publish_samples(ctxt, [sample])
240
317
def publish_samples(self, ctxt, samples):
241
318
for meter_name, samples in itertools.groupby(
242
319
sorted(samples, key=operator.attrgetter('name')),
243
320
operator.attrgetter('name')):
244
if self.support_meter(meter_name):
245
self._publish_samples(0, ctxt, samples)
247
# (yjiang5) To support meters like instance:m1.tiny,
248
# which include variable part at the end starting with ':'.
249
# Hope we will not add such meters in future.
250
def _variable_meter_name(self, name):
251
m = name.partition(':')
253
return m[1].join((m[0], '*'))
257
def support_meter(self, meter_name):
258
meter_name = self._variable_meter_name(meter_name)
260
# Special case: if we only have negation, we suppose the default is
262
if all(meter.startswith('!') for meter in self.meters):
267
# Support wildcard like storage.* and !disk.*
268
# Start with negation, we consider that the order is deny, allow
269
if any(fnmatch.fnmatch(meter_name, meter[1:])
270
for meter in self.meters
274
if any(fnmatch.fnmatch(meter_name, meter)
275
for meter in self.meters
321
self._publish_samples(0, ctxt, samples)
281
323
def flush(self, ctxt):
282
324
"""Flush data after all samples have been injected to pipeline."""
284
LOG.audit(_("Flush pipeline %s"), self)
285
326
for (i, transformer) in enumerate(self.transformers):
287
328
self._publish_samples(i + 1, ctxt,
293
334
'trans': transformer}))
294
335
LOG.exception(err)
338
class Pipeline(object):
339
"""Represents a coupling between a sink and a corresponding source.
342
def __init__(self, source, sink):
345
self.name = str(self)
348
return (self.source.name if self.source.name == self.sink.name
349
else '%s:%s' % (self.source.name, self.sink.name))
296
351
def get_interval(self):
352
return self.source.interval
356
return self.source.resources
360
return self.source.discovery
362
def support_meter(self, meter_name):
363
return self.source.support_meter(meter_name)
366
def publishers(self):
367
return self.sink.publishers
369
def publish_sample(self, ctxt, sample):
370
self.publish_samples(ctxt, [sample])
372
def publish_samples(self, ctxt, samples):
373
supported = [s for s in samples if self.source.support_meter(s.name)]
374
self.sink.publish_samples(ctxt, supported)
376
def flush(self, ctxt):
377
self.sink.flush(ctxt)
300
380
class PipelineManager(object):
309
def __init__(self, cfg,
310
transformer_manager):
389
def __init__(self, cfg, transformer_manager):
311
390
"""Setup the pipelines according to config.
313
The top of the cfg is a list of pipeline definitions.
315
Pipeline definition is a dictionary specifying the target samples,
316
the transformers involved, and the target publishers:
318
"name": pipeline_name
319
"interval": interval_time
320
"meters" : ["meter_1", "meter_2"],
321
"resources": ["resource_uri1", "resource_uri2"],
323
{"name": "Transformer_1",
324
"parameters": {"p1": "value"}},
326
{"name": "Transformer_2",
327
"parameters": {"p1": "value"}},
329
"publishers": ["publisher_1", "publisher_2"]
332
Interval is how many seconds should the samples be injected to
392
The configuration is supported in one of two forms:
394
1. Deprecated: the source and sink configuration are conflated
395
as a list of consolidated pipelines.
397
The pipelines are defined as a list of dictionaries each
398
specifying the target samples, the transformers involved,
399
and the target publishers, for example:
401
[{"name": pipeline_1,
402
"interval": interval_time,
403
"meters" : ["meter_1", "meter_2"],
404
"resources": ["resource_uri1", "resource_uri2"],
406
{"name": "Transformer_1",
407
"parameters": {"p1": "value"}},
409
{"name": "Transformer_2",
410
"parameters": {"p1": "value"}},
412
"publishers": ["publisher_1", "publisher_2"]
415
"interval": interval_time,
416
"meters" : ["meter_3"],
417
"publishers": ["publisher_3"]
421
2. Decoupled: the source and sink configuration are separately
422
specified before being linked together. This allows source-
423
specific configuration, such as resource discovery, to be
424
kept focused only on the fine-grained source while avoiding
425
the necessity for wide duplication of sink-related config.
427
The configuration is provided in the form of separate lists
428
of dictionaries defining sources and sinks, for example:
430
{"sources": [{"name": source_1,
431
"interval": interval_time,
432
"meters" : ["meter_1", "meter_2"],
433
"resources": ["resource_uri1", "resource_uri2"],
434
"sinks" : ["sink_1", "sink_2"]
437
"interval": interval_time,
438
"meters" : ["meter_3"],
442
"sinks": [{"name": sink_1,
444
{"name": "Transformer_1",
445
"parameters": {"p1": "value"}},
447
{"name": "Transformer_2",
448
"parameters": {"p1": "value"}},
450
"publishers": ["publisher_1", "publisher_2"]
453
"publishers": ["publisher_3"]
458
The semantics of the common individual configuration elements
459
are identical in the deprecated and decoupled version.
461
The interval determines the cadence of sample injection into
462
the pipeline where samples are produced under the direct control
463
of an agent, i.e. via a polling cycle as opposed to incoming
335
466
Valid meter format is '*', '!meter_name', or 'meter_name'.
336
467
'*' is a wildcard symbol that means any meter; '!meter_name' means
348
479
the meters should be polled. It's optional and it's up to the
349
480
specific pollster to decide how to use it.
351
Transformer's name is plugin name in setup.py.
482
Transformer's name is plugin name in setup.cfg.
353
Publisher's name is plugin name in setup.py
484
Publisher's name is plugin name in setup.cfg
356
self.pipelines = [Pipeline(pipedef, transformer_manager)
488
if 'sources' in cfg or 'sinks' in cfg:
489
if not ('sources' in cfg and 'sinks' in cfg):
490
raise PipelineException("Both sources & sinks are required",
492
LOG.info(_('detected decoupled pipeline config format'))
493
sources = [Source(s) for s in cfg.get('sources', [])]
494
sinks = dict((s['name'], Sink(s, transformer_manager))
495
for s in cfg.get('sinks', []))
496
for source in sources:
497
source.check_sinks(sinks)
498
for target in source.sinks:
499
self.pipelines.append(Pipeline(source,
502
LOG.warning(_('detected deprecated pipeline config format'))
504
source = Source(pipedef)
505
sink = Sink(pipedef, transformer_manager)
506
self.pipelines.append(Pipeline(source, sink))
359
508
def publisher(self, context):
360
509
"""Build a new Publisher for these manager pipelines.