~jcsackett/charmworld/bac-tag-constraints

« back to all changes in this revision

Viewing changes to charmworld/jobs/worker.py

  • Committer: Benji York
  • Date: 2013-07-19 15:48:27 UTC
  • mto: This revision was merged to the branch mainline in revision 320.
  • Revision ID: benji.york@canonical.com-20130719154827-ljfqd11uv1h754ti
Make the worker able to handle multiple queues with custom workers.

Show diffs side-by-side

added

removed

Lines of Context:
30
30
class QueueWorker(object):
31
31
    """Processes raw charm data"""
32
32
 
33
 
    def __init__(self, ingest_jobs, queue, interval=INTERVAL):
34
 
        self.ingest_jobs = ingest_jobs
35
 
        self.queue = queue
 
33
    def __init__(self, queue_jobs, interval=INTERVAL):
 
34
        self.queue_jobs = queue_jobs
36
35
        self.log = logging.getLogger('charm.worker')
37
36
        self.interval = interval
38
37
 
39
38
    def run(self, quit_on_empty=False):
40
39
        self.log.info('Starting processing')
41
40
        return_code = 0
42
 
        for job in self.ingest_jobs:
43
 
            job.setup()
 
41
        for jobs in self.queue_jobs.values():
 
42
            for job in jobs:
 
43
                job.setup()
44
44
        while True:
45
 
            item = self.queue.next()
46
 
            if item is None:
47
 
                if quit_on_empty:
48
 
                    self.log.info(
49
 
                        'Empty queue with quit_on_empty set. Quitting.')
50
 
                    break
51
 
                else:
52
 
                    wait_time_in_minutes = self.interval / 60
53
 
                    self.log.info(
54
 
                        'Empty queue. Waiting %s minutes to recheck.' %
55
 
                        wait_time_in_minutes)
56
 
                    time.sleep(self.interval)
57
 
                    continue
58
 
            charm_data = item.payload
59
 
            try:
60
 
                for job in self.ingest_jobs:
 
45
            for queue, jobs in self.queue_jobs.items():
 
46
                while True:
 
47
                    item = queue.next()
 
48
                    if item is None:
 
49
                        break
 
50
                    charm_data = item.payload
61
51
                    try:
62
 
                        if not run_job(job, charm_data, needs_setup=False):
63
 
                            break
64
 
                    except SearchServiceNotAvailable as e:
65
 
                        self.log.exception(str(e))
66
 
                        self.queue.clear()
67
 
                        return_code = 1
 
52
                        for job in jobs:
 
53
                            try:
 
54
                                if not run_job(
 
55
                                        job, charm_data, needs_setup=False):
 
56
                                    break
 
57
                            except SearchServiceNotAvailable as e:
 
58
                                self.log.exception(str(e))
 
59
                                queue.clear()
 
60
                                return_code = 1
68
61
 
69
 
            finally:
70
 
                item.complete()
 
62
                    finally:
 
63
                        item.complete()
 
64
            if quit_on_empty: # XXX do we want to call this "once_only"?
 
65
                self.log.info(
 
66
                    'Empty queue with quit_on_empty set. Quitting.')
 
67
                break
 
68
            else:
 
69
                wait_time_in_minutes = self.interval / 60
 
70
                self.log.info(
 
71
                    'Empty queue. Waiting %s minutes to recheck.' %
 
72
                    wait_time_in_minutes)
 
73
                time.sleep(self.interval)
 
74
                continue
71
75
        return return_code
72
76
 
73
77
 
81
85
    # Explicitly pass sys.argv to make monkeypatching more reliable.
82
86
    args = parser.parse_args(sys.argv[1:])
83
87
    queue = get_queue(CHARM_QUEUE)
84
 
    jobs = [job() for job in DEFAULT_JOBS]
85
 
    worker = QueueWorker(jobs, queue)
 
88
    queue_jobs = {
 
89
        get_queue(CHARM_QUEUE): [job() for job in DEFAULT_JOBS],
 
90
    }
 
91
    worker = QueueWorker(queue_jobs)
86
92
    sys.exit(worker.run(quit_on_empty=args.quit_on_empty))
87
93
 
88
94