1
// Copyright 2012, 2013 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
10
"github.com/juju/errors"
11
"github.com/juju/loggo"
12
"gopkg.in/juju/names.v2"
14
"github.com/juju/juju/agent"
15
apiprovisioner "github.com/juju/juju/api/provisioner"
16
"github.com/juju/juju/controller/authentication"
17
"github.com/juju/juju/environs"
18
"github.com/juju/juju/environs/config"
19
"github.com/juju/juju/instance"
20
"github.com/juju/juju/watcher"
21
"github.com/juju/juju/worker"
22
"github.com/juju/juju/worker/catacomb"
25
var logger = loggo.GetLogger("juju.provisioner")
27
// Ensure our structs implement the required Provisioner interface.
28
var _ Provisioner = (*environProvisioner)(nil)
29
var _ Provisioner = (*containerProvisioner)(nil)
32
retryStrategyDelay = 10 * time.Second
33
retryStrategyCount = 3
36
// Provisioner represents a running provisioner worker.
37
type Provisioner interface {
39
getMachineWatcher() (watcher.StringsWatcher, error)
40
getRetryWatcher() (watcher.NotifyWatcher, error)
43
// environProvisioner represents a running provisioning worker for machine nodes
44
// belonging to an environment.
45
type environProvisioner struct {
47
environ environs.Environ
51
// containerProvisioner represents a running provisioning worker for containers
52
// hosted on a machine.
53
type containerProvisioner struct {
55
containerType instance.ContainerType
56
machine *apiprovisioner.Machine
60
// provisioner provides common behaviour for a running provisioning worker.
61
type provisioner struct {
63
st *apiprovisioner.State
64
agentConfig agent.Config
65
broker environs.InstanceBroker
66
toolsFinder ToolsFinder
67
catacomb catacomb.Catacomb
70
// RetryStrategy defines the retry behavior when encountering a retryable
71
// error during provisioning.
73
// TODO(katco): 2016-08-09: lp:1611427
74
type RetryStrategy struct {
75
retryDelay time.Duration
79
// NewRetryStrategy returns a new retry strategy with the specified delay and
80
// count for use with retryable provisioning errors.
81
func NewRetryStrategy(delay time.Duration, count int) RetryStrategy {
88
// configObserver is implemented so that tests can see
89
// when the environment configuration changes.
90
type configObserver struct {
92
observer chan<- *config.Config
95
// notify notifies the observer of a configuration change.
96
func (o *configObserver) notify(cfg *config.Config) {
98
if o.observer != nil {
104
// Kill implements worker.Worker.Kill.
105
func (p *provisioner) Kill() {
109
// Wait implements worker.Worker.Wait.
110
func (p *provisioner) Wait() error {
111
return p.catacomb.Wait()
114
// getToolsFinder returns a ToolsFinder for the provided State.
115
// This exists for mocking.
116
var getToolsFinder = func(st *apiprovisioner.State) ToolsFinder {
120
// getStartTask creates the ProvisionerTask worker for the provisioner.
121
func (p *provisioner) getStartTask(harvestMode config.HarvestMode) (ProvisionerTask, error) {
122
auth, err := authentication.NewAPIAuthenticator(p.st)
126
// Start responding to changes in machines, and to any further updates
127
// to the environment config.
128
machineWatcher, err := p.getMachineWatcher()
132
retryWatcher, err := p.getRetryWatcher()
133
if err != nil && !errors.IsNotImplemented(err) {
136
tag := p.agentConfig.Tag()
137
machineTag, ok := tag.(names.MachineTag)
139
errors.Errorf("expected names.MachineTag, got %T", tag)
142
modelCfg, err := p.st.ModelConfig()
144
return nil, errors.Annotate(err, "could not retrieve the model config.")
147
controllerCfg, err := p.st.ControllerConfig()
149
return nil, errors.Annotate(err, "could not retrieve the controller config.")
152
secureServerConnection := false
153
if info, ok := p.agentConfig.StateServingInfo(); ok {
154
secureServerConnection = info.CAPrivateKey != ""
156
task, err := NewProvisionerTask(
157
controllerCfg.ControllerUUID(),
166
modelCfg.ImageStream(),
167
secureServerConnection,
168
RetryStrategy{retryDelay: retryStrategyDelay, retryCount: retryStrategyCount},
171
return nil, errors.Trace(err)
176
// NewEnvironProvisioner returns a new Provisioner for an environment.
177
// When new machines are added to the state, it allocates instances
178
// from the environment and allocates them to the new machines.
179
func NewEnvironProvisioner(st *apiprovisioner.State, agentConfig agent.Config, environ environs.Environ) (Provisioner, error) {
180
p := &environProvisioner{
181
provisioner: provisioner{
183
agentConfig: agentConfig,
184
toolsFinder: getToolsFinder(st),
190
logger.Tracef("Starting environ provisioner for %q", p.agentConfig.Tag())
192
err := catacomb.Invoke(catacomb.Plan{
197
return nil, errors.Trace(err)
202
// loop is the environ provisioner's worker body. It registers a model
// config watcher and a provisioner task (started with the harvest mode from
// the environ's current config) with the catacomb, then waits on either the
// catacomb dying or a model config change. On a change it reloads the model
// config from the API, applies it through setConfig and pushes the new
// harvest mode to the task.
func (p *environProvisioner) loop() error {
203
// TODO(mjs channeling axw) - It would be better if there were
204
// APIs to watch and fetch provisioner specific config instead of
205
// watching for all changes to model config. This would avoid the
206
// need for a full model config.
207
var modelConfigChanges <-chan struct{}
208
modelWatcher, err := p.st.WatchForModelConfigChanges()
210
return loggedErrorStack(errors.Trace(err))
212
if err := p.catacomb.Add(modelWatcher); err != nil {
213
return errors.Trace(err)
215
modelConfigChanges = modelWatcher.Changes()
217
// The initial config comes from the environ itself, not from the API.
modelConfig := p.environ.Config()
218
p.configObserver.notify(modelConfig)
219
harvestMode := modelConfig.ProvisionerHarvestMode()
220
task, err := p.getStartTask(harvestMode)
222
return loggedErrorStack(errors.Trace(err))
224
if err := p.catacomb.Add(task); err != nil {
225
return errors.Trace(err)
230
case <-p.catacomb.Dying():
231
return p.catacomb.ErrDying()
232
case _, ok := <-modelConfigChanges:
234
return errors.New("model configuration watcher closed")
236
modelConfig, err := p.st.ModelConfig()
238
return errors.Annotate(err, "cannot load model configuration")
240
// setConfig updates the environ and notifies the config observer.
if err := p.setConfig(modelConfig); err != nil {
241
return errors.Annotate(err, "loaded invalid model configuration")
243
task.SetHarvestMode(modelConfig.ProvisionerHarvestMode())
248
// getMachineWatcher returns the StringsWatcher produced by the provisioner
// API's WatchModelMachines call.
func (p *environProvisioner) getMachineWatcher() (watcher.StringsWatcher, error) {
249
return p.st.WatchModelMachines()
252
// getRetryWatcher returns the NotifyWatcher produced by the provisioner
// API's WatchMachineErrorRetry call.
func (p *environProvisioner) getRetryWatcher() (watcher.NotifyWatcher, error) {
253
return p.st.WatchMachineErrorRetry()
256
// setConfig updates the environment configuration and notifies
257
// the config observer.
258
func (p *environProvisioner) setConfig(modelConfig *config.Config) error {
259
if err := p.environ.SetConfig(modelConfig); err != nil {
262
p.configObserver.notify(modelConfig)
266
// NewContainerProvisioner returns a new Provisioner. When new machines
267
// are added to the state, it allocates instances from the environment
268
// and allocates them to the new machines.
269
func NewContainerProvisioner(
270
containerType instance.ContainerType,
271
st *apiprovisioner.State,
272
agentConfig agent.Config,
273
broker environs.InstanceBroker,
274
toolsFinder ToolsFinder,
275
) (Provisioner, error) {
277
p := &containerProvisioner{
278
provisioner: provisioner{
280
agentConfig: agentConfig,
282
toolsFinder: toolsFinder,
284
containerType: containerType,
287
logger.Tracef("Starting %s provisioner for %q", p.containerType, p.agentConfig.Tag())
289
err := catacomb.Invoke(catacomb.Plan{
294
return nil, errors.Trace(err)
299
// loop is the container provisioner's worker body. It registers a model
// config watcher and a provisioner task (started with the harvest mode from
// the model config fetched via the API) with the catacomb, then waits on
// either the catacomb dying or a model config change. On a change it
// reloads the model config, notifies the config observer and pushes the new
// harvest mode to the task.
func (p *containerProvisioner) loop() error {
300
modelWatcher, err := p.st.WatchForModelConfigChanges()
302
return errors.Trace(err)
304
if err := p.catacomb.Add(modelWatcher); err != nil {
305
return errors.Trace(err)
308
modelConfig, err := p.st.ModelConfig()
312
p.configObserver.notify(modelConfig)
313
harvestMode := modelConfig.ProvisionerHarvestMode()
315
task, err := p.getStartTask(harvestMode)
319
if err := p.catacomb.Add(task); err != nil {
320
return errors.Trace(err)
325
case <-p.catacomb.Dying():
326
return p.catacomb.ErrDying()
327
case _, ok := <-modelWatcher.Changes():
329
return errors.New("model configuration watch closed")
331
modelConfig, err := p.st.ModelConfig()
333
return errors.Annotate(err, "cannot load model configuration")
335
p.configObserver.notify(modelConfig)
336
task.SetHarvestMode(modelConfig.ProvisionerHarvestMode())
341
// getMachine returns the API machine for the agent's tag. While p.machine
// is nil it looks the machine up through p.st.Machine and stores the result
// in p.machine; otherwise the stored value is returned. It returns an error
// if the agent's tag is not a names.MachineTag.
func (p *containerProvisioner) getMachine() (*apiprovisioner.Machine, error) {
342
if p.machine == nil {
343
tag := p.agentConfig.Tag()
344
machineTag, ok := tag.(names.MachineTag)
346
return nil, errors.Errorf("expected names.MachineTag, got %T", tag)
349
if p.machine, err = p.st.Machine(machineTag); err != nil {
350
// Logged when the lookup through the API fails.
logger.Errorf("%s is not in state", machineTag)
354
return p.machine, nil
357
// getMachineWatcher returns a StringsWatcher for containers of this
// provisioner's container type on the machine returned by getMachine.
func (p *containerProvisioner) getMachineWatcher() (watcher.StringsWatcher, error) {
358
machine, err := p.getMachine()
362
return machine.WatchContainers(p.containerType)
365
// getRetryWatcher always returns a nil watcher and a NotImplemented error.
// getStartTask excludes NotImplemented errors from its failure check on
// this call.
func (p *containerProvisioner) getRetryWatcher() (watcher.NotifyWatcher, error) {
366
return nil, errors.NotImplementedf("getRetryWatcher")