476
479
def test_no_tasks(self):
    """When no jobs are listed, the queue is shown as empty."""
    # list_queued() is asked about this test's single queue and must
    # report nothing.
    self.assertEqual([], list_queued(RunFileJob.app, [self.queue]))
484
class TestInspectQueues(TestCase):
485
"""Tests for the script inspect-queues."""
487
def queueName(self, task_id):
    """Return the queue name used for `task_id`.

    The name is the task id with every '-' character removed.

    :param task_id: The task id as a string.
    :return: `task_id` without dashes.
    """
    return task_id.replace('-', '')
490
def runInspectQueues(self, celery_config, task_ids):
    """Invoke inspect_queues() and catch the data written to stdout
    and stderr.

    :param celery_config: Value passed after the ``-c`` option in the
        argument list.
    :param task_ids: Task ids; each one is converted with queueName()
        and appended to the argument list.
    :return: A tuple (captured stdout text, captured stderr text).
    """
    queues = [self.queueName(task_id) for task_id in task_ids]
    args = ['program', '-c', celery_config] + queues
    real_stdout = sys.stdout
    real_stderr = sys.stderr
    try:
        sys.stdout = StringIO()
        sys.stderr = StringIO()
        # NOTE(review): the statement that consumes `args` is absent from
        # the text this block was recovered from; this call is
        # reconstructed from the docstring -- confirm the callee and its
        # signature against the original file.
        inspect_queues(args)
        fake_stdout = sys.stdout.getvalue()
        fake_stderr = sys.stderr.getvalue()
    finally:
        # Always put the real streams back, even if the call above
        # raises, so later tests do not write into a StringIO.
        sys.stdout = real_stdout
        sys.stderr = real_stderr
    return fake_stdout, fake_stderr
509
def invokeJob(self, celery_config, task, delay=1, job_args=None):
    """Run the given task.

    :param celery_config: Passed to celeryd() together with the
        temporary directory.
    :param task: Object whose apply_async() is called with
        ``args=(11, )``.
    :param delay: Seconds to wait while celeryd is running (see the
        review note at the sleep below).
    :param job_args: Optional dict of extra keyword arguments for
        FileJob; defaults to no extra arguments.
    :return: The id of the started task. queueName() converts it into
        the name of the result queue.
    """
    import time

    # Avoid a shared mutable default argument.
    if job_args is None:
        job_args = {}
    with tempdir() as temp_dir:
        js = FileJobSource(temp_dir)
        # NOTE(review): `job` is not used again in the text this block
        # was recovered from. If FileJob needs an explicit call to be
        # stored in the job source, that call belongs here -- confirm
        # against the original file.
        job = FileJob(js, 11, **job_args)
        task_info = task.apply_async(args=(11, ))
        with celeryd(celery_config, temp_dir):
            # Wait just long enough so that celeryd can start and
            # process the job.
            # NOTE(review): the waiting statement itself was missing from
            # the recovered text; sleeping for `delay` seconds is
            # reconstructed from the comment and the otherwise unused
            # parameter -- confirm.
            time.sleep(delay)
    return task_info.task_id
525
def successMessage(self, task_id):
    """Return the output line expected for a successfully run task.

    The line starts with the queue name derived from `task_id` via
    queueName(), followed by the textual form of the result dict.

    :param task_id: The task id as a string.
    :return: The expected line, including the trailing newline.
    """
    return (
        "%s: {'status': 'SUCCESS', 'traceback': None, 'result': None, "
        "'task_id': '%s'}\n" % (self.queueName(task_id), task_id))
530
def noQueueMessage(self, task_id):
    """Return the error line expected when the task's queue is missing.

    :param task_id: The task id as a string; it is converted with
        queueName() before being inserted into the message.
    :return: The expected line, including the trailing newline.
    """
    return (
        "NOT_FOUND - no queue '%s' in vhost '/'\n"
        % self.queueName(task_id))
535
def test_inspect_queues__result_not_consumed(self):
    """When a Celery task is started so that a result is returned
    but the result is not consumed, the related message can be
    retrieved with inspect_queues().
    """
    celery_config = 'lazr.jobrunner.tests.config1'
    task_id = self.invokeJob(celery_config, RunFileJob)
    stdout, stderr = self.runInspectQueues(celery_config, [task_id])
    self.assertEqual(self.successMessage(task_id), stdout)
    self.assertEqual('', stderr)

    # Reading a queue is destructive. An attempt to read again from
    # a queue results in an error.
    stdout, stderr = self.runInspectQueues(celery_config, [task_id])
    self.assertEqual('', stdout)
    self.assertEqual(self.noQueueMessage(task_id), stderr)
552
def test_inspect_queues__two_queues(self):
    """More than one queue can be inspected in one call of
    inspect_queues().
    """
    celery_config = 'lazr.jobrunner.tests.config1'
    task_id_1 = self.invokeJob(celery_config, RunFileJob)
    task_id_2 = self.invokeJob(celery_config, RunFileJob)
    stdout, stderr = self.runInspectQueues(
        celery_config, [task_id_1, task_id_2])
    # The messages are expected in the order the queues were passed.
    expected_stdout = (
        self.successMessage(task_id_1) + self.successMessage(task_id_2))
    self.assertEqual(expected_stdout, stdout)
    self.assertEqual('', stderr)
566
def test_inspect_queues__task_without_result(self):
    """A Celery task which was started so that no result is returned
    does not write to a task queue.
    """
    celery_config = 'lazr.jobrunner.tests.config1'
    task_id = self.invokeJob(celery_config, RunFileJobNoResult)
    stdout, stderr = self.runInspectQueues(celery_config, [task_id])
    # Nothing is printed; the missing queue is reported on stderr.
    self.assertEqual('', stdout)
    self.assertEqual(self.noQueueMessage(task_id), stderr)