1
from twisted.internet import task, defer
2
from twisted.protocols.basic import LineOnlyReceiver
3
from twisted.internet.protocol import (
4
DatagramProtocol, ReconnectingClientFactory)
7
class StatsDServerProtocol(DatagramProtocol):
    """A Twisted-based implementation of the StatsD server.

    Data is received via UDP and handed to the processor for local
    aggregation. Forwarding the aggregated data to Graphite is not done by
    this class.
    """

    def __init__(self, processor):
        """Remember the processor that received datagrams are fed to.

        @param processor: object whose C{process(data)} method is called with
            the payload of every received datagram.
        """
        self.processor = processor

    def datagramReceived(self, data, addr):
        """Pass the payload of a received datagram to the processor.

        @param data: payload of the received datagram.
        @param addr: sender address supplied by the transport; unused.
        """
        # The sender address is taken as one parameter instead of being
        # unpacked in the signature: tuple parameters are a SyntaxError on
        # Python 3 (PEP 3113) and the unpacked host/port were never used.
        self.processor.process(data)
22
class GraphiteProtocol(LineOnlyReceiver):
23
"""A client protocol for talking to Graphite.
25
Messages to Graphite are line-based and C{\n}-separated.
30
def __init__(self, processor, interval):
    """Store the collaborators and start the periodic flush task.

    @param processor: object whose C{flush(interval=...)} method is called
        by C{flushProcessor} on every tick.
    @param interval: flush period; it is divided by 1000 to obtain the
        value given to C{LoopingCall.start}, so presumably milliseconds
        (confirm against the caller).
    """
    self.processor = processor
    self.interval = interval
    self.flush_task = task.LoopingCall(self.flushProcessor)
    # Divide by a float: under Python 2 an integer interval would be
    # floor-divided, truncating the period (and turning any interval below
    # 1000 into 0). The second argument is False so the first flush happens
    # after one full period rather than immediately.
    self.flush_task.start(self.interval / 1000.0, False)
36
@defer.inlineCallbacks
37
def flushProcessor(self):
38
"""Flush messages queued in the processor to Graphite."""
39
for message in self.processor.flush(interval=self.interval):
40
for line in message.splitlines():
46
class GraphiteClientFactory(ReconnectingClientFactory):
47
"""A reconnecting Graphite client."""
49
def __init__(self, processor, interval):
    """Keep the arguments that are handed to each protocol this factory builds.

    @param processor: stored as C{self.processor}; passed to
        L{GraphiteProtocol} by C{buildProtocol}.
    @param interval: stored as C{self.interval}; passed to
        L{GraphiteProtocol} by C{buildProtocol}.
    """
    self.processor, self.interval = processor, interval
53
def buildProtocol(self, addr):
55
Build a new instance of the L{GraphiteProtocol}, bound to the
59
protocol = GraphiteProtocol(self.processor, self.interval)
60
protocol.factory = self