~mbogomilov/maus/devel3

« back to all changes in this revision

Viewing changes to tests/integration/test_distributed_processing/test_celery.py

  • Committer: Durga Rajaram
  • Date: 2014-01-14 04:39:44 UTC
  • mfrom: (663.38.24 merge)
  • mto: (698.1.1 release)
  • mto: This revision was merged to the branch mainline in revision 693.
  • Revision ID: durga@fnal.gov-20140114043944-qf0i8nksnwu74cll
candidate 0.7.6 - trunk r1026

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
"""
2
 
Tests for Celery workers configured for MAUS. This provides tests for
3
 
"broadcast" commands to reconfigure the Celery worker and also for
4
 
executing transforms on spills.
5
 
"""
6
 
#  This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus
7
 
8
 
#  MAUS is free software: you can redistribute it and/or modify
9
 
#  it under the terms of the GNU General Public License as published by
10
 
#  the Free Software Foundation, either version 3 of the License, or
11
 
#  (at your option) any later version.
12
 
13
 
#  MAUS is distributed in the hope that it will be useful,
14
 
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 
#  GNU General Public License for more details.
17
 
18
 
#  You should have received a copy of the GNU General Public License
19
 
#  along with MAUS.  If not, see <http://www.gnu.org/licenses/>.
20
 
 
21
 
# pylint: disable=E1101
22
 
# pylint: disable=C0103
23
 
 
24
 
import json
25
 
import unittest
26
 
import subprocess
27
 
import signal
28
 
import time
29
 
 
30
 
from datetime import datetime
31
 
 
32
 
import celery
33
 
from celery.task.control import broadcast #pylint: disable=E0611, F0401
34
 
from celery.task.control import inspect #pylint: disable=E0611, F0401
35
 
 
36
 
import maus_cpp
37
 
from Configuration import Configuration
38
 
from MapPyTestMap import MapPyTestMap
39
 
from mauscelery.tasks import execute_transform
40
 
 
41
 
