~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/provisioner/provisioner.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2012, 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package provisioner
 
5
 
 
6
import (
 
7
        "sync"
 
8
        "time"
 
9
 
 
10
        "github.com/juju/errors"
 
11
        "github.com/juju/loggo"
 
12
        "gopkg.in/juju/names.v2"
 
13
 
 
14
        "github.com/juju/juju/agent"
 
15
        apiprovisioner "github.com/juju/juju/api/provisioner"
 
16
        "github.com/juju/juju/controller/authentication"
 
17
        "github.com/juju/juju/environs"
 
18
        "github.com/juju/juju/environs/config"
 
19
        "github.com/juju/juju/instance"
 
20
        "github.com/juju/juju/watcher"
 
21
        "github.com/juju/juju/worker"
 
22
        "github.com/juju/juju/worker/catacomb"
 
23
)
 
24
 
 
25
var logger = loggo.GetLogger("juju.provisioner")
 
26
 
 
27
// Ensure our structs implement the required Provisioner interface.
 
28
var _ Provisioner = (*environProvisioner)(nil)
 
29
var _ Provisioner = (*containerProvisioner)(nil)
 
30
 
 
31
var (
 
32
        retryStrategyDelay = 10 * time.Second
 
33
        retryStrategyCount = 3
 
34
)
 
35
 
 
36
// Provisioner represents a running provisioner worker.
 
37
type Provisioner interface {
 
38
        worker.Worker
 
39
        getMachineWatcher() (watcher.StringsWatcher, error)
 
40
        getRetryWatcher() (watcher.NotifyWatcher, error)
 
41
}
 
42
 
 
43
// environProvisioner represents a running provisioning worker for machine nodes
 
44
// belonging to an environment.
 
45
type environProvisioner struct {
 
46
        provisioner
 
47
        environ environs.Environ
 
48
        configObserver
 
49
}
 
50
 
 
51
// containerProvisioner represents a running provisioning worker for containers
 
52
// hosted on a machine.
 
53
type containerProvisioner struct {
 
54
        provisioner
 
55
        containerType instance.ContainerType
 
56
        machine       *apiprovisioner.Machine
 
57
        configObserver
 
58
}
 
59
 
 
60
// provisioner providers common behaviour for a running provisioning worker.
 
61
type provisioner struct {
 
62
        Provisioner
 
63
        st          *apiprovisioner.State
 
64
        agentConfig agent.Config
 
65
        broker      environs.InstanceBroker
 
66
        toolsFinder ToolsFinder
 
67
        catacomb    catacomb.Catacomb
 
68
}
 
69
 
 
70
// RetryStrategy defines the retry behavior when encountering a retryable
 
71
// error during provisioning.
 
72
//
 
73
// TODO(katco): 2016-08-09: lp:1611427
 
74
type RetryStrategy struct {
 
75
        retryDelay time.Duration
 
76
        retryCount int
 
77
}
 
78
 
 
79
// NewRetryStrategy returns a new retry strategy with the specified delay and
 
80
// count for use with retryable provisioning errors.
 
81
func NewRetryStrategy(delay time.Duration, count int) RetryStrategy {
 
82
        return RetryStrategy{
 
83
                retryDelay: delay,
 
84
                retryCount: count,
 
85
        }
 
86
}
 
87
 
 
88
// configObserver is implemented so that tests can see
 
89
// when the environment configuration changes.
 
90
type configObserver struct {
 
91
        sync.Mutex
 
92
        observer chan<- *config.Config
 
93
}
 
94
 
 
95
// notify notifies the observer of a configuration change.
 
96
func (o *configObserver) notify(cfg *config.Config) {
 
97
        o.Lock()
 
98
        if o.observer != nil {
 
99
                o.observer <- cfg
 
100
        }
 
101
        o.Unlock()
 
102
}
 
103
 
 
104
// Kill implements worker.Worker.Kill.
 
105
func (p *provisioner) Kill() {
 
106
        p.catacomb.Kill(nil)
 
107
}
 
108
 
 
109
// Wait implements worker.Worker.Wait.
 
110
func (p *provisioner) Wait() error {
 
111
        return p.catacomb.Wait()
 
112
}
 
113
 
 
114
// getToolsFinder returns a ToolsFinder for the provided State.
 
115
// This exists for mocking.
 
116
var getToolsFinder = func(st *apiprovisioner.State) ToolsFinder {
 
117
        return st
 
118
}
 
119
 
 
120
// getStartTask creates a new worker for the provisioner,
 
