~ubuntu-branches/ubuntu/vivid/ceilometer/vivid

« back to all changes in this revision

Viewing changes to ceilometer/tests/test_collector.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2014-03-06 14:44:28 UTC
  • mto: (28.1.1 utopic-proposed) (1.2.1)
  • mto: This revision was merged to the branch mainline in revision 19.
  • Revision ID: package-import@ubuntu.com-20140306144428-rvphsh4igwyulzf0
Tags: upstream-2014.1~b3
Import upstream version 2014.1~b3

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
from mock import patch
22
22
import msgpack
23
23
from stevedore import extension
24
 
from stevedore.tests import manager as test_manager
25
24
 
26
25
from ceilometer import collector
27
26
from ceilometer.openstack.common.fixture import config
52
51
            resource_metadata={},
53
52
        ).as_dict()
54
53
 
 
54
    def _make_test_manager(self, plugin):
 
55
        return extension.ExtensionManager.make_test_instance([
 
56
            extension.Extension(
 
57
                'test',
 
58
                None,
 
59
                None,
 
60
                plugin,
 
61
            ),
 
62
        ])
 
63
 
55
64
    def _make_fake_socket(self):
56
65
        def recvfrom(size):
57
66
            # Make the loop stop
71
80
 
72
81
    def test_record_metering_data(self):
73
82
        mock_dispatcher = mock.MagicMock()
74
 
        self.srv.dispatcher_manager = test_manager.TestExtensionManager(
75
 
            [extension.Extension('test',
76
 
                                 None,
77
 
                                 None,
78
 
                                 mock_dispatcher
79
 
                                 ),
80
 
             ])
 
83
        self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher)
81
84
 
82
85
        self.srv.record_metering_data(None, self.counter)
83
86
 
86
89
 
87
90
    def test_udp_receive(self):
88
91
        mock_dispatcher = mock.MagicMock()
89
 
        self.srv.dispatcher_manager = test_manager.TestExtensionManager(
90
 
            [extension.Extension('test',
91
 
                                 None,
92
 
                                 None,
93
 
                                 mock_dispatcher
94
 
                                 ),
95
 
             ])
 
92
        self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher)
96
93
        self.counter['source'] = 'mysource'
97
94
        self.counter['counter_name'] = self.counter['name']
98
95
        self.counter['counter_volume'] = self.counter['volume']
110
107
 
111
108
    def test_udp_receive_storage_error(self):
112
109
        mock_dispatcher = mock.MagicMock()
113
 
        self.srv.dispatcher_manager = test_manager.TestExtensionManager(
114
 
            [extension.Extension('test',
115
 
                                 None,
116
 
                                 None,
117
 
                                 mock_dispatcher
118
 
                                 ),
119
 
             ])
 
110
        self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher)
120
111
        mock_dispatcher.record_metering_data.side_effect = self._raise_error
121
112
 
122
113
        self.counter['source'] = 'mysource'