12
12
See the License for the specific language governing permissions and
13
13
limitations under the License."""
15
import socket, cPickle
16
17
from django.conf import settings
17
from web.logger import log
18
from graphite.logger import log
21
import cPickle as pickle
20
26
class CarbonLinkPool:
56
62
def sendRequest(self, metric):
57
63
"Sends a request and returns a completion callback"
58
64
host = self.selectHost(metric)
59
query = metric + '\x00'
65
query = struct.pack("!L", len(metric)) + metric # 32-bit length prefix string
67
73
def receiveResponse():
71
pkt = connection.recv(65536)
72
assert pkt, "CarbonLink lost connection to %s:%d" % host
74
if buf.endswith('\x00'): break
80
packet = connection.recv(remaining)
81
assert packet, "CarbonLink lost connection to %s:%d" % host
85
if message_size is None:
87
remaining = message_size = struct.unpack("!L", buf)[0]
91
remaining -= len(packet)
76
93
# We're done with the connection for this request, put it in the pool
77
94
self.putConnectionInPool(host, connection)
79
96
# Now parse the response
80
pointStrings = cPickle.loads(buf[:-1])
81
log.cache("CarbonLink to %s, retrieved %d points for %s" % (host,len(pointStrings),metric))
82
for point in pointStrings:
83
(value, timestamp) = point.split(' ',1)
84
yield ( int(timestamp), float(value) )
97
points = pickle.loads(buf)
98
log.cache("CarbonLink to %s, retrieved %d points for %s" % (host,len(points),metric))
86
104
log.exception("CarbonLink to %s, exception while getting response" % str(host))
87
105
self.removeConnectionFromPool(host, connection)