1
// Copyright 2015 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
4
// Package sender contains the implementation of the metric
15
"github.com/juju/errors"
17
"github.com/juju/juju/api/metricsadder"
18
"github.com/juju/juju/apiserver/params"
19
"github.com/juju/juju/worker/metrics/spool"
23
defaultSocketName = "metrics-send.socket"
26
type stopper interface {
31
client metricsadder.MetricsAdderClient
32
factory spool.MetricFactory
36
// Do sends metrics from the metric spool to the
37
// controller via an api call.
38
func (s *sender) Do(stop <-chan struct{}) error {
39
reader, err := s.factory.Reader()
41
return errors.Trace(err)
44
return s.sendMetrics(reader)
47
func (s *sender) sendMetrics(reader spool.MetricReader) error {
48
batches, err := reader.Read()
50
return errors.Annotate(err, "failed to open the metric reader")
52
var sendBatches []params.MetricBatchParam
53
for _, batch := range batches {
54
sendBatches = append(sendBatches, spool.APIMetricBatch(batch))
56
results, err := s.client.AddMetricBatches(sendBatches)
58
return errors.Annotate(err, "could not send metrics")
60
for batchUUID, resultErr := range results {
61
// if we fail to send any metric batch we log a warning with the assumption that
62
// the unsent metric batches remain in the spool directory and will be sent to the
63
// controller when the network partition is restored.
64
if _, ok := resultErr.(*params.Error); ok || params.IsCodeAlreadyExists(resultErr) {
65
err := reader.Remove(batchUUID)
67
logger.Errorf("could not remove batch %q from spool: %v", batchUUID, err)
70
logger.Errorf("failed to send batch %q: %v", batchUUID, resultErr)
76
// Handle sends metrics from the spool directory to the
78
func (s *sender) Handle(c net.Conn, _ <-chan struct{}) (err error) {
81
fmt.Fprintf(c, "%v\n", err)
83
fmt.Fprintf(c, "ok\n")
87
// TODO(fwereade): 2016-03-17 lp:1558657
88
if err := c.SetDeadline(time.Now().Add(spool.DefaultTimeout)); err != nil {
89
return errors.Annotate(err, "failed to set the deadline")
91
reader, err := s.factory.Reader()
93
return errors.Trace(err)
96
return s.sendMetrics(reader)
99
func (s *sender) stop() {
100
if s.listener != nil {
105
var socketName = func(baseDir, unitTag string) string {
106
switch runtime.GOOS {
108
return fmt.Sprintf(`\\.\pipe\send-metrics-%s`, unitTag)
110
return path.Join(baseDir, defaultSocketName)
114
func newSender(client metricsadder.MetricsAdderClient, factory spool.MetricFactory, baseDir, unitTag string) (*sender, error) {
119
listener, err := spool.NewSocketListener(socketName(baseDir, unitTag), s)
121
return nil, errors.Trace(err)
123
s.listener = listener