~ahasenack/graphite/packaging-experiment-1source-3binaries

« back to all changes in this revision

Viewing changes to carbon/lib/carbon/listeners.py

merging in a buttload of changes from my private branch. this is going to be the 0.9.5 release and is probably the most substantial release to date.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from twisted.internet import reactor
 
2
from twisted.internet.protocol import Factory
 
3
from twisted.internet.error import ConnectionDone
 
4
from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver
 
5
from carbon.cache import MetricCache
 
6
from carbon.relay import relay
 
7
from carbon.instrumentation import increment
 
8
from carbon.events import metricReceived
 
9
from carbon import log
 
10
 
 
11
try:
 
12
  import cPickle as pickle
 
13
except ImportError:
 
14
  import pickle
 
15
 
 
16
 
 
17
class LoggingMixin:
 
18
  def connectionMade(self):
 
19
    self.peer = self.transport.getPeer()
 
20
    self.peerAddr = "%s:%d" % (self.peer.host, self.peer.port)
 
21
    log.listener("%s connection with %s established" % (self.__class__.__name__, self.peerAddr))
 
22
 
 
23
  def connectionLost(self, reason):
 
24
    if reason.check(ConnectionDone):
 
25
      log.listener("%s connection with %s closed cleanly" % (self.__class__.__name__, self.peerAddr))
 
26
    else:
 
27
      log.listener("%s connection with %s lost: %s" % (self.__class__.__name__, self.peerAddr, reason.value))
 
28
 
 
29
 
 
30
class MetricLineReceiver(LoggingMixin, LineOnlyReceiver):
 
31
  delimiter = '\n'
 
32
 
 
33
  def lineReceived(self, line):
 
34
    try:
 
35
      metric, value, timestamp = line.strip().split()
 
36
      datapoint = ( float(timestamp), float(value) )
 
37
    except:
 
38
      log.listener('invalid line received from client %s, disconnecting' % self.peerAddr)
 
39
      self.transport.loseConnection()
 
40
      return
 
41
 
 
42
    increment('metricsReceived')
 
43
    metricReceived(metric, datapoint)
 
44
 
 
45
 
 
46
class MetricPickleReceiver(LoggingMixin, Int32StringReceiver):
 
47
  def stringReceived(self, data):
 
48
    try:
 
49
      datapoints = pickle.loads(data)
 
50
    except:
 
51
      log.listener('invalid pickle received from client %s, disconnecting' % self.peerAddr)
 
52
      self.transport.loseConnection()
 
53
      return
 
54
 
 
55
    for (metric, datapoint) in datapoints:
 
56
      datapoint = ( float(datapoint[0]), float(datapoint[1]) ) #force proper types
 
57
      if datapoint[1] == datapoint[1]: # filter out NaN values
 
58
        metricReceived(metric, datapoint)
 
59
 
 
60
    increment('metricsReceived', len(datapoints))
 
61
 
 
62
 
 
63
class CacheQueryHandler(LoggingMixin, Int32StringReceiver):
 
64
  def stringReceived(self, metric):
 
65
    values = MetricCache.get(metric, [])
 
66
    log.msg('cache query for %s returned %d values' % (metric, len(values)))
 
67
    response = pickle.dumps(values, protocol=-1)
 
68
    self.sendString(response)
 
69
    increment('cacheQueries')
 
70
 
 
71
 
 
72
def startListener(interface, port, protocol):
 
73
  factory = Factory()
 
74
  factory.protocol = protocol
 
75
  return reactor.listenTCP( int(port), factory, interface=interface )