~lucio.torre/graphite/add-events

202.1.1 by Lucio Torre
added amqp client
1
#!/usr/bin/env python
2
"""
3
Copyright 2009 Lucio Torre <lucio.torre@canonical.com>
4
5
This is an AMQP client that will connect to the specified broker and read
6
messages, parse them, and post them as metrics.
7
250.1.5 by Chris Davis
updated module docstring
8
Each message's routing key should be a metric name.
9
The message body should be one or more lines of the form:
10
11
<value> <timestamp>\n
12
<value> <timestamp>\n
13
...
14
15
Where each <value> is a real number and <timestamp> is a UNIX epoch time.
16
17
18
This program can be started standalone for testing or using carbon-cache.py
19
(see example config file provided)
202.1.1 by Lucio Torre
added amqp client
20
"""
21
import sys
22
import os
250.1.1 by Chris Davis
First iteration, basically everything functional. carbon-cache now consumes a queue (carbon.`hostname`)
23
import socket
202.1.1 by Lucio Torre
added amqp client
24
from optparse import OptionParser
25
26
from twisted.internet.defer import inlineCallbacks
27
from twisted.internet import reactor
299.1.121 by Sidnei da Silva
Merge from twistd-plugins [r=chrismd]
28
from twisted.internet.protocol import ReconnectingClientFactory
202.1.1 by Lucio Torre
added amqp client
29
from txamqp.protocol import AMQClient
30
from txamqp.client import TwistedDelegate
31
import txamqp.spec
32
33
try:
    import carbon
except ImportError:
    # This is being run directly and carbon is not installed: put the
    # directory that contains this module's package on sys.path.
    # abspath() matters when __file__ is a bare relative filename, where
    # dirname(dirname(...)) would otherwise collapse to "" (the cwd).
    LIB_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sys.path.insert(0, LIB_DIR)
39
299.1.142 by Chris Davis
refactored ClientManager into less hackish and more correctly named protocolManager
40
import carbon.protocols #satisfy import order requirements
202.1.1 by Lucio Torre
added amqp client
41
from carbon.instrumentation import increment
42
from carbon.events import metricReceived
250.1.2 by Chris Davis
implemented support for multiple configurable binding patterns, still don't have a good way to clear out old bindings though...
43
from carbon.conf import settings
202.1.1 by Lucio Torre
added amqp client
44
from carbon import log
45
46
250.1.1 by Chris Davis
First iteration, basically everything functional. carbon-cache now consumes a queue (carbon.`hostname`)
47
# Short host name: everything before the first dot of the local hostname.
HOSTNAME = socket.gethostname().partition('.')[0]
48
202.1.1 by Lucio Torre
added amqp client
49
50
class AMQPGraphiteProtocol(AMQClient):
    """This is the protocol instance that will receive and post metrics.

    Once connected it authenticates, declares the exchange and a private
    queue, binds every pattern in settings.BIND_PATTERNS to that queue and
    then consumes messages in a loop, posting one datapoint per body line.
    """

    consumer_tag = "graphite_consumer"

    @inlineCallbacks
    def connectionMade(self):
        """Run the base connection setup, then configure and start consuming."""
        yield AMQClient.connectionMade(self)
        log.listener("New AMQP connection made")
        yield self.setup()
        yield self.receive_loop()

    @inlineCallbacks
    def setup(self):
        """Authenticate, declare the exchange and queue, bind and consume.

        Reads the credentials, exchange name and channel number from
        self.factory.
        """
        exchange = self.factory.exchange_name

        yield self.authenticate(self.factory.username, self.factory.password)
        # Use the channel number configured on the factory (default 1)
        # instead of a hard-coded value, so the `channel` option is honoured.
        chan = yield self.channel(self.factory.channel)
        yield chan.channel_open()

        # declare the exchange and queue
        yield chan.exchange_declare(exchange=exchange, type="topic",
                                    durable=True, auto_delete=False)

        # we use a private queue to avoid conflicting with existing bindings
        reply = yield chan.queue_declare(exclusive=True)
        my_queue = reply.queue

        # bind each configured metric pattern
        for bind_pattern in settings.BIND_PATTERNS:
            log.listener("binding exchange '%s' to queue '%s' with pattern %s"
                         % (exchange, my_queue, bind_pattern))
            yield chan.queue_bind(exchange=exchange, queue=my_queue,
                                  routing_key=bind_pattern)

        yield chan.basic_consume(queue=my_queue, no_ack=True,
                                 consumer_tag=self.consumer_tag)

    @inlineCallbacks
    def receive_loop(self):
        """Fetch messages for our consumer tag forever and process each one."""
        queue = yield self.queue(self.consumer_tag)

        while True:
            msg = yield queue.get()
            self.processMessage(msg)

    def processMessage(self, message):
        """Parse a message and post it as a metric.

        Each non-blank body line must be "<value> <timestamp>", or
        "<metric> <value> <timestamp>" when the AMQP_METRIC_NAME_IN_BODY
        setting is true. Without that setting the message's routing key is
        used as the metric name. Lines that fail to parse are logged and
        skipped; the remaining lines are still processed.
        """

        if self.factory.verbose:
            log.listener("Message received: %s" % (message,))

        metric = message.routing_key
        # Look the setting up once per message rather than once per line.
        name_in_body = settings.get("AMQP_METRIC_NAME_IN_BODY", False)

        for line in message.content.body.split("\n"):
            line = line.strip()
            if not line:
                continue
            try:
                if name_in_body:
                    metric, value, timestamp = line.split()
                else:
                    value, timestamp = line.split()
                # A wrong field count or a non-numeric field both raise
                # ValueError and are reported as an invalid line.
                datapoint = (float(timestamp), float(value))
            except ValueError:
                log.listener("invalid message line: %s" % (line,))
                continue

            increment('metricsReceived')
            metricReceived(metric, datapoint)

            if self.factory.verbose:
                log.listener("Metric posted: %s %s %s" %
                             (metric, value, timestamp,))
