202.1.1
by Lucio Torre
added amqp client |
1 |
#!/usr/bin/env python
|
2 |
"""
|
|
3 |
Copyright 2009 Lucio Torre <lucio.torre@canonical.com>
|
|
4 |
||
5 |
This is an AMQP client that will connect to the specified broker and read
|
|
6 |
messages, parse them, and post them as metrics.
|
|
7 |
||
250.1.5
by Chris Davis
updated module docstring |
8 |
Each message's routing key should be a metric name.
|
9 |
The message body should be one or more lines of the form:
|
|
10 |
||
11 |
<value> <timestamp>\n
|
|
12 |
<value> <timestamp>\n
|
|
13 |
...
|
|
14 |
||
15 |
Where each <value> is a real number and <timestamp> is a UNIX epoch time.
|
|
16 |
||
17 |
||
18 |
This program can be started standalone for testing or using carbon-cache.py
|
|
19 |
(see example config file provided)
|
|
202.1.1
by Lucio Torre
added amqp client |
20 |
"""
|
21 |
import sys |
|
22 |
import os |
|
250.1.1
by Chris Davis
First iteration, basically everything functional. carbon-cache now consumes a queue (carbon.`hostname`) |
23 |
import socket |
202.1.1
by Lucio Torre
added amqp client |
24 |
from optparse import OptionParser |
25 |
||
26 |
from twisted.internet.defer import inlineCallbacks |
|
27 |
from twisted.internet import reactor |
|
299.1.121
by Sidnei da Silva
Merge from twistd-plugins [r=chrismd] |
28 |
from twisted.internet.protocol import ReconnectingClientFactory |
202.1.1
by Lucio Torre
added amqp client |
29 |
from txamqp.protocol import AMQClient |
30 |
from txamqp.client import TwistedDelegate |
|
31 |
import txamqp.spec |
|
32 |
||
33 |
try: |
|
34 |
import carbon |
|
35 |
except: |
|
36 |
# this is being run directly, carbon is not installed
|
|
37 |
LIB_DIR = os.path.dirname(os.path.dirname(__file__)) |
|
38 |
sys.path.insert(0, LIB_DIR) |
|
39 |
||
299.1.142
by Chris Davis
refactored ClientManager into less hackish and more correctly named protocolManager |
40 |
import carbon.protocols #satisfy import order requirements |
202.1.1
by Lucio Torre
added amqp client |
41 |
from carbon.instrumentation import increment |
42 |
from carbon.events import metricReceived |
|
250.1.2
by Chris Davis
implemented support for multiple configurable binding patterns, still don't have a good way to clear out old bindings though... |
43 |
from carbon.conf import settings |
202.1.1
by Lucio Torre
added amqp client |
44 |
from carbon import log |
45 |
||
46 |
||
250.1.1
by Chris Davis
First iteration, basically everything functional. carbon-cache now consumes a queue (carbon.`hostname`) |
47 |
HOSTNAME = socket.gethostname().split('.')[0] |
48 |
||
202.1.1
by Lucio Torre
added amqp client |
49 |
|
50 |
class AMQPGraphiteProtocol(AMQClient): |
|
51 |
"""This is the protocol instance that will receive and post metrics."""
|
|
52 |
||
53 |
consumer_tag = "graphite_consumer" |
|
54 |
||
55 |
@inlineCallbacks
|
|
56 |
def connectionMade(self): |
|
57 |
yield AMQClient.connectionMade(self) |
|
58 |
log.listener("New AMQP connection made") |
|
59 |
yield self.setup() |
|
60 |
yield self.receive_loop() |
|
61 |
||
62 |
@inlineCallbacks
|
|
63 |
def setup(self): |
|
250.1.2
by Chris Davis
implemented support for multiple configurable binding patterns, still don't have a good way to clear out old bindings though... |
64 |
exchange = self.factory.exchange_name |
65 |
||
202.1.1
by Lucio Torre
added amqp client |
66 |
yield self.authenticate(self.factory.username, self.factory.password) |
67 |
chan = yield self.channel(1) |
|
68 |
yield chan.channel_open() |
|
69 |
||
250.1.2
by Chris Davis
implemented support for multiple configurable binding patterns, still don't have a good way to clear out old bindings though... |
70 |
# declare the exchange and queue
|
71 |
yield chan.exchange_declare(exchange=exchange, type="topic", |
|
72 |
durable=True, auto_delete=False) |
|
73 |
||
250.1.9
by Chris Davis
now using a private amqp queue to avoid problems when changing binding patterns and to avoid other consumers stealing data from the queue |
74 |
# we use a private queue to avoid conflicting with existing bindings
|
75 |
reply = yield chan.queue_declare(exclusive=True) |
|
76 |
my_queue = reply.queue |
|
250.1.2
by Chris Davis
implemented support for multiple configurable binding patterns, still don't have a good way to clear out old bindings though... |
77 |
|
78 |
# bind each configured metric pattern
|
|
79 |
for bind_pattern in settings.BIND_PATTERNS: |
|
80 |
log.listener("binding exchange '%s' to queue '%s' with pattern %s" \ |
|
250.1.9
by Chris Davis
now using a private amqp queue to avoid problems when changing binding patterns and to avoid other consumers stealing data from the queue |
81 |
% (exchange, my_queue, bind_pattern)) |
82 |
yield chan.queue_bind(exchange=exchange, queue=my_queue, |
|
250.1.2
by Chris Davis
implemented support for multiple configurable binding patterns, still don't have a good way to clear out old bindings though... |
83 |
routing_key=bind_pattern) |
84 |
||
250.1.9
by Chris Davis
now using a private amqp queue to avoid problems when changing binding patterns and to avoid other consumers stealing data from the queue |
85 |
yield chan.basic_consume(queue=my_queue, no_ack=True, |
202.1.1
by Lucio Torre
added amqp client |
86 |
consumer_tag=self.consumer_tag) |
87 |
@inlineCallbacks
|
|
88 |
def receive_loop(self): |
|
89 |
queue = yield self.queue(self.consumer_tag) |
|
90 |
||
91 |
while True: |
|
92 |
msg = yield queue.get() |
|
93 |
self.processMessage(msg) |
|
94 |
||
95 |
def processMessage(self, message): |
|
96 |
"""Parse a message and post it as a metric."""
|
|
97 |
||
98 |
if self.factory.verbose: |
|
99 |
log.listener("Message received: %s" % (message,)) |
|
100 |
||
250.1.1
by Chris Davis
First iteration, basically everything functional. carbon-cache now consumes a queue (carbon.`hostname`) |
101 |
metric = message.routing_key |
102 |
||
202.1.1
by Lucio Torre
added amqp client |
103 |
for line in message.content.body.split("\n"): |
299.1.75
by Chris Davis
fixed bug parsing AMQP messages ending in newline |
104 |
line = line.strip() |
105 |
if not line: |
|
106 |
continue
|
|
202.1.1
by Lucio Torre
added amqp client |
107 |
try: |
299.2.25
by Chris Davis
merging in Pete's patch from Bug #705613 |
108 |
if settings.get("AMQP_METRIC_NAME_IN_BODY", False): |
299.1.75
by Chris Davis
fixed bug parsing AMQP messages ending in newline |
109 |
metric, value, timestamp = line.split() |
299.2.25
by Chris Davis
merging in Pete's patch from Bug #705613 |
110 |
else: |
299.1.75
by Chris Davis
fixed bug parsing AMQP messages ending in newline |
111 |
value, timestamp = line.split() |
202.1.1
by Lucio Torre
added amqp client |
112 |
datapoint = ( float(timestamp), float(value) ) |
113 |
except ValueError: |
|
250.1.1
by Chris Davis
First iteration, basically everything functional. carbon-cache now consumes a queue (carbon.`hostname`) |
114 |
log.listener("invalid message line: %s" % (line,)) |
202.1.3
by Lucio Torre
fixed proposal requests |
115 |
continue
|
202.1.1
by Lucio Torre
added amqp client |
116 |
|
117 |
increment('metricsReceived') |
|
118 |
metricReceived(metric, datapoint) |
|
250.1.9
by Chris Davis
now using a private amqp queue to avoid problems when changing binding patterns and to avoid other consumers stealing data from the queue |
119 |
|
202.1.1
by Lucio Torre
added amqp client |
120 |
if self.factory.verbose: |
121 |
log.listener("Metric posted: %s %s %s" % |
|
122 |
(metric, value, timestamp,)) |
|
123 |
||
124 |
||
125 |
class AMQPReconnectingFactory(ReconnectingClientFactory): |
|
126 |
"""The reconnecting factory.
|
|
127 |
||
128 |
Knows how to create the extended client and how to keep trying to
|
|
129 |
connect in case of errors."""
|
|
130 |
||
131 |
protocol = AMQPGraphiteProtocol |
|
132 |
||
133 |
def __init__(self, username, password, delegate, vhost, spec, channel, |
|
250.1.9
by Chris Davis
now using a private amqp queue to avoid problems when changing binding patterns and to avoid other consumers stealing data from the queue |
134 |
exchange_name, verbose): |
202.1.1
by Lucio Torre
added amqp client |
135 |
self.username = username |
136 |
self.password = password |
|
137 |
self.delegate = delegate |
|
138 |
self.vhost = vhost |
|
139 |
self.spec = spec |
|
140 |
self.channel = channel |
|
141 |
self.exchange_name = exchange_name |
|
142 |
self.verbose = verbose |
|
143 |
||
144 |
def buildProtocol(self, addr): |
|
145 |
p = self.protocol(self.delegate, self.vhost, self.spec) |
|
146 |
p.factory = self |
|
147 |
return p |
|
148 |
||
149 |
||
299.9.7
by Sidnei da Silva
- Refactor carbon-cache service startup. |
150 |
def createAMQPListener(username, password, vhost, exchange_name, |
151 |
spec=None, channel=1, verbose=False): |
|
152 |
"""
|
|
153 |
Create an C{AMQPReconnectingFactory} configured with the specified options.
|
|
154 |
"""
|
|
202.1.1
by Lucio Torre
added amqp client |
155 |
# use provided spec if not specified
|
156 |
if not spec: |
|
157 |
spec = txamqp.spec.load(os.path.normpath( |
|
158 |
os.path.join(os.path.dirname(__file__), 'amqp0-8.xml'))) |
|
159 |
||
160 |
delegate = TwistedDelegate() |
|
161 |
factory = AMQPReconnectingFactory(username, password, delegate, vhost, |
|
250.1.9
by Chris Davis
now using a private amqp queue to avoid problems when changing binding patterns and to avoid other consumers stealing data from the queue |
162 |
spec, channel, exchange_name, |
202.1.1
by Lucio Torre
added amqp client |
163 |
verbose=verbose) |
299.9.7
by Sidnei da Silva
- Refactor carbon-cache service startup. |
164 |
return factory |
165 |
||
166 |
||
167 |
def startReceiver(host, port, username, password, vhost, exchange_name, |
|
168 |
spec=None, channel=1, verbose=False): |
|
169 |
"""
|
|
170 |
Starts a twisted process that will read messages on the amqp broker and
|
|
171 |
post them as metrics.
|
|
172 |
"""
|
|
173 |
factory = createAMQPListener(username, password, vhost, exchange_name, |
|
174 |
spec=spec, channel=channel, verbose=verbose) |
|
202.1.1
by Lucio Torre
added amqp client |
175 |
reactor.connectTCP(host, port, factory) |
176 |
||
177 |
||
178 |
def main(): |
|
179 |
parser = OptionParser() |
|
180 |
parser.add_option("-t", "--host", dest="host", |
|
181 |
help="host name", metavar="HOST", default="localhost") |
|
182 |
||
183 |
parser.add_option("-p", "--port", dest="port", type=int, |
|
184 |
help="port number", metavar="PORT", |
|
185 |
default=5672) |
|
186 |
||
187 |
parser.add_option("-u", "--user", dest="username", |
|
188 |
help="username", metavar="USERNAME", |
|
189 |
default="guest") |
|
190 |
||
191 |
parser.add_option("-w", "--password", dest="password", |
|
192 |
help="password", metavar="PASSWORD", |
|
193 |
default="guest") |
|
194 |
||
195 |
parser.add_option("-V", "--vhost", dest="vhost", |
|
196 |
help="vhost", metavar="VHOST", |
|
197 |
default="/") |
|
198 |
||
250.1.1
by Chris Davis
First iteration, basically everything functional. carbon-cache now consumes a queue (carbon.`hostname`) |
199 |
parser.add_option("-e", "--exchange", dest="exchange", |
200 |
help="exchange", metavar="EXCHANGE", |
|
201 |
default="graphite") |
|
202 |
||
202.1.1
by Lucio Torre
added amqp client |
203 |
parser.add_option("-v", "--verbose", dest="verbose", |
204 |
help="verbose", |
|
205 |
default=False, action="store_true") |
|
206 |
||
207 |
(options, args) = parser.parse_args() |
|
208 |
||
209 |
||
210 |
log.logToStdout() |
|
211 |
startReceiver(options.host, options.port, options.username, |
|
212 |
options.password, vhost=options.vhost, |
|
250.1.9
by Chris Davis
now using a private amqp queue to avoid problems when changing binding patterns and to avoid other consumers stealing data from the queue |
213 |
exchange_name=options.exchange, verbose=options.verbose) |
202.1.1
by Lucio Torre
added amqp client |
214 |
reactor.run() |
215 |
||
216 |
if __name__ == "__main__": |
|
217 |
main() |