1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
# Copyright 2013 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
import argparse
import logging
import sys
import time
from charmworld.utils import (
configure_logging,
get_ini,
)
from charmworld.jobs.config import CHARM_QUEUE
from charmworld.jobs.ingest import (
run_job,
UpdateCharmJob,
)
from charmworld.jobs.utils import get_queue
from charmworld.search import SearchServiceNotAvailable
# Job classes that main() instantiates and attaches to the charm queue.
DEFAULT_JOBS = (UpdateCharmJob,)

# The polling interval is read from the ini settings at import time; it is
# used as the default ``interval`` of QueueWorker, which sleeps for that many
# seconds between polls.
settings = get_ini()
INTERVAL = int(settings.get('worker_interval'))
class QueueWorker(object):
    """Drain work queues, running each queue's jobs on every item.

    ``queue_jobs`` maps a queue object to the list of job instances that
    are run against the payload of every item taken from that queue.
    ``interval`` is the number of seconds to sleep between polls once all
    queues have been emptied.
    """

    def __init__(self, queue_jobs, interval=INTERVAL):
        self.interval = interval
        self.log = logging.getLogger('charm.worker')
        self.queue_jobs = queue_jobs

    def run(self, quit_on_empty=False):
        """Process queued items until told to stop.

        Every job is set up once, then each queue is drained in turn.  For
        each item the queue's jobs run in order; a job for which run_job
        returns a false value stops the remaining jobs for that item.  The
        item is marked complete whether or not its jobs succeeded.

        If a job raises SearchServiceNotAvailable the error is logged, the
        queue is cleared and the eventual return value becomes 1.

        With ``quit_on_empty`` set the method returns after one pass over
        the queues; otherwise it sleeps ``self.interval`` seconds and polls
        again, so it only ends if an exception propagates.

        Returns 0, or 1 if the search service was unavailable at any point.
        """
        self.log.info('Starting processing')
        exit_status = 0

        # Jobs are set up once up front, which is why run_job is called
        # with needs_setup=False below.
        for job_list in self.queue_jobs.values():
            for job in job_list:
                job.setup()

        while True:
            for queue, job_list in self.queue_jobs.items():
                entry = queue.next()
                # queue.next() hands back None once the queue is drained.
                while entry is not None:
                    payload = entry.payload
                    try:
                        for job in job_list:
                            try:
                                succeeded = run_job(
                                    job, payload, needs_setup=False)
                            except SearchServiceNotAvailable as error:
                                self.log.exception(str(error))
                                queue.clear()
                                exit_status = 1
                            else:
                                if not succeeded:
                                    break
                    finally:
                        # Always mark the item done, even if a job blew up.
                        entry.complete()
                    entry = queue.next()

            # XXX do we want to call this "once_only"?
            if quit_on_empty:
                self.log.info(
                    'Empty queue with quit_on_empty set. Quitting.')
                break

            message = 'Empty queue. Waiting %s minutes to recheck.' % (
                self.interval / 60)
            self.log.info(message)
            time.sleep(self.interval)

        return exit_status
def main():
    """Command-line entry point: run a QueueWorker over the charm queue.

    Configures logging, parses the ``--quit-on-empty`` flag from
    ``sys.argv``, attaches one instance of each class in DEFAULT_JOBS to
    the CHARM_QUEUE queue, runs the worker and exits the process with the
    worker's return code.
    """
    configure_logging()
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--quit-on-empty",
        help="Shut down the worker if the queue is empty",
        action="store_true")
    # Explicitly pass sys.argv to make monkeypatching more reliable.
    args = parser.parse_args(sys.argv[1:])
    # Fetch the queue once and key the job list on that same object; the
    # previous code bound it to an unused local and then fetched it again.
    queue = get_queue(CHARM_QUEUE)
    queue_jobs = {
        queue: [job() for job in DEFAULT_JOBS],
    }
    worker = QueueWorker(queue_jobs)
    sys.exit(worker.run(quit_on_empty=args.quit_on_empty))


if __name__ == "__main__":
    main()
|