121
func (p *provisioner) getStartTask(harvestMode config.HarvestMode) (ProvisionerTask, error) {
 
122
        auth, err := authentication.NewAPIAuthenticator(p.st)
 
123
        if err != nil {
 
124
                return nil, err
 
125
        }
 
126
        // Start responding to changes in machines, and to any further updates
 
127
        // to the environment config.
 
128
        machineWatcher, err := p.getMachineWatcher()
 
129
        if err != nil {
 
130
                return nil, err
 
131
        }
 
132
        retryWatcher, err := p.getRetryWatcher()
 
133
        if err != nil && !errors.IsNotImplemented(err) {
 
134
                return nil, err
 
135
        }
 
136
        tag := p.agentConfig.Tag()
 
137
        machineTag, ok := tag.(names.MachineTag)
 
138
        if !ok {
 
139
                errors.Errorf("expected names.MachineTag, got %T", tag)
 
140
        }
 
141
 
 
142
        modelCfg, err := p.st.ModelConfig()
 
143
        if err != nil {
 
144
                return nil, errors.Annotate(err, "could not retrieve the model config.")
 
145
        }
 
146
 
 
147
        controllerCfg, err := p.st.ControllerConfig()
 
148
        if err != nil {
 
149
                return nil, errors.Annotate(err, "could not retrieve the controller config.")
 
150
        }
 
151
 
 
152
        secureServerConnection := false
 
153
        if info, ok := p.agentConfig.StateServingInfo(); ok {
 
154
                secureServerConnection = info.CAPrivateKey != ""
 
155
        }
 
156
        task, err := NewProvisionerTask(
 
157
                controllerCfg.ControllerUUID(),
 
158
                machineTag,
 
159
                harvestMode,
 
160
                p.st,
 
161
                p.toolsFinder,
 
162
                machineWatcher,
 
163
                retryWatcher,
 
164
                p.broker,
 
165
                auth,
 
166
                modelCfg.ImageStream(),
 
167
                secureServerConnection,
 
168
                RetryStrategy{retryDelay: retryStrategyDelay, retryCount: retryStrategyCount},
 
169
        )
 
170
        if err != nil {
 
171
                return nil, errors.Trace(err)
 
172
        }
 
173
        return task, nil
 
174
}
 
175
 
 
176
// NewEnvironProvisioner returns a new Provisioner for an environment.
 
177
// When new machines are added to the state, it allocates instances
 
178
// from the environment and allocates them to the new machines.
 
179
func NewEnvironProvisioner(st *apiprovisioner.State, agentConfig agent.Config, environ environs.Environ) (Provisioner, error) {
 
180
        p := &environProvisioner{
 
181
                provisioner: provisioner{
 
182
                        st:          st,
 
183
                        agentConfig: agentConfig,
 
184
                        toolsFinder: getToolsFinder(st),
 
185
                },
 
186
                environ: environ,
 
187
        }
 
188
        p.Provisioner = p
 
189
        p.broker = environ
 
190
        logger.Tracef("Starting environ provisioner for %q", p.agentConfig.Tag())
 
191
 
 
192
        err := catacomb.Invoke(catacomb.Plan{
 
193
                Site: &p.catacomb,
 
194
                Work: p.loop,
 
195
        })
 
196
        if err != nil {
 
197
                return nil, errors.Trace(err)
 
198
        }
 
199
        return p, nil
 
200
}
 
201
 
 
202
func (p *environProvisioner) loop() error {
 
203
        // TODO(mjs channeling axw) - It would be better if there were
 
204
        // APIs to watch and fetch provisioner specific config instead of
 
205
        // watcher for all changes to model config. This would avoid the
 
206
        // need for a full model config.
 
207
        var modelConfigChanges <-chan struct{}
 
208
        modelWatcher, err := p.st.WatchForModelConfigChanges()
 
209
        if err != nil {
 
210
                return loggedErrorStack(errors.Trace(err))
 
211
        }
 
212
        if err := p.catacomb.Add(modelWatcher); err != nil {
 
213
                return errors.Trace(err)
 
214
        }
 
215
        modelConfigChanges = modelWatcher.Changes()
 
216
 
 
217
        modelConfig := p.environ.Config()
 
218
        p.configObserver.notify(modelConfig)
 
219
        harvestMode := modelConfig.ProvisionerHarvestMode()
 
220
        task, err := p.getStartTask(harvestMode)
 
221
        if err != nil {
 
222
                return loggedErrorStack(errors.Trace(err))
 
223
        }
 
224
        if err := p.catacomb.Add(task); err != nil {
 
225
                return errors.Trace(err)
 
226
        }
 
227
 
 
228
        for {
 
229
                select {
 
230
                case <-p.catacomb.Dying():
 
231
                        return p.catacomb.ErrDying()
 
232
                case _, ok := <-modelConfigChanges:
 
233
                        if !ok {
 
234
                                return errors.New("model configuration watcher closed")
 
235
                        }
 
236
                        modelConfig, err := p.st.ModelConfig()
 
237
                        if err != nil {
 
238
                                return errors.Annotate(err, "cannot load model configuration")
 
239
                        }
 
240
                        if err := p.setConfig(modelConfig); err != nil {
 
241
                                return errors.Annotate(err, "loaded invalid model configuration")
 
242
                        }
 
243
                        task.SetHarvestMode(modelConfig.ProvisionerHarvestMode())
 
244
                }
 
245
        }
 
246
}
 
