~coreygoldberg/uci-engine/subunit-results

« back to all changes in this revision

Viewing changes to subunit-results/subunitresults/queue_worker.py

  • Committer: Corey Goldberg
  • Date: 2014-08-18 15:30:49 UTC
  • Revision ID: corey.goldberg@canonical.com-20140818153049-xi6m3e8x52x7onn3
queue qorker

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
 
18
18
import logging
19
19
import json
 
20
from queue import Queue
 
21
 
 
22
from subunitresults.ci_utils import amqp_utils, amqp_worker
20
23
 
21
24
from subunitresults.utils import retry
22
 
from subunitresults.ci_utils import amqp_utils, amqp_worker
 
25
from subunitresults.subunit_parser import run_result
23
26
 
24
27
 
25
28
logging.basicConfig(level='INFO')
31
34
 
32
35
 
33
36
def get_subunit_path(json_blob):
34
 
    d = json.loads(json_blob)  # XXX what should we do here if json is invalid?
 
37
    d = json.loads(json_blob)
35
38
    try:
36
39
        subunit_path = d['subunit_path']
37
40
    except KeyError:
38
 
        # XXX what should we do here in case of bad message?
39
41
        raise Exception("subunit_path not found in json message")
40
42
    return subunit_path
41
43
 
46
48
    return contents
47
49
 
48
50
 
49
 
def put_attachment(data_store, file_name, contents, content_type):
50
 
    file_path = data_store.put_file(file_name, contents, content_type)
51
 
    return file_path
52
 
 
53
 
 
54
51
class Worker:
55
52
    def __init__(self, data_store, queue):
56
53
        self.queue = queue
61
58
            self.do_one_job()
62
59
 
63
60
    def do_one_job(self):
64
 
        pass
65
 
        # json_blob = get_item(self.queue)
66
 
        # path = get_subunit_path(json_blob)
67
 
        # subunit_file = fetch_subunit(data_store, self.path)
68
 
        # parse subunit here
69
 
        # send all attachments to Swift
 
61
        json_blob = get_item(self.queue)
 
62
        path = get_subunit_path(json_blob)
 
63
        subunit_file = fetch_subunit(data_store, self.path)
 
64
        run_result(data_store, subunit_stream)
70
65
 
71
66
 
72
67
class RabbitWorker(amqp_worker.AMQPWorker):
79
74
    def handle_request(self, params, logger):
80
75
        worker = Worker(self.data_store, self.queue)
81
76
        try:
82
 
            worker.do_one_job()
 
77
            worker.run()
83
78
            return (amqp_utils.progress_completed, {})
84
79
        except Exception:
85
80
            return (amqp_utils.progress_failed, {})
 
81
 
 
82
 
 
83
def launch_amqp_listener(logger_name, data_store):
 
84
    q = Queue()
 
85
    worker = queue_worker.Worker(q)