2
Tests for Celery workers configured for MAUS. This provides tests for
3
"broadcast" commands to reconfigure the Celery worker and also for
4
executing transforms on spills.
6
# This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus
8
# MAUS is free software: you can redistribute it and/or modify
9
# it under the terms of the GNU General Public License as published by
10
# the Free Software Foundation, either version 3 of the License, or
11
# (at your option) any later version.
13
# MAUS is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
# GNU General Public License for more details.
18
# You should have received a copy of the GNU General Public License
19
# along with MAUS. If not, see <http://www.gnu.org/licenses/>.
21
# pylint: disable=E1101
22
# pylint: disable=C0103
30
from datetime import datetime
33
from celery.task.control import broadcast #pylint: disable=E0611, F0401
34
from celery.task.control import inspect #pylint: disable=E0611, F0401
37
from Configuration import Configuration
38
from MapPyTestMap import MapPyTestMap
39
from mauscelery.tasks import execute_transform
41
class MausCeleryWorkerTestCase(unittest.TestCase): # pylint: disable=R0904, C0301
43
Test class for Celery workers configured for MAUS.
51
cls.proc = subprocess.Popen(['celeryd', '-lINFO', '-c2', '--purge'])
55
def tearDownClass(cls):
57
print "Killing celeryd process", cls.proc.pid
58
cls.proc.send_signal(signal.SIGKILL)
62
Check for at least one active Celery worker else skip the
64
@param self Object reference.
66
self.__inspection = inspect()
68
active_nodes = self.__inspection.active()
69
except Exception: # pylint:disable = W0703
70
unittest.TestCase.skipTest(self,
71
"Skip - RabbitMQ seems to be down")
72
if (active_nodes == None):
73
unittest.TestCase.skipTest(self,
74
"Skip - No active Celery workers")
75
# Get the current MAUS version.
76
configuration = Configuration()
77
self.config_doc = configuration.getConfigJSON()
78
config_dictionary = json.loads(self.config_doc)
79
self.__version = config_dictionary["maus_version"]
80
# Reset the worker. Invoke twice in case the first attempt
81
# fails due to mess left by previous test.
84
if maus_cpp.globals.has_instance():
85
maus_cpp.globals.death()
86
maus_cpp.globals.birth(self.config_doc)
88
def reset_worker(self): # pylint:disable = R0201
90
Reset the Celery workers, via a broadcast, to have no
91
configuration and to use MapPyDoNothing.
92
@param self Object reference.
93
@return status of update.
95
config_id = datetime.now().microsecond
97
arguments={"configuration":self.config_doc,
98
"transform":"MapPyDoNothing",
99
"config_id":config_id}, reply=True)
101
def birth(self, config_id, configuration = None,
102
transform = "MapPyDoNothing", merge_configuration = False): # pylint:disable = R0201, C0301
104
Configure the Celery workers, via a broadcast.
105
@param self Object reference.
106
@param config_id Configuration ID.
107
@param configuration Configuration.
108
@param transform Transform specification.
109
@return status of update.
111
if configuration == None:
112
configuration = self.config_doc
113
elif merge_configuration:
114
my_config = json.loads(self.config_doc)
115
config_user = json.loads(configuration)
116
for key in config_user:
117
my_config[key] = config_user[key]
118
configuration = json.dumps(my_config)
119
return broadcast("birth",
120
arguments={"configuration":configuration, \
121
"transform":transform,
122
"config_id":config_id,
123
"run_number":1}, reply=True)
125
def validate_configuration(self, configuration, transform,
128
Validate workers have the given configuration using
129
set_maus_configuration and the same MAUS version as the
131
@param self Object reference.
132
@param configuration Expected configuration.
133
@param transform Expected transform specification.
134
@param config_id Expected configuration ID.
136
result = broadcast("get_maus_configuration", reply=True)
137
print "get_maus_configuration: %s " % result
138
# Use built-in Celery worker inspection command to get
140
check_workers = self.__inspection.stats()
141
check_worker_names = check_workers.keys()
142
self.assertEquals(len(check_worker_names), len(result),
143
"Number of worker entries does not match that expected")
144
for worker in result:
145
worker_name = worker.keys()[0]
146
self.assertTrue(worker_name in check_worker_names,
147
"Cannot find entry for worker %s" % worker_name)
148
worker_config = worker[worker_name]
149
self.assertTrue(worker_config.has_key("config_id"),
150
"Configuration has no config_id entry")
151
if (config_id != None):
152
self.assertEquals(config_id, worker_config["config_id"],
153
"Unexpected config_id value")
154
self.assertTrue(worker_config.has_key("configuration"),
155
"Configuration has no configuration entry")
156
self.assertEquals(configuration,
157
worker_config["configuration"],
158
"Unexpected configuration value\n\n%s\n\n%s" % (configuration,
159
worker_config["configuration"]))
160
self.assertTrue(worker_config.has_key("transform"),
161
"Configuration has no transform entry")
162
self.assertEquals(transform, worker_config["transform"],
163
"Unexpected transform value")
164
self.assertTrue(worker_config.has_key("version"),
165
"Configuration has no version entry")
166
self.assertEquals(self.__version,
167
worker_config["version"],
168
"Unexpected version value")
170
def validate_status(self, report, status = "ok"):
172
Validate the status report from a birth or death
173
command. Expect one of:
175
[{u'w1': {'status': 'ok'}},...]
176
[{u'w1': {u'error': [{u'error': CLASS, u'message':
177
MESSAGE},...], u'status': u'error'}},...]
179
@param self Object reference.
180
@param report Status report.
181
@param status Expected status.
183
self.assertTrue(len(report) > 0,
184
"Expected at least one worker to respond")
185
# Get the worker names.
186
check_workers = self.__inspection.ping().keys()
187
# Sort so both are ordered by worker names.
190
while len(report) > 0:
191
worker = report.pop()
192
worker_name = worker.keys()[0]
193
check_worker = check_workers.pop()
194
self.assertEquals(check_worker, worker_name,
195
"Unexpected worker name")
196
worker_status = worker[worker_name]
197
self.assertTrue(worker_status.has_key("status"),
198
"Missing status entry")
199
self.assertEquals(status, worker_status["status"],
200
"Unexpected status value")
201
if (status == "error"):
202
self.assertTrue(worker_status.has_key("error"),
203
"Missing error entry")
204
errors = worker_status["error"]
205
self.assertTrue(len(errors) > 0,
206
"Expected a list of error information")
208
def test_birth(self):
210
Test birth broadcast command.
212
Note we also test mausworker.get_maus_configuration here.
214
@param self Object reference.
216
config_id = datetime.now().microsecond
217
transform = "MapPyPrint"
218
configuration = json.loads(self.config_doc)
219
configuration["TOFconversionFactor"] = 1
220
configuration = json.dumps(configuration)
221
result = self.birth(config_id, configuration, transform)
222
print "birth(OK): %s " % result
223
# Check the status and that the configuration has been
225
self.validate_status(result)
226
self.validate_configuration(configuration, transform, config_id)
227
# Check that another birth with the same ID is a no-op. Use
228
# different transform name and configuration to be sure.
229
result = self.birth(config_id, configuration, transform)
230
print "birth(OK): %s " % result
231
# Check the status and configuration are OK.
232
self.validate_status(result)
233
self.validate_configuration(configuration, transform, config_id)
235
def test_birth_bad_config_json(self):
237
Test birth broadcast command with an invalid JSON
238
configuration document.
239
@param self Object reference.
241
config_id = datetime.now().microsecond
242
transform = "MapPyPrint"
243
configuration = """{"TOFconversionFactor":BADJSON"""
244
result = self.birth(config_id, configuration, transform)
245
print "birth(bad JSON document): %s " % result
246
self.validate_status(result, "error")
248
def test_birth_bad_version(self):
250
Test birth broadcast command with a mismatched MAUS version.
251
@param self Object reference.
253
config_id = datetime.now().microsecond
254
transform = "MapPyPrint"
255
configuration = """{"maus_version":"BAD"}"""
256
result = self.birth(config_id, configuration, transform)
257
print "birth(bad version): %s " % result
258
self.validate_status(result, "error")
260
def test_birth_bad_transform(self):
262
Test birth broadcast command with an invalid transform name.
263
@param self Object reference.
265
config_id = datetime.now().microsecond
266
transform = "MapPyUnknown"
267
configuration = """{"maus_version":"%s"}""" % self.__version
268
result = self.birth(config_id, configuration, transform)
269
print "birth(bad transform name): %s " % result
270
self.validate_status(result, "error")
272
def test_birth_error(self):
274
Test birth broadcast command where the transform returns
275
False when birth is called.
276
@param self Object reference.
278
# Set up a transform that will fail when it is birthed.
279
config_id = datetime.now().microsecond
280
transform = "MapPyTestMap"
281
configuration = """{"birth_result":%s, "maus_version":"%s"}""" \
282
% (MapPyTestMap.EXCEPTION, self.__version)
283
result = self.birth(config_id, configuration, transform)
284
print "birth(transform.birth exception): %s " % result
285
# Check the status specifies an error.
286
self.validate_status(result, "error")
288
def test_death_before_birth_error(self):
290
Test death broadcast command where the transform returns
291
False when death is called prior to a birth command.
292
@param self Object reference.
294
# Set up a transform that will fail when it is deathed.
295
config_id = datetime.now().microsecond
296
transform = "MapPyTestMap"
297
configuration = """{"death_result":%s, "maus_version":"%s"}""" \
298
% (MapPyTestMap.EXCEPTION, self.__version)
299
result = self.birth(config_id, configuration, transform)
300
# Check the status is OK.
301
self.validate_status(result)
302
# Now create a new transform thereby causing the death failure
303
# of the current one.
304
config_id = datetime.now().microsecond
305
transform = "MapPyDoNothing"
306
configuration = """{"maus_version":"%s"}""" % self.__version
307
result = self.birth(config_id, configuration, transform)
308
print "birth(transform.death exception): %s " % result
309
# Check the status specifies an error.
310
self.validate_status(result, "error")
311
# Try again with the same configuration ID. Expect
313
result = self.birth(config_id, configuration, transform)
314
# Check the status and that the configuration has been
316
self.validate_status(result)
317
self.validate_configuration(configuration, transform, config_id)
319
def test_death(self):
321
Test death broadcast command.
322
@param self Object reference.
324
result = broadcast("death", arguments={"run_number":1}, reply=True)
326
print "death: %s " % result
327
self.validate_status(result)
328
# Expect subsequent attempt to succeed.
329
result = broadcast("death", arguments={"run_number":1}, reply=True)
330
self.validate_status(result)
332
def test_death_exception(self):
334
Test death broadcast command where the transform throws
335
an exception when death is called.
336
@param self Object reference.
338
# Set up a transform that will fail when it is deathed.
339
config_id = datetime.now().microsecond
340
transform = "MapPyTestMap"
341
configuration = """{"death_result":%s, "maus_version":"%s"}""" \
342
% (MapPyTestMap.EXCEPTION, self.__version)
343
result = self.birth(config_id, configuration, transform)
344
# Check the status is OK.
345
self.validate_status(result)
346
# Now death the transform.
347
result = broadcast("death", arguments={"run_number":1}, reply=True)
348
print "death(transform.death exception): %s " % result
349
self.validate_status(result, "error")
350
# Expect subsequent attempt to succeed.
351
result = broadcast("death", arguments={"run_number":1}, reply=True)
352
self.validate_status(result)
354
def test_process(self):
356
Test process command.
357
@param self Object reference.
359
config_id = datetime.now().microsecond
360
transform = "MapPyTestMap"
361
configuration = """{"maus_version":"%s"}""" % self.__version
362
result = self.birth(config_id, configuration, transform)
363
self.validate_status(result)
365
result = execute_transform.delay("{}", 1)
366
# Wait for it to complete.
368
self.assertTrue(result.successful(), "Expected success")
369
spill = json.loads(result.result)
370
self.assertTrue(spill.has_key("processed"),
371
"Spill does not seem to have been updated")
373
def test_process_after_death(self):
375
Test process command fails after death has been called.
376
@param self Object reference.
378
config_id = datetime.now().microsecond
379
transform = "MapPyTestMap"
380
configuration = """{"maus_version":"%s"}""" % self.__version
381
result = self.birth(config_id, configuration, transform)
382
self.validate_status(result)
383
result = broadcast("death", arguments={"run_number":1}, reply=True)
384
self.validate_status(result)
386
result = execute_transform.delay("{}", 1)
387
# Wait for it to complete.
390
except Exception: # pylint:disable = W0703
392
self.assertTrue(result.failed(), "Expected failure")
394
def test_process_timeout(self):
396
Test process timeout.
397
@param self Object reference.
399
config_id = datetime.now().microsecond
400
transform = "MapPyTestMap"
401
# check that process does not time out when execution is quick
402
configuration = """{"maus_version":"%s", "process_delay":9.0}""" \
404
result = self.birth(config_id, configuration, transform, True)
405
self.validate_status(result)
407
result = execute_transform.delay("{}", 1)
408
# Wait for it to complete.
409
print "Executing delayed transform"
412
# check that process does time out when execution is slow
413
configuration = """{"maus_version":"%s", "process_delay":15.0}""" \
415
result = self.birth(config_id, configuration, transform, True)
416
self.validate_status(result)
417
# Call process. Check that time_limit flag is ignored
418
# (true for celery version < 3); if we move to a new version of celery,
419
# this should fail; and we should change the timeout to be soft coded
420
result = execute_transform.apply_async(["{}", 1], time_limit=20.)
421
# Wait for it to complete.
423
print "Executing delayed transform"
425
self.assertTrue(False, 'Should have failed')
426
except celery.exceptions.TimeLimitExceeded: # pylint:disable = W0703
429
self.assertEqual(result.state, 'FAILURE')
431
if __name__ == '__main__':