247
 
 
248
func (p *environProvisioner) getMachineWatcher() (watcher.StringsWatcher, error) {
 
249
        return p.st.WatchModelMachines()
 
250
}
 
251
 
 
252
func (p *environProvisioner) getRetryWatcher() (watcher.NotifyWatcher, error) {
 
253
        return p.st.WatchMachineErrorRetry()
 
254
}
 
255
 
 
256
// setConfig updates the environment configuration and notifies
 
257
// the config observer.
 
258
func (p *environProvisioner) setConfig(modelConfig *config.Config) error {
 
259
        if err := p.environ.SetConfig(modelConfig); err != nil {
 
260
                return err
 
261
        }
 
262
        p.configObserver.notify(modelConfig)
 
263
        return nil
 
264
}
 
265
 
 
266
// NewContainerProvisioner returns a new Provisioner. When new machines
 
267
// are added to the state, it allocates instances from the environment
 
268
// and allocates them to the new machines.
 
269
func NewContainerProvisioner(
 
270
        containerType instance.ContainerType,
 
271
        st *apiprovisioner.State,
 
272
        agentConfig agent.Config,
 
273
        broker environs.InstanceBroker,
 
274
        toolsFinder ToolsFinder,
 
275
) (Provisioner, error) {
 
276
 
 
277
        p := &containerProvisioner{
 
278
                provisioner: provisioner{
 
279
                        st:          st,
 
280
                        agentConfig: agentConfig,
 
281
                        broker:      broker,
 
282
                        toolsFinder: toolsFinder,
 
283
                },
 
284
                containerType: containerType,
 
285
        }
 
286
        p.Provisioner = p
 
287
        logger.Tracef("Starting %s provisioner for %q", p.containerType, p.agentConfig.Tag())
 
288
 
 
289
        err := catacomb.Invoke(catacomb.Plan{
 
290
                Site: &p.catacomb,
 
291
                Work: p.loop,
 
292
        })
 
293
        if err != nil {
 
294
                return nil, errors.Trace(err)
 
295
        }
 
296
        return p, nil
 
297
}
 
298
 
 
299
func (p *containerProvisioner) loop() error {
 
300
        modelWatcher, err := p.st.WatchForModelConfigChanges()
 
301
        if err != nil {
 
302
                return errors.Trace(err)
 
303
        }
 
304
        if err := p.catacomb.Add(modelWatcher); err != nil {
 
305
                return errors.Trace(err)
 
306
        }
 
307
 
 
308
        modelConfig, err := p.st.ModelConfig()
 
309
        if err != nil {
 
310
                return err
 
311
        }
 
312
        p.configObserver.notify(modelConfig)
 
313
        harvestMode := modelConfig.ProvisionerHarvestMode()
 
314
 
 
315
        task, err := p.getStartTask(harvestMode)
 
316
        if err != nil {
 
317
                return err
 
318
        }
 
319
        if err := p.catacomb.Add(task); err != nil {
 
320
                return errors.Trace(err)
 
321
        }
 
322
 
 
323
        for {
 
324
                select {
 
325
                case <-p.catacomb.Dying():
 
326
                        return p.catacomb.ErrDying()
 
327
                case _, ok := <-modelWatcher.Changes():
 
328
                        if !ok {
 
329
                                return errors.New("model configuration watch closed")
 
330
                        }
 
331
                        modelConfig, err := p.st.ModelConfig()
 
332
                        if err != nil {
 
333
                                return errors.Annotate(err, "cannot load model configuration")
 
334
                        }
 
335
                        p.configObserver.notify(modelConfig)
 
336
                        task.SetHarvestMode(modelConfig.ProvisionerHarvestMode())
 
337
                }
 
338
        }
 
339
}
 
340
 
 
341
func (p *containerProvisioner) getMachine() (*apiprovisioner.Machine, error) {
 
342
        if p.machine == nil {
 
343
                tag := p.agentConfig.Tag()
 
344
                machineTag, ok := tag.(names.MachineTag)
 
345
                if !ok {
 
346
                        return nil, errors.Errorf("expected names.MachineTag, got %T", tag)
 
347
                }
 
348
                var err error
 
349
                if p.machine, err = p.st.Machine(machineTag); err != nil {
 
350
                        logger.Errorf("%s is not in state", machineTag)
 
351
                        return nil, err
 
352
                }
 
353
        }
 
354
        return p.machine, nil
 
355
}
 
356
 
 
357
func (p *containerProvisioner) getMachineWatcher() (watcher.StringsWatcher, error) {
 
358
        machine, err := p.getMachine()
 
359
        if err != nil {
 
360
                return nil, err
 
361
        }
 
362
        return machine.WatchContainers(p.containerType)
 
363
}
 
364
 
 
365
func (p *containerProvisioner) getRetryWatcher() (watcher.NotifyWatcher, error) {
 
366
        return nil, errors.NotImplementedf("getRetryWatcher")
 
367
}