~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/metrics/sender/sender.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2015 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
// Package sender contains the implementation of the metric
 
5
// sender manifold.
 
6
package sender
 
7
 
 
8
import (
 
9
        "fmt"
 
10
        "net"
 
11
        "path"
 
12
        "runtime"
 
13
        "time"
 
14
 
 
15
        "github.com/juju/errors"
 
16
 
 
17
        "github.com/juju/juju/api/metricsadder"
 
18
        "github.com/juju/juju/apiserver/params"
 
19
        "github.com/juju/juju/worker/metrics/spool"
 
20
)
 
21
 
 
22
const (
 
23
        defaultSocketName = "metrics-send.socket"
 
24
)
 
25
 
 
26
type stopper interface {
 
27
        Stop() error
 
28
}
 
29
 
 
30
type sender struct {
 
31
        client   metricsadder.MetricsAdderClient
 
32
        factory  spool.MetricFactory
 
33
        listener stopper
 
34
}
 
35
 
 
36
// Do sends metrics from the metric spool to the
 
37
// controller via an api call.
 
38
func (s *sender) Do(stop <-chan struct{}) error {
 
39
        reader, err := s.factory.Reader()
 
40
        if err != nil {
 
41
                return errors.Trace(err)
 
42
        }
 
43
        defer reader.Close()
 
44
        return s.sendMetrics(reader)
 
45
}
 
46
 
 
47
func (s *sender) sendMetrics(reader spool.MetricReader) error {
 
48
        batches, err := reader.Read()
 
49
        if err != nil {
 
50
                return errors.Annotate(err, "failed to open the metric reader")
 
51
        }
 
52
        var sendBatches []params.MetricBatchParam
 
53
        for _, batch := range batches {
 
54
                sendBatches = append(sendBatches, spool.APIMetricBatch(batch))
 
55
        }
 
56
        results, err := s.client.AddMetricBatches(sendBatches)
 
57
        if err != nil {
 
58
                return errors.Annotate(err, "could not send metrics")
 
59
        }
 
60
        for batchUUID, resultErr := range results {
 
61
                // if we fail to send any metric batch we log a warning with the assumption that
 
62
                // the unsent metric batches remain in the spool directory and will be sent to the
 
63
                // controller when the network partition is restored.
 
64
                if _, ok := resultErr.(*params.Error); ok || params.IsCodeAlreadyExists(resultErr) {
 
65
                        err := reader.Remove(batchUUID)
 
66
                        if err != nil {
 
67
                                logger.Errorf("could not remove batch %q from spool: %v", batchUUID, err)
 
68
                        }
 
69
                } else {
 
70
                        logger.Errorf("failed to send batch %q: %v", batchUUID, resultErr)
 
71
                }
 
72
        }
 
73
        return nil
 
74
}
 
75
 
 
76
// Handle sends metrics from the spool directory to the
 
77
// controller.
 
78
func (s *sender) Handle(c net.Conn, _ <-chan struct{}) (err error) {
 
79
        defer func() {
 
80
                if err != nil {
 
81
                        fmt.Fprintf(c, "%v\n", err)
 
82
                } else {
 
83
                        fmt.Fprintf(c, "ok\n")
 
84
                }
 
85
                c.Close()
 
86
        }()
 
87
        // TODO(fwereade): 2016-03-17 lp:1558657
 
88
        if err := c.SetDeadline(time.Now().Add(spool.DefaultTimeout)); err != nil {
 
89
                return errors.Annotate(err, "failed to set the deadline")
 
90
        }
 
91
        reader, err := s.factory.Reader()
 
92
        if err != nil {
 
93
                return errors.Trace(err)
 
94
        }
 
95
        defer reader.Close()
 
96
        return s.sendMetrics(reader)
 
97
}
 
98
 
 
99
func (s *sender) stop() {
 
100
        if s.listener != nil {
 
101
                s.listener.Stop()
 
102
        }
 
103
}
 
104
 
 
105
var socketName = func(baseDir, unitTag string) string {
 
106
        switch runtime.GOOS {
 
107
        case "windows":
 
108
                return fmt.Sprintf(`\\.\pipe\send-metrics-%s`, unitTag)
 
109
        default:
 
110
                return path.Join(baseDir, defaultSocketName)
 
111
        }
 
112
}
 
113
 
 
114
func newSender(client metricsadder.MetricsAdderClient, factory spool.MetricFactory, baseDir, unitTag string) (*sender, error) {
 
115
        s := &sender{
 
116
                client:  client,
 
117
                factory: factory,
 
118
        }
 
119
        listener, err := spool.NewSocketListener(socketName(baseDir, unitTag), s)
 
120
        if err != nil {
 
121
                return nil, errors.Trace(err)
 
122
        }
 
123
        s.listener = listener
 
124
        return s, nil
 
125
}