~sophie-middleton08/maus/devel

« back to all changes in this revision

Viewing changes to tests/py_unit/test_mauscelery/test_celery.py

  • Committer: Chris Rogers
  • Date: 2012-04-16 13:33:25 UTC
  • mfrom: (659.1.17 release-candidate)
  • Revision ID: chris.rogers@stfc.ac.uk-20120416133325-11tyj7pb7xs7m37m
Release 0.2.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
"""
2
 
Tests for Celery workers configured for MAUS. This provides tests for
3
 
"broadcast" commands to reconfigure the Celery worker and also for
4
 
executing transforms on spills.
5
 
"""
6
 
#  This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus
7
 
8
 
#  MAUS is free software: you can redistribute it and/or modify
9
 
#  it under the terms of the GNU General Public License as published by
10
 
#  the Free Software Foundation, either version 3 of the License, or
11
 
#  (at your option) any later version.
12
 
13
 
#  MAUS is distributed in the hope that it will be useful,
14
 
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 
#  GNU General Public License for more details.
17
 
18
 
#  You should have received a copy of the GNU General Public License
19
 
#  along with MAUS.  If not, see <http://www.gnu.org/licenses/>.
20
 
 
21
 
# pylint: disable=C0103
22
 
 
23
 
import json
24
 
import unittest
25
 
 
26
 
from datetime import datetime
27
 
 
28
 
from celery.task.control import broadcast
29
 
from celery.task.control import inspect
30
 
 
31
 
from MapPyTestMap import MapPyTestMap
32
 
from mauscelery.tasks import execute_transform
33
 
 
34
 
