~jcsackett/charmworld/bac-tag-constraints

« back to all changes in this revision

Viewing changes to charmworld/jobs/worker.py

  • Committer: Benji York
  • Date: 2013-11-18 20:38:47 UTC
  • mto: This revision was merged to the branch mainline in revision 465.
  • Revision ID: benji@benjiyork.com-20131118203847-2mfs1w7b8aqy64mr
checkpoint

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
    UpdateBundleJob,
16
16
    UpdateCharmJob,
17
17
)
18
 
from charmworld.jobs import lp
19
18
from charmworld.jobs.utils import get_queue
20
19
from charmworld.models import getconnection
21
20
from charmworld.models import getdb
33
32
class QueueWorker(object):
34
33
    """Processes raw charm data"""
35
34
 
36
 
    def __init__(self, db, queue_jobs, ingest_jobs, interval=INTERVAL):
 
35
    def __init__(self, db, queue_jobs, interval=INTERVAL):
37
36
        self.db = db
38
 
        self.queue_jobs = queue_jobs or []
39
 
        self.ingest_jobs = ingest_jobs or []
 
37
        self.queue_jobs = queue_jobs
40
38
        self.log = logging.getLogger('charm.worker')
41
39
        self.interval = interval
42
40
 
43
 
    @staticmethod
44
 
    def wait_seconds(interval, start, now):
45
 
        remaining = interval - (now - start)
46
 
        return int(max(0, remaining))
47
 
 
48
 
    def run(self, args, now=None):
49
 
        if args.debug:
50
 
            self.log.setLevel(logging.DEBUG)
51
 
        self.log.info(
52
 
            'Starting processing with interval {} seconds.'.format(
53
 
                self.interval))
54
 
        self.start_time = now if now is not None else time.time()
 
41
    def run(self, run_forever=False):
 
42
        self.log.info('Starting processing')
55
43
        return_code = 0
56
 
        for _, jobs in self.ingest_jobs:
 
44
        for jobs in self.queue_jobs.values():
57
45
            for job in jobs:
58
46
                job.setup(db=self.db)
59
47
        while True:
60
 
            for job in self.queue_jobs:
61
 
                job(args)
62
 
            # The clear argument is only used on the first queueing run.
63
 
            # Unset it so that subsequent passes will not clear the queue.
64
 
            args.clear = False
65
 
            for queue, jobs in self.ingest_jobs:
 
48
            for queue, jobs in self.queue_jobs.items():
66
49
                while True:
67
50
                    item = queue.next()
68
51
                    if item is None:
69
52
                        break
 
53
                    charm_data = item.payload
70
54
                    try:
71
55
                        for job in jobs:
72
56
                            try:
73
57
                                if not run_job(
74
 
                                        job, item.payload, needs_setup=False):
 
58
                                        job, charm_data, needs_setup=False):
75
59
                                    break
76
60
                            except SearchServiceNotAvailable as e:
77
61
                                self.log.exception(str(e))
80
64
                    finally:
81
65
                        item.complete()
82
66
 
83
 
            if not args.run_forever:
 
67
            if not run_forever:
84
68
                self.log.info('Empty queue and not running forever; quitting.')
85
69
                break
86
70
 
87
 
            wait_seconds = self.wait_seconds(
88
 
                self.interval, self.start_time, time.time())
 
71
            wait_time_in_minutes = self.interval / 60
89
72
            self.log.info(
90
 
                'Empty queue.  Waiting %s seconds to recheck.' %
91
 
                wait_seconds)
92
 
            time.sleep(wait_seconds)
 
73
                'Empty queue.  Waiting %s minutes to recheck.' %
 
74
                wait_time_in_minutes)
 
75
            time.sleep(self.interval)
93
76
        return return_code
94
77
 
95
78
 
105
88
        help='Keep running after the queues are processed and process more '
106
89
        'later.',
107
90
        action='store_true', dest='run_forever')
108
 
    lp.configure_parser(parser)
109
91
    # Explicitly pass sys.argv to make monkeypatching more reliable.
110
92
    args = parser.parse_args(argv[1:])
111
 
    queue_jobs = (lp.run, )
112
 
    ingest_jobs = (
113
 
        (get_queue(db, CHARM_QUEUE), [UpdateCharmJob(args.debug)]),
114
 
        (get_queue(db, BASKET_QUEUE), [UpdateBundleJob(args.debug)]),
115
 
    )
116
 
    worker = QueueWorker(db, queue_jobs, ingest_jobs)
117
 
    sys.exit(worker.run(args))
 
93
    queue_jobs = {
 
94
        get_queue(db, CHARM_QUEUE): [UpdateCharmJob()],
 
95
        get_queue(db, BASKET_QUEUE): [UpdateBundleJob()],
 
96
    }
 
97
    worker = QueueWorker(db, queue_jobs)
 
98
    sys.exit(worker.run(run_forever=args.run_forever))