30
30
from datetime import datetime
32
33
from celery.task.control import broadcast #pylint: disable=E0611, F0401
33
34
from celery.task.control import inspect #pylint: disable=E0611, F0401
35
37
from Configuration import Configuration
36
38
from MapPyTestMap import MapPyTestMap
37
39
from mauscelery.tasks import execute_transform
93
98
"transform":"MapPyDoNothing",
94
99
"config_id":config_id}, reply=True)
96
def birth(self, config_id, configuration = None, transform = "MapPyDoNothing"): # pylint:disable = R0201, C0301
101
def birth(self, config_id, configuration = None,
102
transform = "MapPyDoNothing", merge_configuration = False): # pylint:disable = R0201, C0301
98
104
Configure the Celery workers, via a broadcast.
99
105
@param self Object reference.
105
111
if configuration == None:
106
112
configuration = self.config_doc
113
elif merge_configuration:
114
my_config = json.loads(self.config_doc)
115
config_user = json.loads(configuration)
116
for key in config_user:
117
my_config[key] = config_user[key]
118
configuration = json.dumps(my_config)
107
119
return broadcast("birth",
108
120
arguments={"configuration":configuration, \
109
121
"transform":transform,
371
383
result = broadcast("death", arguments={"run_number":1}, reply=True)
372
384
self.validate_status(result)
374
result = execute_transform.delay("{}", 1)
386
result = execute_transform.delay("{}", 1)
375
387
# Wait for it to complete.
380
392
self.assertTrue(result.failed(), "Expected failure")
394
def test_process_timeout(self):
396
Test process timeout.
397
@param self Object reference.
399
config_id = datetime.now().microsecond
400
transform = "MapPyTestMap"
401
# check that process does not time out when execution is quick
402
configuration = """{"maus_version":"%s", "process_delay":9.0}""" \
404
result = self.birth(config_id, configuration, transform, True)
405
self.validate_status(result)
407
result = execute_transform.delay("{}", 1)
408
# Wait for it to complete.
409
print "Executing delayed transform"
412
# check that process does time out when execution is slow
413
configuration = """{"maus_version":"%s", "process_delay":15.0}""" \
415
result = self.birth(config_id, configuration, transform, True)
416
self.validate_status(result)
417
# Call process. Check that time_limit flag is ignored
418
# (true for celery version < 3); if we move to a new version of celery,
419
# this should fail; and we should change the timeout to be soft coded
420
result = execute_transform.apply_async(["{}", 1], time_limit=20.)
421
# Wait for it to complete.
423
print "Executing delayed transform"
425
self.assertTrue(False, 'Should have failed')
426
except celery.exceptions.TimeLimitExceeded: # pylint:disable = W0703
429
self.assertEqual(result.state, 'FAILURE')
382
431
if __name__ == '__main__':