~jcsackett/charmworld/bac-tag-constraints

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# Copyright 2013 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

import argparse
import logging
import sys
import time

from charmworld.utils import (
    configure_logging,
    get_ini,
)
from charmworld.jobs.config import CHARM_QUEUE
from charmworld.jobs.ingest import (
    run_job,
    UpdateCharmJob,
)
from charmworld.jobs.utils import get_queue
from charmworld.search import SearchServiceNotAvailable


DEFAULT_JOBS = (
    UpdateCharmJob,
)

settings = get_ini()
INTERVAL = int(settings.get('worker_interval'))


class QueueWorker(object):
    """Processes raw charm data"""

    def __init__(self, queue_jobs, interval=INTERVAL):
        self.queue_jobs = queue_jobs
        self.log = logging.getLogger('charm.worker')
        self.interval = interval

    def run(self, quit_on_empty=False):
        self.log.info('Starting processing')
        return_code = 0
        for jobs in self.queue_jobs.values():
            for job in jobs:
                job.setup()
        while True:
            for queue, jobs in self.queue_jobs.items():
                while True:
                    item = queue.next()
                    if item is None:
                        break
                    charm_data = item.payload
                    try:
                        for job in jobs:
                            try:
                                if not run_job(
                                        job, charm_data, needs_setup=False):
                                    break
                            except SearchServiceNotAvailable as e:
                                self.log.exception(str(e))
                                queue.clear()
                                return_code = 1

                    finally:
                        item.complete()
            if quit_on_empty: # XXX do we want to call this "once_only"?
                self.log.info(
                    'Empty queue with quit_on_empty set. Quitting.')
                break
            else:
                wait_time_in_minutes = self.interval / 60
                self.log.info(
                    'Empty queue. Waiting %s minutes to recheck.' %
                    wait_time_in_minutes)
                time.sleep(self.interval)
                continue
        return return_code


def main():
    configure_logging()
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--quit-on-empty",
        help="Shut down the worker if the queue is empty",
        action="store_true")
    # Explicitly pass sys.argv to make monkeypatching more reliable.
    args = parser.parse_args(sys.argv[1:])
    queue = get_queue(CHARM_QUEUE)
    queue_jobs = {
        get_queue(CHARM_QUEUE): [job() for job in DEFAULT_JOBS],
    }
    worker = QueueWorker(queue_jobs)
    sys.exit(worker.run(quit_on_empty=args.quit_on_empty))


if __name__ == "__main__":
    main()