2
Tests for Celery workers configured for MAUS. This provides tests for
3
"broadcast" commands to reconfigure the Celery worker and also for
4
executing transforms on spills.
6
# This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus
8
# MAUS is free software: you can redistribute it and/or modify
9
# it under the terms of the GNU General Public License as published by
10
# the Free Software Foundation, either version 3 of the License, or
11
# (at your option) any later version.
13
# MAUS is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
# GNU General Public License for more details.
18
# You should have received a copy of the GNU General Public License
19
# along with MAUS. If not, see <http://www.gnu.org/licenses/>.
21
# pylint: disable=C0103
26
from datetime import datetime
28
from celery.task.control import broadcast
29
from celery.task.control import inspect
31
from MapPyTestMap import MapPyTestMap
32
from mauscelery.tasks import execute_transform
34
class MausCeleryWorkerTestCase(unittest.TestCase): # pylint: disable=R0904, C0301
36
Test class for Celery workers configured for MAUS.
41
Check for at least one active Celery worker else skip the
43
@param self Object reference.
45
self.__inspection = inspect()
47
active_nodes = self.__inspection.active()
48
except Exception: # pylint:disable = W0703
49
unittest.TestCase.skipTest(self,
50
"Skip - RabbitMQ seems to be down")
51
if (active_nodes == None):
52
unittest.TestCase.skipTest(self,
53
"Skip - No active Celery workers")
54
# Reset the worker. Invoke twice in case the first attempt
55
# fails due to mess left by previous test.
59
def reset_worker(self): # pylint:disable = R0201
61
Reset the Celery workers, via a broadcast, to have no
62
configuration and to use MapPyDoNothing.
63
@param self Object reference.
64
@return status of update.
66
config_id = datetime.now().microsecond
68
arguments={"configuration":"{}", \
69
"transform":"MapPyDoNothing",
70
"config_id":config_id}, reply=True)
72
def birth(self, config_id, configuration = "{}", transform = "MapPyDoNothing"): # pylint:disable = R0201, C0301
74
Configure the Celery workers, via a broadcast.
75
@param self Object reference.
76
@param config_id Configuration ID.
77
@param configuration Configuration.
78
@param transform Transform specification.
79
@return status of update.
81
return broadcast("birth",
82
arguments={"configuration":configuration, \
83
"transform":transform,
84
"config_id":config_id}, reply=True)
86
def validate_configuration(self, configuration, transform,
89
Validate workers have the given configuration using
90
set_maus_configuration.
91
@param self Object reference.
92
@param configuration Expected configuration.
93
@param transform Expected transform specification.
94
@param config_id Expected configuration ID.
96
result = broadcast("get_maus_configuration", reply=True)
97
print "get_maus_configuration: %s " % result
98
# Use built-in Celery worker inspection command to get
100
check_workers = self.__inspection.stats()
101
check_worker_names = check_workers.keys()
102
self.assertEquals(len(check_worker_names), len(result),
103
"Number of worker entries does not match that expected")
104
for worker in result:
105
worker_name = worker.keys()[0]
106
self.assertTrue(worker_name in check_worker_names,
107
"Cannot find entry for worker %s" % worker_name)
108
worker_config = worker[worker_name]
109
self.assertTrue(worker_config.has_key("config_id"),
110
"Configuration has no config_id entry")
111
if (config_id != None):
112
self.assertEquals(config_id, worker_config["config_id"],
113
"Unexpected config_id value")
114
self.assertTrue(worker_config.has_key("configuration"),
115
"Configuration has no configuration entry")
116
self.assertEquals(configuration,
117
worker_config["configuration"],
118
"Unexpected configuration value")
119
self.assertTrue(worker_config.has_key("transform"),
120
"Configuration has no transform entry")
121
self.assertEquals(transform, worker_config["transform"],
122
"Unexpected transform value")
124
def validate_status(self, report, status = "ok"):
126
Validate the status report from a birth or death
127
command. Expect one of:
129
[{u'w1': {'status': 'ok'}},...]
130
[{u'w1': {'status': 'unchanged'}},...]
131
[{u'w1': {u'error': [{u'error': CLASS, u'message':
132
MESSAGE},...], u'status': u'error'}},...]
134
@param self Object reference.
135
@param report Expected status report.
136
@param status Expected status.
138
self.assertTrue(len(report) > 0,
139
"Expected at least one worker to respond")
140
# Get the worker names.
141
check_workers = self.__inspection.ping().keys()
142
# Sort so both are ordered by worker names.
145
while len(report) > 0:
146
worker = report.pop()
147
worker_name = worker.keys()[0]
148
check_worker = check_workers.pop()
149
self.assertEquals(check_worker, worker_name,
150
"Unexpected worker name")
151
worker_status = worker[worker_name]
152
self.assertTrue(worker_status.has_key("status"),
153
"Missing status entry")
154
self.assertEquals(status, worker_status["status"],
155
"Unexpected status value")
156
if (status == "error"):
157
self.assertTrue(worker_status.has_key("error"),
158
"Missing error entry")
159
errors = worker_status["error"]
160
self.assertTrue(len(errors) > 0,
161
"Expected a list of error information")
163
def test_get_maus_config(self):
165
Test get_maus_configuration broadcast command.
166
@param self Object reference.
168
# Check using default values.
169
self.validate_configuration("{}", "MapPyDoNothing")
171
def test_birth(self):
173
Test birth broadcast command.
174
@param self Object reference.
176
config_id = datetime.now().microsecond
177
transform = "MapPyPrint"
178
configuration = """{"TOFconversionFactor":%s}""" % config_id
179
result = self.birth(config_id, configuration, transform)
180
print "birth(OK): %s " % result
181
# Check the status and that the configuration has been
183
self.validate_status(result)
184
self.validate_configuration(configuration, transform, config_id)
185
# Check that another birth with the same ID is a no-op. Use
186
# different transform name and configuration to be sure.
187
result = self.birth(config_id)
188
print "birth(unchanged): %s " % result
189
# Check the status and configuration are unchanged.
190
self.validate_status(result, "unchanged")
191
self.validate_configuration(configuration, transform, config_id)
193
def test_birth_bad_config_json(self):
195
Test birth broadcast command with an invalid JSON
196
configuration document.
197
@param self Object reference.
199
config_id = datetime.now().microsecond
200
transform = "MapPyPrint"
201
configuration = """{"TOFconversionFactor":BAD"""
202
result = self.birth(config_id, configuration, transform)
203
print "birth(bad JSON document): %s " % result
204
self.validate_status(result, "error")
206
def test_birth_bad_transform(self):
208
Test birth broadcast command with an invalid transform name.
209
@param self Object reference.
211
config_id = datetime.now().microsecond
212
transform = "MapPyUnknown"
214
result = self.birth(config_id, configuration, transform)
215
print "birth(bad transform name): %s " % result
216
self.validate_status(result, "error")
218
def test_birth_error(self):
220
Test birth broadcast command where the transform returns
221
False when birth is called.
222
@param self Object reference.
224
# Set up a transform that will fail when it is birthed.
225
config_id = datetime.now().microsecond
226
transform = "MapPyTestMap"
227
configuration = """{"birth_result":%s}""" \
228
% MapPyTestMap.EXCEPTION
229
result = self.birth(config_id, configuration, transform)
230
print "birth(transform.birth exception): %s " % result
231
# Check the status specifies an error.
232
self.validate_status(result, "error")
234
def test_death_before_birth_error(self):
236
Test death broadcast command where the transform returns
237
False when death is called prior to a birth command.
238
@param self Object reference.
240
# Set up a transform that will fail when it is deathed.
241
config_id = datetime.now().microsecond
242
transform = "MapPyTestMap"
243
configuration = """{"death_result":%s}""" % MapPyTestMap.EXCEPTION
244
result = self.birth(config_id, configuration, transform)
245
# Check the status is OK.
246
self.validate_status(result)
247
# Now create a new transform thereby causing the death failure
248
# of the current one.
249
config_id = datetime.now().microsecond
250
transform = "MapPyDoNothing"
252
result = self.birth(config_id, configuration, transform)
253
print "birth(transform.death exception): %s " % result
254
# Check the status specifies an error.
255
self.validate_status(result, "error")
256
# Try again with the same configuration ID. Expect
258
result = self.birth(config_id, configuration, transform)
259
# Check the status and that the configuration has been
261
self.validate_status(result)
262
self.validate_configuration(configuration, transform, config_id)
264
def test_death(self):
266
Test death broadcast command.
267
@param self Object reference.
269
result = broadcast("death", reply=True)
271
print "death: %s " % result
272
self.validate_status(result)
273
# Expect subsequent attempt to succeed.
274
result = broadcast("death", reply=True)
275
self.validate_status(result)
277
def test_death_exception(self):
279
Test death broadcast command where the transform throws
280
an exception when death is called.
281
@param self Object reference.
283
# Set up a transform that will fail when it is deathed.
284
config_id = datetime.now().microsecond
285
transform = "MapPyTestMap"
286
configuration = """{"death_result":%s}""" % MapPyTestMap.EXCEPTION
287
result = self.birth(config_id, configuration, transform)
288
# Check the status is OK.
289
self.validate_status(result)
290
# Now death the transform.
291
result = broadcast("death", reply=True)
292
print "death(transform.death exception): %s " % result
293
self.validate_status(result, "error")
294
# Expect subsequent attempt to succeed.
295
result = broadcast("death", reply=True)
296
self.validate_status(result)
298
def test_process(self):
300
Test process command.
301
@param self Object reference.
303
config_id = datetime.now().microsecond
304
transform = "MapPyTestMap"
306
result = self.birth(config_id, configuration, transform)
307
self.validate_status(result)
309
result = execute_transform.delay("{}", 1, 1) # pylint:disable=E1101, C0301
310
# Wait for it to complete.
312
self.assertTrue(result.successful(), "Expected success")
313
spill = json.loads(result.result)
314
self.assertTrue(spill.has_key("processed"),
315
"Spill does not seem to have been updated")
317
def test_process_after_death(self):
319
Test process command fails after death has been called.
320
@param self Object reference.
322
config_id = datetime.now().microsecond
323
transform = "MapPyTestMap"
325
result = self.birth(config_id, configuration, transform)
326
self.validate_status(result)
327
result = broadcast("death", reply=True)
328
self.validate_status(result)
330
result = execute_transform.delay("{}", 1, 1) # pylint:disable=E1101, C0301
331
# Wait for it to complete.
334
except Exception: # pylint:disable = W0703
336
self.assertTrue(result.failed(), "Expected failure")
338
if __name__ == '__main__':