2
# Copyright 2015 Red Hat. All Rights Reserved.
4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
5
# not use this file except in compliance with the License. You may obtain
6
# a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13
# License for the specific language governing permissions and limitations
16
"""Fixtures used during Gabbi-based test runs."""
18
from __future__ import print_function
23
from unittest import case
26
from gabbi import fixture
27
from oslo.config import fixture as fixture_config
29
from ceilometer.publisher import utils
30
from ceilometer import sample
31
from ceilometer import service
32
from ceilometer import storage
35
# TODO(chdent): For now only MongoDB is supported, because of easy
36
# database name handling and intentional focus on the API, not the
41
class ConfigFixture(fixture.GabbiFixture):
42
"""Establish the relevant configuration for a test run."""
44
def start_fixture(self):
47
service.prepare_service([])
48
conf = fixture_config.Config().conf
50
conf.import_opt('policy_file', 'ceilometer.openstack.common.policy')
51
conf.set_override('policy_file',
52
os.path.abspath('etc/ceilometer/policy.json'))
54
# A special pipeline is required to use the direct publisher.
55
conf.set_override('pipeline_cfg_file',
56
'etc/ceilometer/gabbi_pipeline.yaml')
58
# Determine the database connection.
60
for engine in ENGINES:
62
db_url = os.environ['CEILOMETER_TEST_%s_URL' % engine]
66
raise case.SkipTest('No database connection configured')
68
database_name = '%s-%s' % (db_url, str(uuid.uuid4()))
69
conf.set_override('connection', database_name, group='database')
70
conf.set_override('metering_connection', '', group='database')
71
conf.set_override('event_connection', '', group='database')
72
conf.set_override('alarm_connection', '', group='database')
74
conf.set_override('pecan_debug', True, group='api')
76
def stop_fixture(self):
77
"""Clean up the config."""
81
class SampleDataFixture(fixture.GabbiFixture):
82
"""Instantiate some sample data for use in testing."""
84
def start_fixture(self):
85
"""Create some samples."""
86
conf = fixture_config.Config().conf
87
self.conn = storage.get_connection_from_config(conf)
88
timestamp = datetime.datetime.utcnow()
89
project_id = str(uuid.uuid4())
90
self.source = str(uuid.uuid4())
91
resource_metadata = {'farmed_by': 'nancy'}
93
for name in ['cow', 'pig', 'sheep']:
94
resource_metadata.update({'breed': name}),
95
c = sample.Sample(name='livestock',
98
volume=int(10 * random.random()),
100
project_id=project_id,
101
resource_id=project_id,
103
resource_metadata=resource_metadata,
106
data = utils.meter_message_from_counter(
107
c, conf.publisher.telemetry_secret)
108
self.conn.record_metering_data(data)
110
def stop_fixture(self):
111
"""Destroy the samples."""
112
# NOTE(chdent): print here for sake of info during testing.
113
# This will go away eventually.
115
self.conn.db.resource.remove({'source': self.source}))
116
print('meter', self.conn.db.meter.remove({'source': self.source}))