~mpontillo/maas/dns-template-changes-1.7

« back to all changes in this revision

Viewing changes to src/maasserver/rabbit.py

  • Committer: MaaS Lander
  • Author(s): Gavin Panella
  • Date: 2014-09-29 16:01:05 UTC
  • mfrom: (3074.1.13 remove-rabbit-entirely)
  • Revision ID: maas_lander-20140929160105-gtpp7qn1avn32pis
[r=jtv][bug=][author=allenap] Remove RabbitMQ entirely.

Celery is still in there but configured to run tasks eagerly, which means that nothing gets queued, and thus there's no need for RabbitMQ.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2012 Canonical Ltd.  This software is licensed under the
2
 
# GNU Affero General Public License version 3 (see the file LICENSE).
3
 
 
4
 
"""Rabbit messaging."""
5
 
 
6
 
from __future__ import (
7
 
    absolute_import,
8
 
    print_function,
9
 
    unicode_literals,
10
 
    )
11
 
 
12
 
# Rebinds the module-level name `str` to None, so any call to `str(...)`
# inside this module fails instead of silently producing a byte string.
# NOTE(review): presumably a deliberate guard that pairs with the
# `unicode_literals` import above -- confirm against project conventions.
str = None
13
 
 
14
 
# Python 2 idiom: classes in this module declared without explicit bases
# (e.g. `class RabbitMessaging:`) become new-style classes.
__metaclass__ = type
15
 
__all__ = [
16
 
    "RabbitExchange",
17
 
    "RabbitQueue",
18
 
    "RabbitMessaging",
19
 
    "RabbitSession",
20
 
    ]
21
 
 
22
 
 
23
 
from errno import ECONNREFUSED
24
 
import socket
25
 
import threading
26
 
 
27
 
from amqplib import client_0_8 as amqp
28
 
from django.conf import settings
29
 
from maasserver.exceptions import NoRabbit
30
 
 
31
 
 
32
 
def connect():
    """Connect to AMQP.

    Builds an `amqp.Connection` from the ``RABBITMQ_HOST``,
    ``RABBITMQ_USERID``, ``RABBITMQ_PASSWORD`` and
    ``RABBITMQ_VIRTUAL_HOST`` Django settings.

    :return: The new `amqp.Connection`.
    :raises NoRabbit: If the connection attempt fails with ECONNREFUSED.
    :raises socket.error: For any other socket-level failure, re-raised
        unchanged.
    """
    try:
        return amqp.Connection(
            host=settings.RABBITMQ_HOST,
            userid=settings.RABBITMQ_USERID,
            password=settings.RABBITMQ_PASSWORD,
            virtual_host=settings.RABBITMQ_VIRTUAL_HOST,
            insist=False)
    except socket.error as e:
        if e.errno != ECONNREFUSED:
            raise
        # Use `strerror` rather than `message`: for (errno, strerror)
        # style errors `message` is an empty string on Python 2 and the
        # attribute does not exist at all on Python 3, so the reason for
        # the refusal would otherwise be lost.
        raise NoRabbit(e.strerror)
46
 
 
47
 
 
48
 
class RabbitSession(threading.local):
    """Per-thread holder for a lazily created AMQP connection.

    Subclasses `threading.local`, so each thread sees its own
    `_connection` attribute.
    """

    def __init__(self):
        self._connection = None

    @property
    def connection(self):
        """Return the cached connection, calling `connect()` if needed.

        A new connection is made when none is cached yet or when the
        cached one no longer has a transport.
        """
        cached = self._connection
        if cached is not None and cached.transport is not None:
            return cached
        self._connection = connect()
        return self._connection

    def disconnect(self):
        """Close the cached connection, if any, and forget it.

        The cached reference is cleared even when `close()` raises.
        """
        cached = self._connection
        if cached is None:
            return
        try:
            cached.close()
        finally:
            self._connection = None
65
 
 
66
 
 
67
 
class RabbitMessaging:
    """Factory for exchange and queue wrappers sharing one session.

    :param exchange_name: Name of the exchange handed to every
        `RabbitExchange` and `RabbitQueue` this object creates.
    """

    def __init__(self, exchange_name):
        self.exchange_name = exchange_name
        self._session = RabbitSession()

    def getExchange(self):
        """Return a new `RabbitExchange` bound to this object's session."""
        session, exchange_name = self._session, self.exchange_name
        return RabbitExchange(session, exchange_name)

    def getQueue(self):
        """Return a new `RabbitQueue` bound to this object's session."""
        session, exchange_name = self._session, self.exchange_name
        return RabbitQueue(session, exchange_name)
78
 
 
79
 
 
80
 
class RabbitBase(threading.local):
    """Shared base holding a session, an exchange name and a lazy channel.

    :param session: Object whose `connection` attribute provides
        `channel()`.
    :param exchange_name: Name of the exchange declared on each newly
        opened channel.
    """

    def __init__(self, session, exchange_name):
        self.exchange_name = exchange_name
        self._session = session
        self._channel = None

    @property
    def channel(self):
        """Return an open channel, creating and preparing one if needed.

        When no channel is cached, or the cached one is no longer open, a
        fresh channel is taken from the session's connection and the
        exchange is declared on it with type 'fanout'.
        """
        current = self._channel
        if current is not None and current.is_open:
            return current
        self._channel = self._session.connection.channel()
        self._channel.exchange_declare(self.exchange_name, type='fanout')
        return self._channel
94
 
 
95
 
 
96
 
class RabbitExchange(RabbitBase):
    """Publishing side of the messaging wrapper."""

    def publish(self, message):
        """Wrap `message` in an `amqp.Message` and publish it.

        :param message: Body passed to `amqp.Message`.
        """
        payload = amqp.Message(message)
        # The exchange is declared as 'fanout', so the routing key is
        # left empty.
        channel = self.channel
        channel.basic_publish(
            exchange=self.exchange_name,
            routing_key='',
            msg=payload)
103
 
 
104
 
 
105
 
class RabbitQueue(RabbitBase):
    """Consuming side of the messaging wrapper.

    On construction a queue is declared and bound to the exchange named
    by `exchange_name`.
    """

    def __init__(self, session, exchange_name):
        super(RabbitQueue, self).__init__(session, exchange_name)
        # The first element of the declare result is kept as the queue
        # name.
        # NOTE(review): "x-expires" is presumably the broker's queue
        # expiry in milliseconds (300000 = 5 minutes) -- confirm against
        # the RabbitMQ documentation.
        declared = self.channel.queue_declare(
            nowait=False,
            auto_delete=False,
            arguments={"x-expires": 300000})
        self.queue_name = declared[0]
        self.channel.queue_bind(
            queue=self.queue_name,
            exchange=self.exchange_name)

    @property
    def name(self):
        """The queue name recorded when the queue was declared."""
        return self.queue_name