~rogpeppe/juju-core/470-1.16-wallyworld-provisioner-safe-mode

« back to all changes in this revision

Viewing changes to worker/provisioner/provisioner_task.go

  • Committer: Roger Peppe
  • Date: 2013-11-27 18:33:23 UTC
  • Revision ID: roger.peppe@canonical.com-20131127183323-uzhskuad5wi4ux99
worker/provisioner: cherry pick changes from safe-mode changes in trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
        Stop() error
28
28
        Dying() <-chan struct{}
29
29
        Err() error
 
30
 
 
31
        // SetSafeMode sets a flag to indicate whether the provisioner task
 
32
        // runs in safe mode or not. In safe mode, any running instances
 
33
        // which do no exist in state are allowed to keep running rather than
 
34
        // being shut down.
 
35
        SetSafeMode(safeMode bool)
30
36
}
31
37
 
32
38
type Watcher interface {
41
47
 
42
48
func NewProvisionerTask(
43
49
        machineTag string,
 
50
        safeMode bool,
44
51
        machineGetter MachineGetter,
45
52
        watcher Watcher,
46
53
        broker environs.InstanceBroker,
52
59
                machineWatcher: watcher,
53
60
                broker:         broker,
54
61
                auth:           auth,
 
62
                safeMode:       safeMode,
 
63
                safeModeChan:   make(chan bool, 1),
55
64
                machines:       make(map[string]*apiprovisioner.Machine),
56
65
        }
57
66
        go func() {
69
78
        tomb           tomb.Tomb
70
79
        auth           AuthenticationProvider
71
80
 
 
81
        safeMode     bool
 
82
        safeModeChan chan bool
 
83
 
72
84
        // instance id -> instance
73
85
        instances map[instance.Id]instance.Instance
74
86
        // machine id -> machine
102
114
        logger.Infof("Starting up provisioner task %s", task.machineTag)
103
115
        defer watcher.Stop(task.machineWatcher, &task.tomb)
104
116
 
 
117
        // Don't allow the safe mode to change until we have
 
118
        // read at least one set of changes, which will populate
 
119
        // the task.machines map. Otherwise we will potentially
 
120
        // see all legitimate instances as unknown.
 
121
        var safeModeChan chan bool
 
122
 
105
123
        // When the watcher is started, it will have the initial changes be all
106
124
        // the machines that are relevant. Also, since this is available straight
107
125
        // away, we know there will be some changes right off the bat.
114
132
                        if !ok {
115
133
                                return watcher.MustErr(task.machineWatcher)
116
134
                        }
 
135
                        logger.Infof("got watcher changes: %v", ids)
117
136
                        // TODO(dfc; lp:1042717) fire process machines periodically to shut down unknown
118
137
                        // instances.
119
138
                        if err := task.processMachines(ids); err != nil {
120
 
                                logger.Errorf("Process machines failed: %v", err)
121
 
                                return err
 
139
                                return fmt.Errorf("failed to process updated machines: %v", err)
 
140
                        }
 
141
                        // We've seen a set of changes. Enable safe mode change.
 
142
                        safeModeChan = task.safeModeChan
 
143
                case safeMode := <-safeModeChan:
 
144
                        if safeMode == task.safeMode {
 
145
                                break
 
146
                        }
 
147
                        logger.Infof("safe mode changed to %v", safeMode)
 
148
                        task.safeMode = safeMode
 
149
                        if !safeMode {
 
150
                                // Safe mode has been disabled, so process current machines
 
151
                                // so that unknown machines will be immediately dealt with.
 
152
                                if err := task.processMachines(nil); err != nil {
 
153
                                        return fmt.Errorf("failed to process machines after safe mode disabled: %v", err)
 
154
                                }
122
155
                        }
123
156
                }
124
157
        }
125
158
}
126
159
 
 
160
// SetSafeMode implements ProvisionerTask.SetSafeMode().
 
161
func (task *provisionerTask) SetSafeMode(safeMode bool) {
 
162
        select {
 
163
        case task.safeModeChan <- safeMode:
 
164
        case <-task.Dying():
 
165
        }
 
166
}
 
167
 