class MausCeleryWorkerTestCase(unittest.TestCase): # pylint: disable=R0904, C0301
    """
    Test class for Celery workers configured for MAUS.

    Every test needs a reachable broker and at least one running
    Celery worker; when either is missing the tests are skipped
    rather than failed (see setUp).
    """

    def setUp(self):
        """
        Check for at least one active Celery worker else skip the
        test, then reset the workers to a known state.
        @param self Object reference.
        """
        try:
            # The inspector is created inside the try block so that any
            # problem reaching the broker leads to a skip, not an error.
            self.__inspection = inspect()
            active_nodes = self.__inspection.active()
        except Exception: # pylint:disable = W0703
            # skipTest raises, so active_nodes is always bound below.
            self.skipTest("Skip - RabbitMQ seems to be down")
        if active_nodes is None:
            self.skipTest("Skip - No active Celery workers")
        # Reset the worker. Invoke twice in case the first attempt
        # fails due to mess left by previous test.
        self.reset_worker()
        self.reset_worker()

    @staticmethod
    def _worker_name(entry):
        """
        Get the worker name from a single broadcast reply entry.
        @param entry Dictionary of form {WORKER_NAME: REPLY}.
        @return the first (expected to be the only) key of the entry.
        """
        return list(entry)[0]

    def reset_worker(self):
        """
        Reset the Celery workers, via a broadcast, to have no
        configuration and to use MapPyDoNothing. A fresh
        configuration ID is taken from the current microsecond.
        @param self Object reference.
        @return status of update.
        """
        # birth defaults to the empty configuration and MapPyDoNothing.
        return self.birth(datetime.now().microsecond)

    def birth(self, config_id, configuration = "{}", transform = "MapPyDoNothing"): # pylint:disable = R0201, C0301
        """
        Configure the Celery workers, via a broadcast.
        @param self Object reference.
        @param config_id Configuration ID.
        @param configuration Configuration.
        @param transform Transform specification.
        @return status of update.
        """
        arguments = {"configuration":configuration,
                     "transform":transform,
                     "config_id":config_id}
        return broadcast("birth", arguments=arguments, reply=True)

    def validate_configuration(self, configuration, transform,
        config_id = None):
        """
        Validate workers have the given configuration using
        the get_maus_configuration broadcast command.
        @param self Object reference.
        @param configuration Expected configuration.
        @param transform Expected transform specification.
        @param config_id Expected configuration ID. Only checked
        if not None.
        """
        result = broadcast("get_maus_configuration", reply=True)
        print("get_maus_configuration: %s " % result)
        # Use built-in Celery worker inspection command to get
        # worker names.
        check_worker_names = list(self.__inspection.stats().keys())
        self.assertEqual(len(check_worker_names), len(result),
            "Number of worker entries does not match that expected")
        for worker in result:
            worker_name = self._worker_name(worker)
            self.assertTrue(worker_name in check_worker_names,
                "Cannot find entry for worker %s" % worker_name)
            worker_config = worker[worker_name]
            self.assertTrue("config_id" in worker_config,
                "Configuration has no config_id entry")
            if config_id is not None:
                self.assertEqual(config_id, worker_config["config_id"],
                    "Unexpected config_id value")
            self.assertTrue("configuration" in worker_config,
                "Configuration has no configuration entry")
            self.assertEqual(configuration,
                worker_config["configuration"],
                "Unexpected configuration value")
            self.assertTrue("transform" in worker_config,
                "Configuration has no transform entry")
            self.assertEqual(transform, worker_config["transform"],
                "Unexpected transform value")

    def validate_status(self, report, status = "ok"):
        """
        Validate the status report from a birth or death
        command. Expect one of:
        @verbatim
        [{u'w1': {'status': 'ok'}},...]
        [{u'w1': {'status': 'unchanged'}},...]
        [{u'w1': {u'error': [{u'error': CLASS, u'message':
            MESSAGE},...], u'status': u'error'}},...]
        @endverbatim
        The report is not modified. Every worker that answers a
        ping must appear exactly once in the report.
        @param self Object reference.
        @param report Status report to validate.
        @param status Expected status.
        """
        self.assertTrue(len(report) > 0,
            "Expected at least one worker to respond")
        # Get the worker names and order both sides by worker name so
        # they can be compared pairwise. Sorted copies are used so the
        # caller's report is left intact.
        expected_names = sorted(self.__inspection.ping().keys())
        entries = sorted(report, key=self._worker_name)
        self.assertEqual(len(expected_names), len(entries),
            "Number of worker entries does not match that expected")
        for expected_name, entry in zip(expected_names, entries):
            worker_name = self._worker_name(entry)
            self.assertEqual(expected_name, worker_name,
                             "Unexpected worker name")
            worker_status = entry[worker_name]
            self.assertTrue("status" in worker_status,
                "Missing status entry")
            self.assertEqual(status, worker_status["status"],
                "Unexpected status value")
            if status == "error":
                self.assertTrue("error" in worker_status,
                    "Missing error entry")
                self.assertTrue(len(worker_status["error"]) > 0,
                    "Expected a list of error information")

    def test_get_maus_config(self):
        """
        Test get_maus_configuration broadcast command.
        @param self Object reference.
        """
        # Check using default values.
        self.validate_configuration("{}", "MapPyDoNothing")

    def test_birth(self):
        """
        Test birth broadcast command.
        @param self Object reference.
        """
        config_id = datetime.now().microsecond
        transform = "MapPyPrint"
        configuration = """{"TOFconversionFactor":%s}""" % config_id
        result = self.birth(config_id, configuration, transform)
        print("birth(OK): %s " % result)
        # Check the status and that the configuration has been
        # updated.
        self.validate_status(result)
        self.validate_configuration(configuration, transform, config_id)
        # Check that another birth with the same ID is a no-op. Use
        # different transform name and configuration (the birth
        # defaults) to be sure.
        result = self.birth(config_id)
        print("birth(unchanged): %s " % result)
        # Check the status and configuration are unchanged.
        self.validate_status(result, "unchanged")
        self.validate_configuration(configuration, transform, config_id)

    def test_birth_bad_config_json(self):
        """
        Test birth broadcast command with an invalid JSON
        configuration document.
        @param self Object reference.
        """
        config_id = datetime.now().microsecond
        transform = "MapPyPrint"
        configuration = """{"TOFconversionFactor":BAD"""
        result = self.birth(config_id, configuration, transform)
        print("birth(bad JSON document): %s " % result)
        self.validate_status(result, "error")

    def test_birth_bad_transform(self):
        """
        Test birth broadcast command with an invalid transform name.
        @param self Object reference.
        """
        config_id = datetime.now().microsecond
        transform = "MapPyUnknown"
        configuration = "{}"
        result = self.birth(config_id, configuration, transform)
        print("birth(bad transform name): %s " % result)
        self.validate_status(result, "error")

    def test_birth_error(self):
        """
        Test birth broadcast command where the transform is
        configured with MapPyTestMap.EXCEPTION as its birth result,
        so that birth is expected to report an error.
        @param self Object reference.
        """
        # Set up a transform that will fail when it is birthed.
        config_id = datetime.now().microsecond
        transform = "MapPyTestMap"
        configuration = """{"birth_result":%s}""" \
            % MapPyTestMap.EXCEPTION
        result = self.birth(config_id, configuration, transform)
        print("birth(transform.birth exception): %s " % result)
        # Check the status specifies an error.
        self.validate_status(result, "error")

    def test_death_before_birth_error(self):
        """
        Test birth broadcast command where the current transform is
        configured with MapPyTestMap.EXCEPTION as its death result,
        so that the death done prior to the new birth is expected
        to report an error.
        @param self Object reference.
        """
        # Set up a transform that will fail when it is deathed.
        config_id = datetime.now().microsecond
        transform = "MapPyTestMap"
        configuration = """{"death_result":%s}""" % MapPyTestMap.EXCEPTION
        result = self.birth(config_id, configuration, transform)
        # Check the status is OK.
        self.validate_status(result)
        # Now create a new transform thereby causing the death failure
        # of the current one.
        config_id = datetime.now().microsecond
        transform = "MapPyDoNothing"
        configuration = "{}"
        result = self.birth(config_id, configuration, transform)
        print("birth(transform.death exception): %s " % result)
        # Check the status specifies an error.
        self.validate_status(result, "error")
        # Try again with the same configuration ID. Expect
        # success this time.
        result = self.birth(config_id, configuration, transform)
        # Check the status and that the configuration has been
        # updated.
        self.validate_status(result)
        self.validate_configuration(configuration, transform, config_id)

    def test_death(self):
        """
        Test death broadcast command.
        @param self Object reference.
        """
        # Order by worker name so the printed report is stable.
        result = sorted(broadcast("death", reply=True),
                        key=self._worker_name)
        print("death: %s " % result)
        self.validate_status(result)
        # Expect subsequent attempt to succeed.
        result = broadcast("death", reply=True)
        self.validate_status(result)

    def test_death_exception(self):
        """
        Test death broadcast command where the transform throws
        an exception when death is called.
        @param self Object reference.
        """
        # Set up a transform that will fail when it is deathed.
        config_id = datetime.now().microsecond
        transform = "MapPyTestMap"
        configuration = """{"death_result":%s}""" % MapPyTestMap.EXCEPTION
        result = self.birth(config_id, configuration, transform)
        # Check the status is OK.
        self.validate_status(result)
        # Now death the transform.
        result = broadcast("death", reply=True)
        print("death(transform.death exception): %s " % result)
        self.validate_status(result, "error")
        # Expect subsequent attempt to succeed.
        result = broadcast("death", reply=True)
        self.validate_status(result)

    def test_process(self):
        """
        Test process command.
        @param self Object reference.
        """
        config_id = datetime.now().microsecond
        transform = "MapPyTestMap"
        configuration = "{}"
        result = self.birth(config_id, configuration, transform)
        self.validate_status(result)
        # Call process.
        result = execute_transform.delay("{}", 1, 1) # pylint:disable=E1101, C0301
        # Wait for it to complete.
        result.wait()
        self.assertTrue(result.successful(), "Expected success")
        spill = json.loads(result.result)
        self.assertTrue("processed" in spill,
            "Spill does not seem to have been updated")

    def test_process_after_death(self):
        """
        Test process command fails after death has been called.
        @param self Object reference.
        """
        config_id = datetime.now().microsecond
        transform = "MapPyTestMap"
        configuration = "{}"
        result = self.birth(config_id, configuration, transform)
        self.validate_status(result)
        result = broadcast("death", reply=True)
        self.validate_status(result)
        # Call process.
        result = execute_transform.delay("{}", 1, 1) # pylint:disable=E1101, C0301
        # Wait for it to complete. Any exception raised while waiting
        # is deliberately ignored here; the outcome is checked through
        # the failed() state below.
        try:
            result.wait()
        except Exception:  # pylint:disable = W0703
            pass
        self.assertTrue(result.failed(), "Expected failure")
337
 
 
338
 
# Run every test case in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()