~lucio.torre/graphite/add-events

« back to all changes in this revision

Viewing changes to carbon/lib/carbon/amqp_listener.py

  • Committer: Lucio Torre
  • Date: 2011-07-25 18:59:21 UTC
  • mfrom: (299.1.145 graphite)
  • Revision ID: lucio.torre@gmail.com-20110725185921-1f7decdj8pwfpnmn
merged with trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
 
26
26
from twisted.internet.defer import inlineCallbacks
27
27
from twisted.internet import reactor
28
 
from twisted.internet.protocol import ClientCreator, Protocol, \
29
 
    ReconnectingClientFactory
 
28
from twisted.internet.protocol import ReconnectingClientFactory
30
29
from txamqp.protocol import AMQClient
31
30
from txamqp.client import TwistedDelegate
32
31
import txamqp.spec
38
37
    LIB_DIR = os.path.dirname(os.path.dirname(__file__))
39
38
    sys.path.insert(0, LIB_DIR)
40
39
 
41
 
import carbon.listeners #satisfy import order requirements
 
40
import carbon.protocols #satisfy import order requirements
42
41
from carbon.instrumentation import increment
43
42
from carbon.events import metricReceived
44
43
from carbon.conf import settings
102
101
        metric = message.routing_key
103
102
 
104
103
        for line in message.content.body.split("\n"):
 
104
            line = line.strip()
 
105
            if not line:
 
106
                continue
105
107
            try:
106
 
                value, timestamp = line.strip().split()
 
108
                if settings.get("AMQP_METRIC_NAME_IN_BODY", False):
 
109
                    metric, value, timestamp = line.split()
 
110
                else:
 
111
                    value, timestamp = line.split()
107
112
                datapoint = ( float(timestamp), float(value) )
108
113
            except ValueError:
109
114
                log.listener("invalid message line: %s" % (line,))
141
146
        p.factory = self
142
147
        return p
143
148
 
144
 
def startReceiver(host, port, username, password, vhost, exchange_name,
145
 
                  spec=None, channel=1, verbose=False):
146
 
    """Starts a twisted process that will read messages on the amqp broker
147
 
    and post them as metrics."""
148
149
 
 
150
def createAMQPListener(username, password, vhost, exchange_name,
 
151
                       spec=None, channel=1, verbose=False):
 
152
    """
 
153
    Create an C{AMQPReconnectingFactory} configured with the specified options.
 
154
    """
149
155
    # fall back to loading a spec file from disk if none was provided
150
156
    if not spec:
151
157
        spec = txamqp.spec.load(os.path.normpath(
155
161
    factory = AMQPReconnectingFactory(username, password, delegate, vhost,
156
162
                                      spec, channel, exchange_name,
157
163
                                      verbose=verbose)
 
164
    return factory
 
165
 
 
166
 
 
167
def startReceiver(host, port, username, password, vhost, exchange_name,
 
168
                  spec=None, channel=1, verbose=False):
 
169
    """
 
170
    Starts a twisted process that will read messages on the amqp broker and
 
171
    post them as metrics.
 
172
    """
 
173
    factory = createAMQPListener(username, password, vhost, exchange_name,
 
174
                                 spec=spec, channel=channel, verbose=verbose)
158
175
    reactor.connectTCP(host, port, factory)
159
176
 
160
177