30
30
class QueueWorker(object):
31
31
"""Processes raw charm data"""
33
def __init__(self, ingest_jobs, queue, interval=INTERVAL):
34
self.ingest_jobs = ingest_jobs
33
def __init__(self, queue_jobs, interval=INTERVAL):
34
self.queue_jobs = queue_jobs
36
35
self.log = logging.getLogger('charm.worker')
37
36
self.interval = interval
39
38
def run(self, quit_on_empty=False):
40
39
self.log.info('Starting processing')
42
for job in self.ingest_jobs:
41
for jobs in self.queue_jobs.values():
45
item = self.queue.next()
49
'Empty queue with quit_on_empty set. Quitting.')
52
wait_time_in_minutes = self.interval / 60
54
'Empty queue. Waiting %s minutes to recheck.' %
56
time.sleep(self.interval)
58
charm_data = item.payload
60
for job in self.ingest_jobs:
45
for queue, jobs in self.queue_jobs.items():
50
charm_data = item.payload
62
if not run_job(job, charm_data, needs_setup=False):
64
except SearchServiceNotAvailable as e:
65
self.log.exception(str(e))
55
job, charm_data, needs_setup=False):
57
except SearchServiceNotAvailable as e:
58
self.log.exception(str(e))
64
if quit_on_empty: # XXX do we want to call this "once_only"?
66
'Empty queue with quit_on_empty set. Quitting.')
69
wait_time_in_minutes = self.interval / 60
71
'Empty queue. Waiting %s minutes to recheck.' %
73
time.sleep(self.interval)
81
85
# Explicitly pass sys.argv to make monkeypatching more reliable.
82
86
args = parser.parse_args(sys.argv[1:])
83
87
queue = get_queue(CHARM_QUEUE)
84
jobs = [job() for job in DEFAULT_JOBS]
85
worker = QueueWorker(jobs, queue)
89
get_queue(CHARM_QUEUE): [job() for job in DEFAULT_JOBS],
91
worker = QueueWorker(queue_jobs)
86
92
sys.exit(worker.run(quit_on_empty=args.quit_on_empty))