127
168
func (task *provisionerTask) processMachines(ids []string) error {
128
169
        logger.Tracef("processMachines(%v)", ids)
129
170
        // Populate the tasks maps of current instances and machines.
146
187
        if err != nil {
147
188
                return err
148
189
        }
149
 
 
 
190
        if task.safeMode {
 
191
                logger.Infof("running in safe mode, unknown instances not stopped %v", instanceIds(unknown))
 
192
                unknown = nil
 
193
        }
 
194
        if len(stopping) > 0 {
 
195
                logger.Infof("stopping known instances %v", stopping)
 
196
        }
 
197
        if len(unknown) > 0 {
 
198
                logger.Infof("stopping unknown instances %v", instanceIds(unknown))
 
199
        }
150
200
        // It's important that we stop unknown instances before starting
151
201
        // pending ones, because if we start an instance and then fail to
152
202
        // set its InstanceId on the machine we don't want to start a new
155
205
                return err
156
206
        }
157
207
 
 
208
        // Remove any dead machines from state.
 
209
        for _, machine := range dead {
 
210
                logger.Infof("removing dead machine %q", machine)
 
211
                if err := machine.Remove(); err != nil {
 
212
                        logger.Errorf("failed to remove dead machine %q", machine)
 
213
                }
 
214
                delete(task.machines, machine.Id())
 
215
        }
 
216
 
158
217
        // Start an instance for the pending ones
159
218
        return task.startMachines(pending)
160
219
}
161
220
 
 
221
func instanceIds(instances []instance.Instance) []string {
 
222
        ids := make([]string, 0, len(instances))
 
223
        for _, inst := range instances {
 
224
                ids = append(ids, string(inst.Id()))
 
225
        }
 
226
        return ids
 
227
}
 
228
 
162
229
func (task *provisionerTask) populateMachineMaps(ids []string) error {
163
230
        task.instances = make(map[instance.Id]instance.Instance)
164
231
 
190
257
        return nil
191
258
}
192
259
 
193
 
// pendingOrDead looks up machines with ids and retuns those that do not
 
260
// pendingOrDead looks up machines with ids and returns those that do not
194
261
// have an instance id assigned yet, and also those that are dead.
195
262
func (task *provisionerTask) pendingOrDead(ids []string) (pending, dead []*apiprovisioner.Machine, err error) {
196
263
        for _, id := range ids {
215
282
                        fallthrough
216
283
                case params.Dead:
217
284
                        dead = append(dead, machine)
218
 
                        logger.Infof("removing dead machine %q", machine)
219
 
                        if err := machine.Remove(); err != nil {
220
 
                                logger.Errorf("failed to remove dead machine %q", machine)
221
 
                                return nil, nil, err
222
 
                        }
223
 
                        // now remove it from the machines map
224
 
                        delete(task.machines, machine.Id())
225
285
                        continue
226
286
                }
227
287
                if instId, err := machine.InstanceId(); err != nil {
259
319
        for _, m := range task.machines {
260
320
                if instId, err := m.InstanceId(); err == nil {
261
321
                        delete(instances, instId)
262
 
                } else if !params.IsCodeNotProvisioned(err) {
 
322
                } else if !params.IsCodeNotProvisioned(err) && !params.IsCodeNotFound(err) {
263
323
                        return nil, err
264
324
                }
265
325
        }
266
 
        // Now remove all those instances that we are stopping already, as they
267
 
        // have been removed from the task.machines map.
268
 
        for _, i := range stopping {
269
 
                delete(instances, i.Id())
 
326
        // Now remove all those instances that we are stopping already as we
 
327
        // know about those and don't want to include them in the unknown list.
 
328
        for _, inst := range stopping {
 
329
                delete(instances, inst.Id())
270
330
        }
271
331
        var unknown []instance.Instance
272
 
        for _, i := range instances {
273
 
                unknown = append(unknown, i)
 
332
        for _, inst := range instances {
 
333
                unknown = append(unknown, inst)
274
334
        }
275
 
        logger.Tracef("unknown: %v", unknown)
276
335
        return unknown, nil
277
336
}
278
337
 
300
359
        if len(instances) == 0 {
301
360
                return nil
302
361
        }
303
 
        logger.Debugf("Stopping instances: %v", instances)
304
362
        if err := task.broker.StopInstances(instances); err != nil {
305
363
                logger.Errorf("broker failed to stop instances: %v", err)
306
364
                return err