~ubuntu-branches/ubuntu/trusty/juju-core/trusty

« back to all changes in this revision

Viewing changes to src/launchpad.net/juju-core/worker/instancepoller/aggregate.go

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-03-24 16:05:44 UTC
  • mfrom: (1.1.20)
  • Revision ID: package-import@ubuntu.com-20140324160544-g8lsfufby18d5fj4
Tags: 1.17.6-0ubuntu1
* New upstream point release, including fixes for:
  - br0 not bought up by cloud-init with MAAS provider (LP: #1271144).
  - ppc64el enablement for juju/lxc (LP: #1273769).
  - juju userdata should not restart networking (LP: #1248283).
  - error detecting hardware characteristics (LP: #1276909).
  - juju instances not including the default security group (LP: #1129720).
  - juju bootstrap does not honor https_proxy (LP: #1240260).
* d/control,rules: Drop BD on bash-completion, install bash-completion
  direct from upstream source code.
* d/rules: Set HOME prior to generating man pages.
* d/control: Drop alternative dependency on mongodb-server; juju now only
  works on trusty with juju-mongodb.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2014 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package instancepoller
 
5
 
 
6
import (
 
7
        "time"
 
8
 
 
9
        "github.com/juju/ratelimit"
 
10
        "launchpad.net/tomb"
 
11
 
 
12
        "launchpad.net/juju-core/environs"
 
13
        "launchpad.net/juju-core/errors"
 
14
        "launchpad.net/juju-core/instance"
 
15
)
 
16
 
 
17
type instanceGetter interface {
 
18
        Instances(ids []instance.Id) ([]instance.Instance, error)
 
19
}
 
20
 
 
21
type aggregator struct {
 
22
        environ instanceGetter
 
23
        reqc    chan instanceInfoReq
 
24
        tomb    tomb.Tomb
 
25
}
 
26
 
 
27
func newAggregator(env instanceGetter) *aggregator {
 
28
        a := &aggregator{
 
29
                environ: env,
 
30
                reqc:    make(chan instanceInfoReq),
 
31
        }
 
32
        go func() {
 
33
                defer a.tomb.Done()
 
34
                a.tomb.Kill(a.loop())
 
35
        }()
 
36
        return a
 
37
}
 
38
 
 
39
type instanceInfoReq struct {
 
40
        instId instance.Id
 
41
        reply  chan<- instanceInfoReply
 
42
}
 
43
 
 
44
type instanceInfoReply struct {
 
45
        info instanceInfo
 
46
        err  error
 
47
}
 
48
 
 
49
func (a *aggregator) instanceInfo(id instance.Id) (instanceInfo, error) {
 
50
        reply := make(chan instanceInfoReply)
 
51
        a.reqc <- instanceInfoReq{
 
52
                instId: id,
 
53
                reply:  reply,
 
54
        }
 
55
        r := <-reply
 
56
        return r.info, r.err
 
57
}
 
58
 
 
59
var gatherTime = 3 * time.Second
 
60
 
 
61
func (a *aggregator) loop() error {
 
62
        timer := time.NewTimer(0)
 
63
        timer.Stop()
 
64
        var reqs []instanceInfoReq
 
65
        // We use a capacity of 1 so that sporadic requests will
 
66
        // be serviced immediately without having to wait.
 
67
        bucket := ratelimit.New(gatherTime, 1)
 
68
        for {
 
69
                select {
 
70
                case <-a.tomb.Dying():
 
71
                        return tomb.ErrDying
 
72
                case req := <-a.reqc:
 
73
                        if len(reqs) == 0 {
 
74
                                waitTime := bucket.Take(1)
 
75
                                timer.Reset(waitTime)
 
76
                        }
 
77
                        reqs = append(reqs, req)
 
78
                case <-timer.C:
 
79
                        ids := make([]instance.Id, len(reqs))
 
80
                        for i, req := range reqs {
 
81
                                ids[i] = req.instId
 
82
                        }
 
83
                        insts, err := a.environ.Instances(ids)
 
84
                        for i, req := range reqs {
 
85
                                var reply instanceInfoReply
 
86
                                if err != nil && err != environs.ErrPartialInstances {
 
87
                                        reply.err = err
 
88
                                } else {
 
89
                                        reply.info, reply.err = a.instInfo(req.instId, insts[i])
 
90
                                }
 
91
                                req.reply <- reply
 
92
                        }
 
93
                        reqs = nil
 
94
                }
 
95
        }
 
96
}
 
97
 
 
98
// instInfo returns the instance info for the given id
 
99
// and instance. If inst is nil, it returns a not-found error.
 
100
func (*aggregator) instInfo(id instance.Id, inst instance.Instance) (instanceInfo, error) {
 
101
        if inst == nil {
 
102
                return instanceInfo{}, errors.NotFoundf("instance %v", id)
 
103
        }
 
104
        addr, err := inst.Addresses()
 
105
        if err != nil {
 
106
                return instanceInfo{}, err
 
107
        }
 
108
        return instanceInfo{
 
109
                addr,
 
110
                inst.Status(),
 
111
        }, nil
 
112
}
 
113
 
 
114
func (a *aggregator) Kill() {
 
115
        a.tomb.Kill(nil)
 
116
}
 
117
 
 
118
func (a *aggregator) Wait() error {
 
119
        return a.tomb.Wait()
 
120
}