123
124
125
class AMQPReconnectingFactory(ReconnectingClientFactory):
    """The reconnecting factory.

    Knows how to create the extended client and how to keep trying to
    connect in case of errors."""

    protocol = AMQPGraphiteProtocol

    def __init__(self, username, password, delegate, vhost, spec, channel,
                 exchange_name, verbose):
        """Store the connection settings that each built protocol reads.

        username, password -- credentials used to authenticate
        delegate, vhost, spec -- passed to the protocol constructor
        channel -- AMQP channel number
        exchange_name -- exchange the protocol declares and binds to
        verbose -- when true, the protocol logs every message and metric
        """
        self.username = username
        self.password = password
        self.delegate = delegate
        self.vhost = vhost
        self.spec = spec
        self.channel = channel
        self.exchange_name = exchange_name
        self.verbose = verbose

    def buildProtocol(self, addr):
        """Create a protocol for a new connection and attach this factory."""
        # A connection was established, so restart the reconnection backoff;
        # otherwise the retry delay keeps growing across later disconnects.
        self.resetDelay()
        p = self.protocol(self.delegate, self.vhost, self.spec)
        p.factory = self
        return p
148
149
299.9.7 by Sidnei da Silva
- Refactor carbon-cache service startup.
150
def createAMQPListener(username, password, vhost, exchange_name,
                       spec=None, channel=1, verbose=False):
    """
    Build and return an C{AMQPReconnectingFactory} for the given options.

    When no spec is given, the 'amqp0-8.xml' file located next to this
    module is loaded and used instead.
    """
    if not spec:
        # no spec supplied: fall back to the one shipped beside this module
        module_dir = os.path.dirname(__file__)
        bundled_spec = os.path.normpath(
            os.path.join(module_dir, 'amqp0-8.xml'))
        spec = txamqp.spec.load(bundled_spec)

    return AMQPReconnectingFactory(
        username, password, TwistedDelegate(), vhost, spec, channel,
        exchange_name, verbose=verbose)
165
166
167
def startReceiver(host, port, username, password, vhost, exchange_name,
                  spec=None, channel=1, verbose=False):
    """
    Connect an AMQP listener factory to the broker at host:port.

    The factory comes from createAMQPListener; the connection is registered
    with the twisted reactor, which the caller is expected to run.
    """
    listener = createAMQPListener(
        username, password, vhost, exchange_name,
        spec=spec, channel=channel, verbose=verbose)
    reactor.connectTCP(host, port, listener)
176
177
178
def main():
    """Parse command line options, start the receiver and run the reactor."""
    # (flags, keyword arguments) for each option, in registration order
    option_table = (
        (("-t", "--host"),
         dict(dest="host", help="host name", metavar="HOST",
              default="localhost")),
        (("-p", "--port"),
         dict(dest="port", type=int, help="port number", metavar="PORT",
              default=5672)),
        (("-u", "--user"),
         dict(dest="username", help="username", metavar="USERNAME",
              default="guest")),
        (("-w", "--password"),
         dict(dest="password", help="password", metavar="PASSWORD",
              default="guest")),
        (("-V", "--vhost"),
         dict(dest="vhost", help="vhost", metavar="VHOST",
              default="/")),
        (("-e", "--exchange"),
         dict(dest="exchange", help="exchange", metavar="EXCHANGE",
              default="graphite")),
        (("-v", "--verbose"),
         dict(dest="verbose", help="verbose",
              default=False, action="store_true")),
    )

    parser = OptionParser()
    for flags, attrs in option_table:
        parser.add_option(*flags, **attrs)

    options, _positional = parser.parse_args()

    log.logToStdout()
    startReceiver(options.host, options.port, options.username,
                  options.password, vhost=options.vhost,
                  exchange_name=options.exchange, verbose=options.verbose)
    reactor.run()
215
216
# Run the standalone receiver only when this file is executed as a script.
if __name__ == "__main__":
    main()