~ahasenack/graphite/packaging-experiment-1source-3binaries

« back to all changes in this revision

Viewing changes to webapp/graphite/render/carbonlink.py

merging in a buttload of changes from my private branch. this is going to be the 0.9.5 release and is probably the most substantial release to date.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
See the License for the specific language governing permissions and
13
13
limitations under the License."""
14
14
 
15
 
import socket, cPickle
 
15
import socket
 
16
import struct
16
17
from django.conf import settings
17
 
from web.logger import log
 
18
from graphite.logger import log
 
19
 
 
20
try:
 
21
  import cPickle as pickle
 
22
except ImportError:
 
23
  import pickle
18
24
 
19
25
 
20
26
class CarbonLinkPool:
56
62
  def sendRequest(self, metric):
57
63
    "Sends a request and returns a completion callback"
58
64
    host = self.selectHost(metric)
59
 
    query = metric + '\x00'
 
65
    query = struct.pack("!L", len(metric)) + metric # 32-bit length prefix string
60
66
    connection = None
61
67
 
62
68
    try:
67
73
      def receiveResponse():
68
74
        try:
69
75
          buf = ''
70
 
          while True:
71
 
            pkt = connection.recv(65536)
72
 
            assert pkt, "CarbonLink lost connection to %s:%d" % host
73
 
            buf += pkt
74
 
            if buf.endswith('\x00'): break
 
76
          remaining = 4
 
77
          message_size = None
 
78
 
 
79
          while remaining:
 
80
            packet = connection.recv(remaining)
 
81
            assert packet, "CarbonLink lost connection to %s:%d" % host
 
82
 
 
83
            buf += packet
 
84
 
 
85
            if message_size is None:
 
86
              if len(buf) == 4:
 
87
                remaining = message_size = struct.unpack("!L", buf)[0]
 
88
                buf = ''
 
89
                continue
 
90
 
 
91
            remaining -= len(packet)
75
92
 
76
93
          # We're done with the connection for this request, put it in the pool
77
94
          self.putConnectionInPool(host, connection)
78
95
 
79
96
          # Now parse the response
80
 
          pointStrings = cPickle.loads(buf[:-1])
81
 
          log.cache("CarbonLink to %s, retrieved %d points for %s" % (host,len(pointStrings),metric))
82
 
          for point in pointStrings:
83
 
            (value, timestamp) = point.split(' ',1)
84
 
            yield ( int(timestamp), float(value) )
 
97
          points = pickle.loads(buf)
 
98
          log.cache("CarbonLink to %s, retrieved %d points for %s" % (host,len(points),metric))
 
99
 
 
100
          for point in points:
 
101
            yield point
 
102
 
85
103
        except:
86
104
          log.exception("CarbonLink to %s, exception while getting response" % str(host))
87
105
          self.removeConnectionFromPool(host, connection)