1
from twisted.internet import reactor
2
from twisted.internet.protocol import Factory
3
from twisted.internet.error import ConnectionDone
4
from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver
5
from carbon.cache import MetricCache
6
from carbon.relay import relay
7
from carbon.instrumentation import increment
8
from carbon.events import metricReceived
12
import cPickle as pickle
18
def connectionMade(self):
19
self.peer = self.transport.getPeer()
20
self.peerAddr = "%s:%d" % (self.peer.host, self.peer.port)
21
log.listener("%s connection with %s established" % (self.__class__.__name__, self.peerAddr))
23
def connectionLost(self, reason):
24
if reason.check(ConnectionDone):
25
log.listener("%s connection with %s closed cleanly" % (self.__class__.__name__, self.peerAddr))
27
log.listener("%s connection with %s lost: %s" % (self.__class__.__name__, self.peerAddr, reason.value))
30
class MetricLineReceiver(LoggingMixin, LineOnlyReceiver):
33
def lineReceived(self, line):
35
metric, value, timestamp = line.strip().split()
36
datapoint = ( float(timestamp), float(value) )
38
log.listener('invalid line received from client %s, disconnecting' % self.peerAddr)
39
self.transport.loseConnection()
42
increment('metricsReceived')
43
metricReceived(metric, datapoint)
46
class MetricPickleReceiver(LoggingMixin, Int32StringReceiver):
47
def stringReceived(self, data):
49
datapoints = pickle.loads(data)
51
log.listener('invalid pickle received from client %s, disconnecting' % self.peerAddr)
52
self.transport.loseConnection()
55
for (metric, datapoint) in datapoints:
56
datapoint = ( float(datapoint[0]), float(datapoint[1]) ) #force proper types
57
if datapoint[1] == datapoint[1]: # filter out NaN values
58
metricReceived(metric, datapoint)
60
increment('metricsReceived', len(datapoints))
63
class CacheQueryHandler(LoggingMixin, Int32StringReceiver):
64
def stringReceived(self, metric):
65
values = MetricCache.get(metric, [])
66
log.msg('cache query for %s returned %d values' % (metric, len(values)))
67
response = pickle.dumps(values, protocol=-1)
68
self.sendString(response)
69
increment('cacheQueries')
72
def startListener(interface, port, protocol):
74
factory.protocol = protocol
75
return reactor.listenTCP( int(port), factory, interface=interface )