~canonical-ci-engineering/adt-request-proxy/trunk

« back to all changes in this revision

Viewing changes to adt_request_proxy/queue.py

  • Committer: Ubuntu CI Bot
  • Author(s): Thomi Richards
  • Date: 2015-03-11 02:14:35 UTC
  • mfrom: (3.1.2 trunk-additional-testing)
  • Revision ID: ubuntu_ci_bot-20150311021435-ilp3r0rwjkbkjrbo
Add some more tests for the adt-request-proxy service. [r=Celso Providelo]

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
    """A class that can send things to the rabbit queue. An instance of this
27
27
    is stored in 'flask.g'.
28
28
 
29
 
    TODO: This code can almost certainly be improved - for example, maybe
30
 
        don't create a new connection every time we queue something?
31
 
 
32
29
    """
33
30
 
34
31
    def __init__(self, config):
35
 
        self.amqp_uris = config.get('amqp', 'uris').split()
 
32
        self.amqp_uris = config['amqp']['uris'].split()
36
33
        self.adt_exchange = kombu.Exchange("adt.exchange", type="topic")
 
34
        self.connection = kombu.Connection(self.amqp_uris)
37
35
 
38
36
    def queue_test(self, payload):
39
37
        """Take 'payload' and enqueue it on the rabbit queue.
47
45
        On error, an exception will be raised.
48
46
 
49
47
        """
50
 
        routing_key = payload['arch'] + '.' + payload['platform']
 
48
        payload_copy = payload.copy()
 
49
        routing_key = payload_copy['arch'] + '.' + payload_copy['platform']
51
50
        queue = kombu.Queue(
52
51
            'adt.requests.{}'.format(routing_key),
53
52
            self.adt_exchange, routing_key=routing_key)
56
55
        # available workers the request is preserved.
57
56
        declare = [self.adt_exchange, queue]
58
57
 
59
 
        connection = kombu.Connection(self.amqp_uris)
60
 
        payload['request_id'] = uuid()
 
58
        payload_copy['request_id'] = uuid()
61
59
 
62
 
        with producers[connection].acquire(block=True) as producer:
 
60
        with producers[self.connection].acquire(block=True) as producer:
63
61
 
64
62
            def errback(exc, interval):
65
63
                # TODO: This should raise an exception if something goes wrong.
69
67
                logger.error('Error: %r', exc, exc_info=1)
70
68
                logger.info('Retry in %s seconds.', interval)
71
69
 
72
 
            publish = connection.ensure(
 
70
            publish = self.connection.ensure(
73
71
                producer, producer.publish,
74
72
                errback=errback, max_retries=3)
75
73
            publish(
76
 
                payload,
 
74
                payload_copy,
77
75
                exchange=self.adt_exchange,
78
76
                routing_key=routing_key,
79
77
                declare=declare)
80
 
        return payload['request_id']
 
78
        return payload_copy['request_id']