~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/stateconfigwatcher/manifold.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2016 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package stateconfigwatcher
 
5
 
 
6
import (
 
7
        "github.com/juju/errors"
 
8
        "github.com/juju/loggo"
 
9
        "github.com/juju/utils/voyeur"
 
10
        "gopkg.in/juju/names.v2"
 
11
        "launchpad.net/tomb"
 
12
 
 
13
        "github.com/juju/juju/agent"
 
14
        "github.com/juju/juju/worker"
 
15
        "github.com/juju/juju/worker/dependency"
 
16
)
 
17
 
 
18
var logger = loggo.GetLogger("juju.worker.stateconfigwatcher")
 
19
 
 
20
type ManifoldConfig struct {
 
21
        AgentName          string
 
22
        AgentConfigChanged *voyeur.Value
 
23
}
 
24
 
 
25
// Manifold returns a dependency.Manifold which wraps the machine
 
26
// agent's voyeur.Value which gets set whenever it the machine agent's
 
27
// config is changed. Whenever the config is updated the presence of
 
28
// state serving info is checked and if state serving info was added
 
29
// or removed the manifold worker will bounce itself.
 
30
//
 
31
// The manifold offes a single boolean output which will be true if
 
32
// state serving info is available (i.e. the machine agent should be a
 
33
// state server) and false otherwise.
 
34
//
 
35
// This manifold is intended to be used as a dependency for the state
 
36
// manifold.
 
37
func Manifold(config ManifoldConfig) dependency.Manifold {
 
38
        return dependency.Manifold{
 
39
                Inputs: []string{config.AgentName},
 
40
                Start: func(context dependency.Context) (worker.Worker, error) {
 
41
                        var a agent.Agent
 
42
                        if err := context.Get(config.AgentName, &a); err != nil {
 
43
                                return nil, err
 
44
                        }
 
45
 
 
46
                        if config.AgentConfigChanged == nil {
 
47
                                return nil, errors.NotValidf("nil AgentConfigChanged")
 
48
                        }
 
49
 
 
50
                        if _, ok := a.CurrentConfig().Tag().(names.MachineTag); !ok {
 
51
                                return nil, errors.New("manifold can only be used with a machine agent")
 
52
                        }
 
53
 
 
54
                        w := &stateConfigWatcher{
 
55
                                agent:              a,
 
56
                                agentConfigChanged: config.AgentConfigChanged,
 
57
                        }
 
58
                        go func() {
 
59
                                defer w.tomb.Done()
 
60
                                w.tomb.Kill(w.loop())
 
61
                        }()
 
62
                        return w, nil
 
63
                },
 
64
                Output: outputFunc,
 
65
        }
 
66
}
 
67
 
 
68
// outputFunc extracts a bool from a *stateConfigWatcher. If true, the
 
69
// agent is a state server.
 
70
func outputFunc(in worker.Worker, out interface{}) error {
 
71
        inWorker, _ := in.(*stateConfigWatcher)
 
72
        if inWorker == nil {
 
73
                return errors.Errorf("in should be a %T; got %T", inWorker, in)
 
74
        }
 
75
        switch outPointer := out.(type) {
 
76
        case *bool:
 
77
                *outPointer = inWorker.isStateServer()
 
78
        default:
 
79
                return errors.Errorf("out should be *bool; got %T", out)
 
80
        }
 
81
        return nil
 
82
}
 
83
 
 
84
type stateConfigWatcher struct {
 
85
        tomb               tomb.Tomb
 
86
        agent              agent.Agent
 
87
        agentConfigChanged *voyeur.Value
 
88
}
 
89
 
 
90
func (w *stateConfigWatcher) isStateServer() bool {
 
91
        config := w.agent.CurrentConfig()
 
92
        _, ok := config.StateServingInfo()
 
93
        return ok
 
94
}
 
95
 
 
96
func (w *stateConfigWatcher) loop() error {
 
97
        watch := w.agentConfigChanged.Watch()
 
98
        defer watch.Close()
 
99
 
 
100
        lastValue := w.isStateServer()
 
101
 
 
102
        watchCh := make(chan bool)
 
103
        go func() {
 
104
                for {
 
105
                        if watch.Next() {
 
106
                                select {
 
107
                                case <-w.tomb.Dying():
 
108
                                        return
 
109
                                case watchCh <- true:
 
110
                                }
 
111
                        } else {
 
112
                                // watcher or voyeur.Value closed.
 
113
                                close(watchCh)
 
114
                                return
 
115
                        }
 
116
                }
 
117
        }()
 
118
 
 
119
        for {
 
120
                select {
 
121
                case <-w.tomb.Dying():
 
122
                        logger.Infof("tomb dying")
 
123
                        return tomb.ErrDying
 
124
                case _, ok := <-watchCh:
 
125
                        if !ok {
 
126
                                return errors.New("config changed value closed")
 
127
                        }
 
128
                        if w.isStateServer() != lastValue {
 
129
                                // State serving info has been set or unset so restart
 
130
                                // so that dependents get notified. ErrBounce ensures
 
131
                                // that the manifold is restarted quickly.
 
132
                                logger.Debugf("state serving info change in agent config")
 
133
                                return dependency.ErrBounce
 
134
                        }
 
135
                }
 
136
        }
 
137
}
 
138
 
 
139
// Kill implements worker.Worker.
 
140
func (w *stateConfigWatcher) Kill() {
 
141
        w.tomb.Kill(nil)
 
142
}
 
143
 
 
144
// Wait implements worker.Worker.
 
145
func (w *stateConfigWatcher) Wait() error {
 
146
        return w.tomb.Wait()
 
147
}