~cjwatson/lazr.jobrunner/py3

« back to all changes in this revision

Viewing changes to src/lazr/jobrunner/tests/test_celerytask.py

  • Committer: Abel Deuring
  • Date: 2012-07-03 15:09:52 UTC
  • mfrom: (38.1.1 bug-1015667)
  • Revision ID: abel.deuring@canonical.com-20120703150952-ixjltl4520tigj9j
new script clear-queues

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
 
20
20
import contextlib
 
21
from cStringIO import StringIO
21
22
import errno
22
23
import json
23
24
import os
28
29
    )
29
30
import shutil
30
31
import subprocess
 
32
import sys
31
33
import tempfile
32
34
from time import sleep
33
35
from unittest import TestCase
37
39
 
38
40
from celery.exceptions import SoftTimeLimitExceeded
39
41
 
 
42
from lazr.jobrunner.bin.inspect_queues import inspect_queues
40
43
from lazr.jobrunner.celerytask import (
41
44
    drain_queues,
42
45
    list_queued,
476
479
    def test_no_tasks(self):
477
480
        """When no jobs are listed, the queue is shown as empty."""
478
481
        self.assertEqual([], list_queued(RunFileJob.app, [self.queue]))
 
482
 
 
483
 
 
484
class TestInspectQueues(TestCase):
 
485
    """Tests for the script inspect-queues."""
 
486
 
 
487
    def queueName(self, task_id):
 
488
        return task_id.replace('-', '')
 
489
 
 
490
    def runInspectQueues(self, celery_config, task_ids):
 
491
        """Invoke inspect_queues() and catch the data written to stdout
 
492
        and stderr.
 
493
        """
 
494
        queues = [self.queueName(task_id) for task_id in task_ids]
 
495
        real_stdout = sys.stdout
 
496
        real_stderr = sys.stderr
 
497
        try:
 
498
            sys.stdout = StringIO()
 
499
            sys.stderr = StringIO()
 
500
            args = ['program', '-c', celery_config] + queues
 
501
            inspect_queues(args)
 
502
            fake_stdout = sys.stdout.getvalue()
 
503
            fake_stderr = sys.stderr.getvalue()
 
504
        finally:
 
505
            sys.stdout = real_stdout
 
506
            sys.stderr = real_stderr
 
507
        return fake_stdout, fake_stderr
 
508
 
 
509
    def invokeJob(self, celery_config, task, delay=1, job_args={}):
 
510
        """Run the given task.
 
511
 
 
512
        :return: The name of the result queue.
 
513
        """
 
514
        with tempdir() as temp_dir:
 
515
            js = FileJobSource(temp_dir)
 
516
            job = FileJob(js, 11, **job_args)
 
517
            job.save()
 
518
            task_info = task.apply_async(args=(11, ))
 
519
            with celeryd(celery_config, temp_dir):
 
520
                # Wait just long enough so that celeryd can start and
 
521
                # process the job.
 
522
                sleep(delay)
 
523
            return task_info.task_id
 
524
 
 
525
    def successMessage(self, task_id):
 
526
        return (
 
527
            "%s: {'status': 'SUCCESS', 'traceback': None, 'result': None, "
 
528
            "'task_id': '%s'}\n" % (self.queueName(task_id), task_id))
 
529
 
 
530
    def noQueueMessage(self, task_id):
 
531
        return (
 
532
            "NOT_FOUND - no queue '%s' in vhost '/'\n"
 
533
            % self.queueName(task_id))
 
534
 
 
535
    def test_inspect_queues__result_not_consumed(self):
 
536
        """When a Celery task is started so that a result is returned
 
537
        but the result is not consumed, the related message can be
 
538
        retrieved with inspect_queues().
 
539
        """
 
540
        celery_config = 'lazr.jobrunner.tests.config1'
 
541
        task_id = self.invokeJob(celery_config, RunFileJob)
 
542
        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
 
543
        self.assertEqual(self.successMessage(task_id), stdout)
 
544
        self.assertEqual('', stderr)
 
545
 
 
546
        # Reading a queue is destructive. An attempt to read again from
 
547
        # a queue results in an error.
 
548
        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
 
549
        self.assertEqual('', stdout)
 
550
        self.assertEqual(self.noQueueMessage(task_id), stderr)
 
551
 
 
552
    def test_inspect_queues__two_queues(self):
 
553
        """More than one queue can be inspected in one call of
 
554
        inspect_queue().
 
555
        """
 
556
        celery_config = 'lazr.jobrunner.tests.config1'
 
557
        task_id_1 = self.invokeJob(celery_config, RunFileJob)
 
558
        task_id_2 = self.invokeJob(celery_config, RunFileJob)
 
559
        stdout, stderr = self.runInspectQueues(
 
560
            celery_config, [task_id_1, task_id_2])
 
561
        expected_stdout = (
 
562
            self.successMessage(task_id_1) + self.successMessage(task_id_2))
 
563
        self.assertEqual(expected_stdout, stdout)
 
564
        self.assertEqual('', stderr)
 
565
 
 
566
    def test_inspect_queues__task_without_result(self):
 
567
        """A Celery task which was started so that no result is returned
 
568
        does not write to a task queue.
 
569
        """
 
570
        celery_config = 'lazr.jobrunner.tests.config1'
 
571
        task_id = self.invokeJob(celery_config, RunFileJobNoResult)
 
572
        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
 
573
        self.assertEqual('', stdout)
 
574
        self.assertEqual(self.noQueueMessage(task_id), stderr)