1
# -*- encoding: utf-8 -*-
3
# Copyright 2012 Red Hat, Inc.
5
# Author: Angus Salkeld <asalkeld@redhat.com>
7
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8
# not use this file except in compliance with the License. You may obtain
9
# a copy of the License at
11
# http://www.apache.org/licenses/LICENSE-2.0
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
# License for the specific language governing permissions and limitations
# under the License.
18
"""Test listing meters."""
24
from oslo.config import cfg
26
from ceilometer.publisher import rpc
27
from ceilometer import counter
29
from .base import FunctionalTest
31
# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
34
class TestListEmptyMeters(FunctionalTest):
37
data = self.get_json('/meters')
38
self.assertEquals([], data)
41
class TestListMeters(FunctionalTest):
44
super(TestListMeters, self).setUp()
55
timestamp=datetime.datetime(2012, 7, 2, 10, 40),
56
resource_metadata={'display_name': 'test-server',
57
'tag': 'self.counter'}),
66
timestamp=datetime.datetime(2012, 7, 2, 11, 40),
67
resource_metadata={'display_name': 'test-server',
68
'tag': 'self.counter'}),
77
timestamp=datetime.datetime(2012, 7, 2, 10, 41),
78
resource_metadata={'display_name': 'test-server',
79
'tag': 'self.counter2'}),
88
timestamp=datetime.datetime(2012, 7, 2, 10, 42),
89
resource_metadata={'display_name': 'test-server',
90
'tag': 'self.counter3'}),
99
timestamp=datetime.datetime(2012, 7, 2, 10, 43),
100
resource_metadata={'display_name': 'test-server',
101
'tag': 'self.counter4'})]:
102
msg = rpc.meter_message_from_counter(
104
cfg.CONF.publisher_rpc.metering_secret,
106
self.conn.record_metering_data(msg)
108
def test_list_meters(self):
109
data = self.get_json('/meters')
110
self.assertEquals(4, len(data))
111
self.assertEquals(set(r['resource_id'] for r in data),
116
self.assertEquals(set(r['name'] for r in data),
120
def test_with_resource(self):
    """Filter ``/meters`` by ``resource_id`` and check the meter names.

    Queries with ``resource_id == 'resource-id'`` and expects the set of
    returned meter names to be exactly ``{'meter.test'}``.
    """
    data = self.get_json('/meters', q=[{'field': 'resource_id',
                                        'value': 'resource-id',
                                        }])
    names = set(r['name'] for r in data)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(set(['meter.test']), names)
127
def test_with_source(self):
128
data = self.get_json('/meters', q=[{'field': 'source',
129
'value': 'test_source',
131
ids = set(r['resource_id'] for r in data)
132
self.assertEquals(set(['resource-id',
135
'resource-id4']), ids)
137
def test_with_source_non_existent(self):
138
data = self.get_json('/meters',
139
q=[{'field': 'source',
140
'value': 'test_source_doesnt_exist',
145
def test_with_user(self):
146
data = self.get_json('/meters',
147
q=[{'field': 'user_id',
152
uids = set(r['user_id'] for r in data)
153
self.assertEquals(set(['user-id']), uids)
155
nids = set(r['name'] for r in data)
156
self.assertEquals(set(['meter.mine', 'meter.test']), nids)
158
rids = set(r['resource_id'] for r in data)
159
self.assertEquals(set(['resource-id', 'resource-id2']), rids)
161
def test_with_user_non_existent(self):
    """Filter ``/meters`` by a ``user_id`` value and expect no results.

    Queries with ``user_id == 'user-id-foobar123'`` and expects the
    response to be an empty list.
    """
    data = self.get_json('/meters',
                         q=[{'field': 'user_id',
                             'value': 'user-id-foobar123',
                             }],
                         )
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(data, [])
169
def test_with_project(self):
    """Filter ``/meters`` by ``project_id`` and check the resource ids.

    Queries with ``project_id == 'project-id2'`` and expects the set of
    returned resource ids to be exactly ``resource-id3`` and
    ``resource-id4``.
    """
    data = self.get_json('/meters',
                         q=[{'field': 'project_id',
                             'value': 'project-id2',
                             }],
                         )
    resource_ids = set(r['resource_id'] for r in data)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(set(['resource-id3', 'resource-id4']), resource_ids)
178
def test_with_project_non_existent(self):
    """Filter ``/meters`` by a ``project_id`` value and expect no results.

    Queries with ``project_id == 'jd-was-here'`` and expects the response
    to be an empty list.
    """
    data = self.get_json('/meters',
                         q=[{'field': 'project_id',
                             'value': 'jd-was-here',
                             }],
                         )
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(data, [])