~verterok/txstatsd/use-mock-instead-of-mocker

« back to all changes in this revision

Viewing changes to txstatsd/protocol.py

  • Committer: Tarmac
  • Author(s): Anthony Lenton
  • Date: 2012-07-13 18:03:00 UTC
  • mfrom: (98.1.3 straight)
  • Revision ID: tarmac-20120713180300-pjbneixiihyo1ato
[r=sidnei] Made it possible to use the non-twisted client code even if twisted is unavailable.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2011-2012 Canonical Services Ltd
 
2
#
 
3
# Permission is hereby granted, free of charge, to any person obtaining
 
4
# a copy of this software and associated documentation files (the
 
5
# "Software"), to deal in the Software without restriction, including
 
6
# without limitation the rights to use, copy, modify, merge, publish,
 
7
# distribute, sublicense, and/or sell copies of the Software, and to
 
8
# permit persons to whom the Software is furnished to do so, subject to
 
9
# the following conditions:
 
10
#
 
11
# The above copyright notice and this permission notice shall be
 
12
# included in all copies or substantial portions of the Software.
 
13
#
 
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 
15
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 
16
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 
17
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 
18
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 
19
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 
20
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
21
 
 
22
import socket
 
23
 
 
24
from twisted.internet.defer import inlineCallbacks, returnValue
 
25
from twisted.internet.protocol import DatagramProtocol
 
26
from twisted.python import log
 
27
 
 
28
 
 
29
class StatsDClientProtocol(DatagramProtocol):
 
30
    """A Twisted-based implementation of the StatsD client protocol.
 
31
 
 
32
    Data is sent via UDP to a StatsD server for aggregation.
 
33
    """
 
34
 
 
35
    def __init__(self, client):
 
36
        self.client = client
 
37
 
 
38
    def startProtocol(self):
 
39
        """Connect to destination host."""
 
40
        self.client.connect(self.transport)
 
41
 
 
42
    def stopProtocol(self):
 
43
        """Connection was lost."""
 
44
        self.client.disconnect()
 
45
 
 
46
 
 
47
class TwistedStatsDClient(object):
 
48
 
 
49
    def __init__(self, host, port,
 
50
                 connect_callback=None,
 
51
                 disconnect_callback=None,
 
52
                 resolver_errback=None):
 
53
        """
 
54
        Build a connection that reports to the endpoint (on C{host} and
 
55
        C{port}) using UDP.
 
56
 
 
57
        @param host: The StatsD server host.
 
58
        @param port: The StatsD server port.
 
59
        @param resolver_errback: The errback to invoke should
 
60
            issues occur resolving the supplied C{host}.
 
61
        @param connect_callback: The callback to invoke on connection.
 
62
        @param disconnect_callback: The callback to invoke on disconnection.
 
63
        """
 
64
        from twisted.internet import reactor
 
65
 
 
66
        self.reactor = reactor
 
67
 
 
68
        @inlineCallbacks
 
69
        def resolve(host):
 
70
            self.host = yield reactor.resolve(host)
 
71
            returnValue(self.host)
 
72
 
 
73
        self.original_host = host
 
74
        self.host = None
 
75
        self.resolver = resolve(host)
 
76
        if resolver_errback is None:
 
77
            self.resolver.addErrback(log.err)
 
78
        else:
 
79
            self.resolver.addErrback(resolver_errback)
 
80
 
 
81
        self.port = port
 
82
        self.connect_callback = connect_callback
 
83
        self.disconnect_callback = disconnect_callback
 
84
 
 
85
        self.transport = None
 
86
 
 
87
    def __str__(self):
 
88
        return "%s:%d" % (self.original_host, self.port)
 
89
 
 
90
    @inlineCallbacks
 
91
    def connect(self, transport=None):
 
92
        """Connect to the StatsD server."""
 
93
        host = yield self.resolver
 
94
        if host is not None:
 
95
            self.transport = transport
 
96
            if self.transport is not None:
 
97
                if self.connect_callback is not None:
 
98
                    self.connect_callback()
 
99
 
 
100
    def disconnect(self):
 
101
        """Disconnect from the StatsD server."""
 
102
        if self.disconnect_callback is not None:
 
103
            self.disconnect_callback()
 
104
        self.transport = None
 
105
 
 
106
    def write(self, data, callback=None):
 
107
        """Send the metric to the StatsD server.
 
108
 
 
109
        @param data: The data to be sent.
 
110
        @param callback: The callback to which the result should be sent.
 
111
            B{Note}: The C{callback} will be called in the C{reactor}
 
112
            thread, and not in the thread of the original caller.
 
113
        """
 
114
        self.reactor.callFromThread(self._write, data, callback)
 
115
 
 
116
    def _write(self, data, callback):
 
117
        """Send the metric to the StatsD server.
 
118
 
 
119
        @param data: The data to be sent.
 
120
        @param callback: The callback to which the result should be sent.
 
121
        @raise twisted.internet.error.MessageLengthError: If the size of data
 
122
            is too large.
 
123
        """
 
124
        if self.host is not None and self.transport is not None:
 
125
            try:
 
126
                bytes_sent = self.transport.write(data, (self.host, self.port))
 
127
                if callback is not None:
 
128
                    callback(bytes_sent)
 
129
            except (OverflowError, TypeError, socket.error, socket.gaierror):
 
130
                if callback is not None:
 
131
                    callback(None)