class MausCeleryWorkerTestCase(unittest.TestCase): # pylint: disable=R0904, C0301
    """
    Test class for Celery workers configured for MAUS.

    setUpClass launches a local celeryd process shared by every test and
    tearDownClass kills it. Each test reconfigures the worker(s) through
    "birth"/"death" broadcast commands and/or submits spills through the
    execute_transform task.
    """

    # celeryd subprocess launched by setUpClass (None when not running).
    proc = None
    default_config = None
    # Seconds to sleep after launching celeryd so that it has a chance to
    # start before the first test inspects the workers.
    startup_delay = 1

    @classmethod
    def setUpClass(cls):
        """
        Launch a celeryd worker process (INFO logging, concurrency 2,
        queued tasks purged) for use by all tests in this class.
        @param cls Class reference.
        """
        cls.proc = subprocess.Popen(['celeryd', '-lINFO', '-c2', '--purge'])
        time.sleep(cls.startup_delay)

    @classmethod
    def tearDownClass(cls):
        """
        Kill the celeryd process launched by setUpClass, if any, and reap
        it so that no zombie process is left behind.
        @param cls Class reference.
        """
        if cls.proc is not None:
            print("Killing celeryd process %s" % cls.proc.pid)
            cls.proc.send_signal(signal.SIGKILL)
            # Reap the child; without wait() it lingers as a zombie.
            cls.proc.wait()
            cls.proc = None

    def setUp(self):
        """
        Check for at least one active Celery worker else skip the
        test. Then reset the worker(s) to MapPyDoNothing with the default
        MAUS configuration and re-create the local maus_cpp globals from
        that configuration.
        @param self Object reference.
        """
        self.__inspection = inspect()
        active_nodes = None
        try:
            active_nodes = self.__inspection.active()
        except Exception: # pylint:disable = W0703
            # Any failure talking to the broker means the workers cannot
            # be reached, so skip rather than fail.
            self.skipTest("Skip - RabbitMQ seems to be down")
        if active_nodes is None:
            self.skipTest("Skip - No active Celery workers")
        # Get the current MAUS version.
        self.config_doc = Configuration().getConfigJSON()
        config_dictionary = json.loads(self.config_doc)
        self.__version = config_dictionary["maus_version"]
        # Reset the worker. Invoke twice in case the first attempt
        # fails due to mess left by previous test.
        self.reset_worker()
        self.reset_worker()
        if maus_cpp.globals.has_instance():
            maus_cpp.globals.death()
        maus_cpp.globals.birth(self.config_doc)

    def reset_worker(self): # pylint:disable = R0201
        """
        Reset the Celery workers, via a broadcast, to use MapPyDoNothing
        with the default configuration (self.config_doc) and a new
        configuration ID.
        @param self Object reference.
        @return status of update, as returned by the "birth" broadcast.
        """
        config_id = datetime.now().microsecond
        return broadcast("birth",
            arguments={"configuration":self.config_doc,
                       "transform":"MapPyDoNothing",
                       "config_id":config_id}, reply=True)

    def birth(self, config_id, configuration = None,
              transform = "MapPyDoNothing", merge_configuration = False): # pylint:disable = R0201, C0301
        """
        Configure the Celery workers, via a broadcast.
        @param self Object reference.
        @param config_id Configuration ID.
        @param configuration Configuration JSON document. If None, the
        default configuration (self.config_doc) is sent.
        @param transform Transform specification.
        @param merge_configuration If True and configuration is given,
        the keys of configuration override those of the default
        configuration and the merged document is sent; otherwise
        configuration is sent as-is.
        @return status of update.
        """
        if configuration is None:
            configuration = self.config_doc
        elif merge_configuration:
            merged = json.loads(self.config_doc)
            merged.update(json.loads(configuration))
            configuration = json.dumps(merged)
        return broadcast("birth",
            arguments={"configuration":configuration,
                       "transform":transform,
                       "config_id":config_id,
                       "run_number":1}, reply=True)

    def validate_configuration(self, configuration, transform,
        config_id = None):
        """
        Validate, using the get_maus_configuration broadcast, that every
        worker has the given configuration and transform, the given
        configuration ID (if any) and the same MAUS version as the test
        class.
        @param self Object reference.
        @param configuration Expected configuration.
        @param transform Expected transform specification.
        @param config_id Expected configuration ID, or None to skip the
        configuration ID value check.
        """
        result = broadcast("get_maus_configuration", reply=True)
        print("get_maus_configuration: %s " % result)
        # Use built-in Celery worker inspection command to get worker
        # names. Treat a None reply (no worker answered) as no workers so
        # the count check below fails cleanly.
        check_worker_names = list((self.__inspection.stats() or {}).keys())
        self.assertEqual(len(check_worker_names), len(result),
            "Number of worker entries does not match that expected")
        for worker in result:
            # Each entry is a single-key dict: {worker_name: config}.
            worker_name = next(iter(worker))
            self.assertTrue(worker_name in check_worker_names,
                "Cannot find entry for worker %s" % worker_name)
            worker_config = worker[worker_name]
            self.assertTrue("config_id" in worker_config,
                "Configuration has no config_id entry")
            if config_id is not None:
                self.assertEqual(config_id, worker_config["config_id"],
                    "Unexpected config_id value")
            self.assertTrue("configuration" in worker_config,
                "Configuration has no configuration entry")
            self.assertEqual(configuration,
                worker_config["configuration"],
                "Unexpected configuration value\n\n%s\n\n%s" % (configuration,
                worker_config["configuration"]))
            self.assertTrue("transform" in worker_config,
                "Configuration has no transform entry")
            self.assertEqual(transform, worker_config["transform"],
                "Unexpected transform value")
            self.assertTrue("version" in worker_config,
                "Configuration has no version entry")
            self.assertEqual(self.__version,
                worker_config["version"],
                "Unexpected version value")

    def validate_status(self, report, status = "ok"):
        """
        Validate the status report from a birth or death
        command. Expect one of:
        @verbatim
        [{u'w1': {'status': 'ok'}},...]
        [{u'w1': {u'error': [{u'error': CLASS, u'message':
            MESSAGE},...], u'status': u'error'}},...]
        @endverbatim
        Every reporting worker must also answer a ping. The report list
        is not modified.
        @param self Object reference.
        @param report Status report.
        @param status Expected status.
        """
        self.assertTrue(len(report) > 0,
            "Expected at least one worker to respond")
        # Names of the workers answering a ping. ping() gives None when
        # nobody replies; use an empty dict so the membership check fails.
        check_workers = self.__inspection.ping() or {}
        for worker in report:
            # Each entry is a single-key dict: {worker_name: status}.
            worker_name = next(iter(worker))
            self.assertTrue(worker_name in check_workers,
                "Unexpected worker name")
            worker_status = worker[worker_name]
            self.assertTrue("status" in worker_status,
                "Missing status entry")
            self.assertEqual(status, worker_status["status"],
                "Unexpected status value")
            if status == "error":
                self.assertTrue("error" in worker_status,
                    "Missing error entry")
                self.assertTrue(len(worker_status["error"]) > 0,
                    "Expected a list of error information")

    def test_birth(self):
        """
        Test birth broadcast command.

        Note we also test mausworker.get_maus_configuration here.

        @param self Object reference.
        """
        config_id = datetime.now().microsecond
        transform = "MapPyPrint"
        configuration = json.loads(self.config_doc)
        configuration["TOFconversionFactor"] = 1
        configuration = json.dumps(configuration)
        result = self.birth(config_id, configuration, transform)
        print("birth(OK): %s " % result)
        # Check the status and that the configuration has been
        # updated.
        self.validate_status(result)
        self.validate_configuration(configuration, transform, config_id)
        # Repeat the birth with the same ID (expected to be a no-op) and
        # check it still reports success and leaves the configuration
        # unchanged.
        result = self.birth(config_id, configuration, transform)
        print("birth(OK): %s " % result)
        # Check the status and configuration are OK.
        self.validate_status(result)
        self.validate_configuration(configuration, transform, config_id)

    def test_birth_bad_config_json(self):
        """
        Test birth broadcast command with an invalid JSON
        configuration document.
        @param self Object reference.
        """
        config_id = datetime.now().microsecond
        transform = "MapPyPrint"
        configuration = """{"TOFconversionFactor":BADJSON"""
        result = self.birth(config_id, configuration, transform)
        print("birth(bad JSON document): %s " % result)
        self.validate_status(result, "error")

    def test_birth_bad_version(self):
        """
        Test birth broadcast command with a mismatched MAUS version.
        @param self Object reference.
        """
        config_id = datetime.now().microsecond
        transform = "MapPyPrint"
        configuration = """{"maus_version":"BAD"}"""
        result = self.birth(config_id, configuration, transform)
        print("birth(bad version): %s " % result)
        self.validate_status(result, "error")

    def test_birth_bad_transform(self):
        """
        Test birth broadcast command with an invalid transform name.
        @param self Object reference.
        """
        config_id = datetime.now().microsecond
        transform = "MapPyUnknown"
        configuration = """{"maus_version":"%s"}""" % self.__version
        result = self.birth(config_id, configuration, transform)
        print("birth(bad transform name): %s " % result)
        self.validate_status(result, "error")

    def test_birth_error(self):
        """
        Test birth broadcast command where the transform raises an
        exception when birth is called.
        @param self Object reference.
        """
        # Set up a transform that will fail when it is birthed.
        config_id = datetime.now().microsecond
        transform = "MapPyTestMap"
        configuration = """{"birth_result":%s, "maus_version":"%s"}""" \
            % (MapPyTestMap.EXCEPTION, self.__version)
        result = self.birth(config_id, configuration, transform)
        print("birth(transform.birth exception): %s " % result)
        # Check the status specifies an error.
        self.validate_status(result, "error")

    def test_death_before_birth_error(self):
        """
        Test a birth broadcast that replaces a transform whose death
        raises an exception: the replacing birth reports an error, and a
        retry with the same configuration ID succeeds.
        @param self Object reference.
        """
        # Set up a transform that will fail when it is deathed.
        config_id = datetime.now().microsecond
        transform = "MapPyTestMap"
        configuration = """{"death_result":%s, "maus_version":"%s"}""" \
            % (MapPyTestMap.EXCEPTION, self.__version)
        result = self.birth(config_id, configuration, transform)
        # Check the status is OK.
        self.validate_status(result)
        # Now create a new transform thereby causing the death failure
        # of the current one.
        config_id = datetime.now().microsecond
        transform = "MapPyDoNothing"
        configuration = """{"maus_version":"%s"}""" % self.__version
        result = self.birth(config_id, configuration, transform)
        print("birth(transform.death exception): %s " % result)
        # Check the status specifies an error.
        self.validate_status(result, "error")
        # Try again with the same configuration ID. Expect
        # success this time.
        result = self.birth(config_id, configuration, transform)
        # Check the status and that the configuration has been
        # updated.
        self.validate_status(result)
        self.validate_configuration(configuration, transform, config_id)

    def test_death(self):
        """
        Test death broadcast command.
        @param self Object reference.
        """
        result = broadcast("death", arguments={"run_number":1}, reply=True)
        result.sort()
        print("death: %s " % result)
        self.validate_status(result)
        # Expect subsequent attempt to succeed.
        result = broadcast("death", arguments={"run_number":1}, reply=True)
        self.validate_status(result)

    def test_death_exception(self):
        """
        Test death broadcast command where the transform throws
        an exception when death is called.
        @param self Object reference.
        """
        # Set up a transform that will fail when it is deathed.
        config_id = datetime.now().microsecond
        transform = "MapPyTestMap"
        configuration = """{"death_result":%s, "maus_version":"%s"}""" \
            % (MapPyTestMap.EXCEPTION, self.__version)
        result = self.birth(config_id, configuration, transform)
        # Check the status is OK.
        self.validate_status(result)
        # Now death the transform.
        result = broadcast("death", arguments={"run_number":1}, reply=True)
        print("death(transform.death exception): %s " % result)
        self.validate_status(result, "error")
        # Expect subsequent attempt to succeed.
        result = broadcast("death", arguments={"run_number":1}, reply=True)
        self.validate_status(result)

    def test_process(self):
        """
        Test process command.
        @param self Object reference.
        """
        config_id = datetime.now().microsecond
        transform = "MapPyTestMap"
        configuration = """{"maus_version":"%s"}""" % self.__version
        result = self.birth(config_id, configuration, transform)
        self.validate_status(result)
        # Call process.
        result = execute_transform.delay("{}", 1)
        # Wait for it to complete.
        result.wait()
        self.assertTrue(result.successful(), "Expected success")
        spill = json.loads(result.result)
        self.assertTrue("processed" in spill,
            "Spill does not seem to have been updated")

    def test_process_after_death(self):
        """
        Test process command fails after death has been called.
        @param self Object reference.
        """
        config_id = datetime.now().microsecond
        transform = "MapPyTestMap"
        configuration = """{"maus_version":"%s"}""" % self.__version
        result = self.birth(config_id, configuration, transform)
        self.validate_status(result)
        result = broadcast("death", arguments={"run_number":1}, reply=True)
        self.validate_status(result)
        # Call process.
        result = execute_transform.delay("{}", 1)
        # Wait for it to complete. The task is expected to fail, and
        # wait() re-raises the task's exception; the failure itself is
        # asserted on the result state below.
        try:
            result.wait()
        except Exception:  # pylint:disable = W0703
            pass
        self.assertTrue(result.failed(), "Expected failure")

    def test_process_timeout(self):
        """
        Test process timeout: a spill processed with a 9 s delay
        completes, while one processed with a 15 s delay fails with
        TimeLimitExceeded.
        @param self Object reference.
        """
        transform = "MapPyTestMap"
        # check that process does not time out when execution is quick
        config_id = datetime.now().microsecond
        configuration = """{"maus_version":"%s", "process_delay":9.0}""" \
                                                                % self.__version
        result = self.birth(config_id, configuration, transform, True)
        self.validate_status(result)
        # Call process.
        result = execute_transform.delay("{}", 1)
        # Wait for it to complete.
        print("Executing delayed transform")
        result.wait()
        self.assertTrue(result.successful(), "Expected success")

        # check that process does time out when execution is slow. Use a
        # new configuration ID: a birth that repeats the current ID is
        # treated as a no-op and would leave the 9 s delay in place.
        config_id = datetime.now().microsecond
        configuration = """{"maus_version":"%s", "process_delay":15.0}""" \
                                                                % self.__version
        result = self.birth(config_id, configuration, transform, True)
        self.validate_status(result)
        # Call process. Check that time_limit flag is ignored
        # (true for celery version < 3); if we move to a new version of celery,
        # this should fail; and we should change the timeout to be soft coded
        result = execute_transform.apply_async(["{}", 1], time_limit=20.)
        # Wait for it to complete.
        try:
            print("Executing delayed transform")
            result.wait()
            self.fail('Should have failed')
        except celery.exceptions.TimeLimitExceeded:
            pass
        # should have failed
        self.assertEqual(result.state, 'FAILURE')
431
 
if __name__ == "__main__":
    # Run this module's tests when it is executed directly as a script.
    unittest.main(verbosity=1)