~landscape/txstatsd/trunk

« back to all changes in this revision

Viewing changes to txstatsd/server/protocol.py

  • Committer: Sidnei da Silva
  • Date: 2011-07-27 20:35:23 UTC
  • mfrom: (11.2.6 txstatsd)
  • Revision ID: sidnei.da.silva@canonical.com-20110727203523-mmhr5clzkfyqva8f
- Merge from lp:txstatsd

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from twisted.internet import task, defer
 
2
from twisted.protocols.basic import LineOnlyReceiver
 
3
from twisted.internet.protocol import (
 
4
    DatagramProtocol, ReconnectingClientFactory)
 
5
 
 
6
 
 
7
class StatsDServerProtocol(DatagramProtocol):
    """A Twisted-based implementation of the StatsD server.

    Data is received via UDP for local aggregation and then sent to a Graphite
    server via TCP.
    """

    def __init__(self, processor):
        """Bind the protocol to the processor that consumes datagrams.

        @param processor: object whose C{process} method is called with the
            payload of every datagram received.
        """
        self.processor = processor

    def datagramReceived(self, data, addr):
        """Process received data and store it locally.

        @param data: the datagram payload; handed unchanged to
            C{self.processor.process}.
        @param addr: the sender's address as delivered by the transport.
            It is not used here.
        """
        # The address is taken as a single parameter instead of being
        # unpacked in the signature: tuple parameters were removed in
        # Python 3 (PEP 3113), and a plain name works on Python 2 as well.
        self.processor.process(data)
 
20
 
 
21
 
 
22
class GraphiteProtocol(LineOnlyReceiver):
    r"""A client protocol for talking to Graphite.

    Messages to Graphite are line-based and C{\n}-separated.

    A L{task.LoopingCall} is started on construction and periodically
    flushes the processor, writing every resulting line to the connection.
    """

    delimiter = "\n"

    def __init__(self, processor, interval):
        """Start flushing C{processor} every C{interval} milliseconds.

        @param processor: object whose C{flush} method returns an iterable
            of messages to forward.
        @param interval: flush period in milliseconds; it is also passed to
            C{processor.flush} unchanged.
        """
        self.processor = processor
        self.interval = interval
        self.flush_task = task.LoopingCall(self.flushProcessor)
        # LoopingCall.start() expects seconds. Divide by a float so that an
        # integer number of milliseconds is not truncated on Python 2
        # (1500 -> 1, anything below 1000 -> 0). The second argument is
        # LoopingCall's ``now`` flag: do not flush right away.
        self.flush_task.start(self.interval / 1000.0, False)

    def connectionLost(self, reason):
        """Stop the periodic flush once this connection is gone.

        The flush task is started per protocol instance, so it has to be
        stopped per instance too; otherwise every instance that lost its
        connection would keep flushing the processor forever.

        @param reason: why the connection was lost; passed on to the base
            class implementation.
        """
        # stop() asserts that the call is running, and the task may already
        # have stopped on its own (e.g. after a failed flush).
        if self.flush_task.running:
            self.flush_task.stop()
        LineOnlyReceiver.connectionLost(self, reason)

    @defer.inlineCallbacks
    def flushProcessor(self):
        """Flush messages queued in the processor to Graphite.

        Every message returned by C{processor.flush} is split into lines and
        each line is sent with C{sendLine}. Lines are skipped while
        C{self.connected} is false.
        """
        for message in self.processor.flush(interval=self.interval):
            for line in message.splitlines():
                if self.connected:
                    self.sendLine(line)
                # A bare yield hands None back to inlineCallbacks between
                # lines; kept exactly as in the original implementation.
                yield
 
44
 
 
45
 
 
46
class GraphiteClientFactory(ReconnectingClientFactory):
    """Client factory that keeps reconnecting to Graphite."""

    def __init__(self, processor, interval):
        """Keep the arguments handed to every protocol this factory builds.

        @param processor: passed as the first argument to each
            L{GraphiteProtocol}.
        @param interval: passed as the second argument to each
            L{GraphiteProtocol}.
        """
        self.processor, self.interval = processor, interval

    def buildProtocol(self, addr):
        """
        Create a fresh L{GraphiteProtocol} for a new connection.

        The reconnection delay is reset first, then the protocol is built
        from the stored processor and interval and linked back to this
        factory.

        @param addr: address of the connection; not used here.
        @return: the newly created L{GraphiteProtocol}.
        """
        self.resetDelay()
        client = GraphiteProtocol(self.processor, self.interval)
        client.factory = self
        return client