33
32
class QueueWorker(object):
34
33
"""Processes raw charm data"""
36
def __init__(self, db, queue_jobs, ingest_jobs, interval=INTERVAL):
35
def __init__(self, db, queue_jobs, interval=INTERVAL):
38
self.queue_jobs = queue_jobs or []
39
self.ingest_jobs = ingest_jobs or []
37
self.queue_jobs = queue_jobs
40
38
self.log = logging.getLogger('charm.worker')
41
39
self.interval = interval
44
def wait_seconds(interval, start, now):
45
remaining = interval - (now - start)
46
return int(max(0, remaining))
48
def run(self, args, now=None):
50
self.log.setLevel(logging.DEBUG)
52
'Starting processing with interval {} seconds.'.format(
54
self.start_time = now if now is not None else time.time()
41
def run(self, run_forever=False):
42
self.log.info('Starting processing')
56
for _, jobs in self.ingest_jobs:
44
for jobs in self.queue_jobs.values():
58
46
job.setup(db=self.db)
60
for job in self.queue_jobs:
62
# The clear argument is only used on the first queueing run.
63
# Unset it so it subequent passes will not clear the queue.
65
for queue, jobs in self.ingest_jobs:
48
for queue, jobs in self.queue_jobs.items():
67
50
item = queue.next()
53
charm_data = item.payload
74
job, item.payload, needs_setup=False):
58
job, charm_data, needs_setup=False):
76
60
except SearchServiceNotAvailable as e:
77
61
self.log.exception(str(e))
83
if not args.run_forever:
84
68
self.log.info('Empty queue and not running forever; quitting.')
87
wait_seconds = self.wait_seconds(
88
self.interval, self.start_time, time.time())
71
wait_time_in_minutes = self.interval / 60
90
'Empty queue. Waiting %s seconds to recheck.' %
92
time.sleep(wait_seconds)
73
'Empty queue. Waiting %s minutes to recheck.' %
75
time.sleep(self.interval)
105
88
help='Keep running after the queues are processed and process more '
107
90
action='store_true', dest='run_forever')
108
lp.configure_parser(parser)
109
91
# Explicitly pass sys.argv to make monkeypatching more reliable.
110
92
args = parser.parse_args(argv[1:])
111
queue_jobs = (lp.run, )
113
(get_queue(db, CHARM_QUEUE), [UpdateCharmJob(args.debug)]),
114
(get_queue(db, BASKET_QUEUE), [UpdateBundleJob(args.debug)]),
116
worker = QueueWorker(db, queue_jobs, ingest_jobs)
117
sys.exit(worker.run(args))
94
get_queue(db, CHARM_QUEUE): [UpdateCharmJob()],
95
get_queue(db, BASKET_QUEUE): [UpdateBundleJob()],
97
worker = QueueWorker(db, queue_jobs)
98
sys.exit(worker.run(run_forever=args.run_forever))