~c-e-pidcott/maus/1389

« back to all changes in this revision

Viewing changes to tests/integration/test_distributed_processing/test_celery.py

  • Committer: Durga Rajaram
  • Date: 2013-10-01 00:19:57 UTC
  • mfrom: (659.1.74 rc)
  • Revision ID: durga@fnal.gov-20131001001957-iswih60vis9rodw0
Tags: MAUS-v0.7.1

Show diffs side-by-side

added

removed

Lines of Context:
29
29
 
30
30
from datetime import datetime
31
31
 
 
32
import celery
32
33
from celery.task.control import broadcast #pylint: disable=E0611, F0401
33
34
from celery.task.control import inspect #pylint: disable=E0611, F0401
34
35
 
 
36
import maus_cpp
35
37
from Configuration import Configuration
36
38
from MapPyTestMap import MapPyTestMap
37
39
from mauscelery.tasks import execute_transform
79
81
        # fails due to mess left by previous test.
80
82
        self.reset_worker()
81
83
        self.reset_worker()
 
84
        if maus_cpp.globals.has_instance():
 
85
            maus_cpp.globals.death()
 
86
        maus_cpp.globals.birth(self.config_doc)
82
87
 
83
88
    def reset_worker(self): # pylint:disable = R0201
84
89
        """
93
98
                       "transform":"MapPyDoNothing", 
94
99
                       "config_id":config_id}, reply=True)
95
100
 
96
 
    def birth(self, config_id, configuration = None, transform = "MapPyDoNothing"): # pylint:disable = R0201, C0301
 
101
    def birth(self, config_id, configuration = None,
 
102
              transform = "MapPyDoNothing", merge_configuration = False): # pylint:disable = R0201, C0301
97
103
        """
98
104
        Configure the Celery workers, via a broadcast.
99
105
        @param self Object reference.
104
110
        """
105
111
        if configuration == None:
106
112
            configuration = self.config_doc
 
113
        elif merge_configuration:
 
114
            my_config = json.loads(self.config_doc)
 
115
            config_user = json.loads(configuration)
 
116
            for key in config_user:
 
117
                my_config[key] = config_user[key]
 
118
            configuration = json.dumps(my_config)
107
119
        return broadcast("birth", 
108
120
            arguments={"configuration":configuration, \
109
121
                       "transform":transform, 
371
383
        result = broadcast("death", arguments={"run_number":1}, reply=True)
372
384
        self.validate_status(result)
373
385
        # Call process.
374
 
        result = execute_transform.delay("{}", 1) 
 
386
        result = execute_transform.delay("{}", 1)
375
387
        # Wait for it to complete.
376
388
        try:
377
389
            result.wait()
379
391
            pass
380
392
        self.assertTrue(result.failed(), "Expected failure")
381
393
 
 
394
    def test_process_timeout(self):
 
395
        """
 
396
        Test process timeout.
 
397
        @param self Object reference.
 
398
        """
 
399
        config_id = datetime.now().microsecond
 
400
        transform = "MapPyTestMap"
 
401
        # check that process does not time out when execution is quick
 
402
        configuration = """{"maus_version":"%s", "process_delay":9.0}""" \
 
403
                                                                % self.__version
 
404
        result = self.birth(config_id, configuration, transform, True)
 
405
        self.validate_status(result)
 
406
        # Call process.
 
407
        result = execute_transform.delay("{}", 1)
 
408
        # Wait for it to complete.
 
409
        print "Executing delayed transform"
 
410
        result.wait()
 
411
 
 
412
        # check that process does time out when execution is slow
 
413
        configuration = """{"maus_version":"%s", "process_delay":15.0}""" \
 
414
                                                                % self.__version
 
415
        result = self.birth(config_id, configuration, transform, True)
 
416
        self.validate_status(result)
 
417
        # Call process. Check that time_limit flag is ignored
 
418
        # (true for celery version < 3); if we move to a new version of celery,
 
419
        # this should fail; and we should change the timeout to be soft coded
 
420
        result = execute_transform.apply_async(["{}", 1], time_limit=20.)
 
421
        # Wait for it to complete.
 
422
        try:
 
423
            print "Executing delayed transform"
 
424
            result.wait()
 
425
            self.assertTrue(False, 'Should have failed')
 
426
        except celery.exceptions.TimeLimitExceeded:  # pylint:disable = W0703
 
427
            pass
 
428
        # should have failed
 
429
        self.assertEqual(result.state, 'FAILURE')
 
430
 
382
431
if __name__ == '__main__':
383
432
    unittest.main()