~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/peergrouper/worker.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2014 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package peergrouper
 
5
 
 
6
import (
 
7
        "fmt"
 
8
        "time"
 
9
 
 
10
        "github.com/juju/errors"
 
11
        "github.com/juju/loggo"
 
12
        "github.com/juju/replicaset"
 
13
 
 
14
        "github.com/juju/juju/instance"
 
15
        "github.com/juju/juju/network"
 
16
        "github.com/juju/juju/state"
 
17
        "github.com/juju/juju/status"
 
18
        "github.com/juju/juju/worker"
 
19
        "github.com/juju/juju/worker/catacomb"
 
20
)
 
21
 
 
22
var logger = loggo.GetLogger("juju.worker.peergrouper")
 
23
 
 
24
type stateInterface interface {
 
25
        Machine(id string) (stateMachine, error)
 
26
        WatchControllerInfo() state.NotifyWatcher
 
27
        WatchControllerStatusChanges() state.StringsWatcher
 
28
        ControllerInfo() (*state.ControllerInfo, error)
 
29
        MongoSession() mongoSession
 
30
        Space(id string) (SpaceReader, error)
 
31
        SetOrGetMongoSpaceName(spaceName network.SpaceName) (network.SpaceName, error)
 
32
        SetMongoSpaceState(mongoSpaceState state.MongoSpaceStates) error
 
33
}
 
34
 
 
35
type stateMachine interface {
 
36
        Id() string
 
37
        InstanceId() (instance.Id, error)
 
38
        Status() (status.StatusInfo, error)
 
39
        Refresh() error
 
40
        Watch() state.NotifyWatcher
 
41
        WantsVote() bool
 
42
        HasVote() bool
 
43
        SetHasVote(hasVote bool) error
 
44
        APIHostPorts() []network.HostPort
 
45
        MongoHostPorts() []network.HostPort
 
46
}
 
47
 
 
48
type mongoSession interface {
 
49
        CurrentStatus() (*replicaset.Status, error)
 
50
        CurrentMembers() ([]replicaset.Member, error)
 
51
        Set([]replicaset.Member) error
 
52
}
 
53
 
 
54
type publisherInterface interface {
 
55
        // publish publishes information about the given controllers
 
56
        // to whomsoever it may concern. When it is called there
 
57
        // is no guarantee that any of the information has actually changed.
 
58
        publishAPIServers(apiServers [][]network.HostPort, instanceIds []instance.Id) error
 
59
}
 
60
 
 
61
var (
 
62
        // If we fail to set the mongo replica set members,
 
63
        // we start retrying with the following interval,
 
64
        // before exponentially backing off with each further
 
65
        // attempt.
 
66
        initialRetryInterval = 2 * time.Second
 
67
 
 
68
        // maxRetryInterval holds the maximum interval
 
69
        // between retry attempts.
 
70
        maxRetryInterval = 5 * time.Minute
 
71
 
 
72
        // pollInterval holds the interval at which the replica set
 
73
        // members will be updated even in the absence of changes
 
74
        // to State. This enables us to make changes to members
 
75
        // that are triggered by changes to member status.
 
76
        pollInterval = 1 * time.Minute
 
77
)
 
78
 
 
79
// pgWorker is a worker which watches the controller machines in state
 
80
// as well as the MongoDB replicaset configuration, adding and
 
81
// removing controller machines as they change or are added and
 
82
// removed.
 
83
type pgWorker struct {
 
84
        catacomb catacomb.Catacomb
 
85
 
 
86
        // st represents the State. It is an interface so we can swap
 
87
        // out the implementation during testing.
 
88
        st stateInterface
 
89
 
 
90
        // machineChanges receives events from the machineTrackers when
 
91
        // controller machines change in ways that are relevant to the
 
92
        // peergrouper.
 
93
        machineChanges chan struct{}
 
94
 
 
95
        // machineTrackers holds the workers which track the machines we
 
96
        // are currently watching (all the controller machines).
 
97
        machineTrackers map[string]*machineTracker
 
98
 
 
99
        // publisher holds the implementation of the API
 
100
        // address publisher.
 
101
        publisher publisherInterface
 
102
 
 
103
        providerSupportsSpaces bool
 
104
}
 
105
 
 
106
// New returns a new worker that maintains the mongo replica set
 
107
// with respect to the given state.
 
108
func New(st *state.State, supportsSpaces bool) (worker.Worker, error) {
 
109
        cfg, err := st.ControllerConfig()
 
110
        if err != nil {
 
111
                return nil, err
 
112
        }
 
113
        shim := &stateShim{
 
114
                State:     st,
 
115
                mongoPort: cfg.StatePort(),
 
116
                apiPort:   cfg.APIPort(),
 
117
        }
 
118
        return newWorker(shim, newPublisher(st), supportsSpaces)
 
119
}
 
120
 
 
121
func newWorker(st stateInterface, pub publisherInterface, supportsSpaces bool) (worker.Worker, error) {
 
122
        w := &pgWorker{
 
123
                st:                     st,
 
124
                machineChanges:         make(chan struct{}),
 
125
                machineTrackers:        make(map[string]*machineTracker),
 
126
                publisher:              pub,
 
127
                providerSupportsSpaces: supportsSpaces,
 
128
        }
 
129
        err := catacomb.Invoke(catacomb.Plan{
 
130
                Site: &w.catacomb,
 
131
                Work: w.loop,
 
132
        })
 
133
        if err != nil {
 
134
                return nil, errors.Trace(err)
 
135
        }
 
136
        return w, nil
 
137
}
 
138
 
 
139
func (w *pgWorker) Kill() {
 
140
        w.catacomb.Kill(nil)
 
141
}
 
142
 
 
143
func (w *pgWorker) Wait() error {
 
144
        return w.catacomb.Wait()
 
145
}
 
146
 
 
147
func (w *pgWorker) loop() error {
 
148
        controllerChanges, err := w.watchForControllerChanges()
 
149
        if err != nil {
 
150
                return errors.Trace(err)
 
151
        }
 
152
 
 
153
        var updateChan <-chan time.Time
 
154
        retryInterval := initialRetryInterval
 
155
 
 
156
        for {
 
157
                select {
 
158
                case <-w.catacomb.Dying():
 
159
                        return w.catacomb.ErrDying()
 
160
                case <-controllerChanges:
 
161
                        changed, err := w.updateControllerMachines()
 
162
                        if err != nil {
 
163
                                return errors.Trace(err)
 
164
                        }
 
165
                        if changed {
 
166
                                // A controller machine was added or removed, update
 
167
                                // the replica set immediately.
 
168
                                // TODO(fwereade): 2016-03-17 lp:1558657
 
169
                                updateChan = time.After(0)
 
170
                        }
 
171
 
 
172
                case <-w.machineChanges:
 
173
                        // One of the controller machines changed, update the
 
174
                        // replica set immediately.
 
175
                        // TODO(fwereade): 2016-03-17 lp:1558657
 
176
                        updateChan = time.After(0)
 
177
 
 
178
                case <-updateChan:
 
179
                        ok := true
 
180
                        servers, instanceIds, err := w.apiPublishInfo()
 
181
                        if err != nil {
 
182
                                return fmt.Errorf("cannot get API server info: %v", err)
 
183
                        }
 
184
                        if err := w.publisher.publishAPIServers(servers, instanceIds); err != nil {
 
185
                                logger.Errorf("cannot publish API server addresses: %v", err)
 
186
                                ok = false
 
187
                        }
 
188
                        if err := w.updateReplicaset(); err != nil {
 
189
                                if _, isReplicaSetError := err.(*replicaSetError); !isReplicaSetError {
 
190
                                        return err
 
191
                                }
 
192
                                logger.Errorf("cannot set replicaset: %v", err)
 
193
                                ok = false
 
194
                        }
 
195
                        if ok {
 
196
                                // Update the replica set members occasionally
 
197
                                // to keep them up to date with the current
 
198
                                // replica set member statuses.
 
199
                                // TODO(fwereade): 2016-03-17 lp:1558657
 
200
                                updateChan = time.After(pollInterval)
 
201
                                retryInterval = initialRetryInterval
 
202
                        } else {
 
203
                                // TODO(fwereade): 2016-03-17 lp:1558657
 
204
                                updateChan = time.After(retryInterval)
 
205
                                retryInterval *= 2
 
206
                                if retryInterval > maxRetryInterval {
 
207
                                        retryInterval = maxRetryInterval
 
208
                                }
 
209
                        }
 
210
 
 
211
                }
 
212
        }
 
213
}
 
214
 
 
215
// watchForControllerChanges starts two watchers pertaining to changes
 
216
// to the controllers, returning a channel which will receive events
 
217
// if either watcher fires.
 
218
func (w *pgWorker) watchForControllerChanges() (<-chan struct{}, error) {
 
219
        controllerInfoWatcher := w.st.WatchControllerInfo()
 
220
        if err := w.catacomb.Add(controllerInfoWatcher); err != nil {
 
221
                return nil, errors.Trace(err)
 
222
        }
 
223
 
 
224
        controllerStatusWatcher := w.st.WatchControllerStatusChanges()
 
225
        if err := w.catacomb.Add(controllerStatusWatcher); err != nil {
 
226
                return nil, errors.Trace(err)
 
227
        }
 
228
 
 
229
        out := make(chan struct{})
 
230
        go func() {
 
231
                for {
 
232
                        select {
 
233
                        case <-w.catacomb.Dying():
 
234
                                return
 
235
                        case <-controllerInfoWatcher.Changes():
 
236
                                out <- struct{}{}
 
237
                        case <-controllerStatusWatcher.Changes():
 
238
                                out <- struct{}{}
 
239
                        }
 
240
                }
 
241
        }()
 
242
        return out, nil
 
243
}
 
244
 
 
245
// updateControllerMachines updates the peergrouper's current list of
 
246
// controller machines, as well as starting and stopping trackers for
 
247
// them as they are added and removed.
 
248
func (w *pgWorker) updateControllerMachines() (bool, error) {
 
249
        info, err := w.st.ControllerInfo()
 
250
        if err != nil {
 
251
                return false, fmt.Errorf("cannot get controller info: %v", err)
 
252
        }
 
253
 
 
254
        logger.Debugf("controller machines in state: %#v", info.MachineIds)
 
255
        changed := false
 
256
 
 
257
        // Stop machine goroutines that no longer correspond to controller
 
258
        // machines.
 
259
        for _, m := range w.machineTrackers {
 
260
                if !inStrings(m.Id(), info.MachineIds) {
 
261
                        worker.Stop(m)
 
262
                        delete(w.machineTrackers, m.Id())
 
263
                        changed = true
 
264
                }
 
265
        }
 
266
 
 
267
        // Start machines with no watcher
 
268
        for _, id := range info.MachineIds {
 
269
                if _, ok := w.machineTrackers[id]; ok {
 
270
                        continue
 
271
                }
 
272
                logger.Debugf("found new machine %q", id)
 
273
                stm, err := w.st.Machine(id)
 
274
                if err != nil {
 
275
                        if errors.IsNotFound(err) {
 
276
                                // If the machine isn't found, it must have been
 
277
                                // removed and will soon enough be removed
 
278
                                // from the controller list. This will probably
 
279
                                // never happen, but we'll code defensively anyway.
 
280
                                logger.Warningf("machine %q from controller list not found", id)
 
281
                                continue
 
282
                        }
 
283
                        return false, fmt.Errorf("cannot get machine %q: %v", id, err)
 
284
                }
 
285
 
 
286
                // Don't add the machine unless it is "Started"
 
287
                machineStatus, err := stm.Status()
 
288
                if err != nil {
 
289
                        return false, errors.Annotatef(err, "cannot get status for machine %q", id)
 
290
                }
 
291
                if machineStatus.Status == status.StatusStarted {
 
292
                        logger.Debugf("machine %q has started, adding it to peergrouper list", id)
 
293
                        tracker, err := newMachineTracker(stm, w.machineChanges)
 
294
                        if err != nil {
 
295
                                return false, errors.Trace(err)
 
296
                        }
 
297
                        if err := w.catacomb.Add(tracker); err != nil {
 
298
                                return false, errors.Trace(err)
 
299
                        }
 
300
                        w.machineTrackers[id] = tracker
 
301
                        changed = true
 
302
                } else {
 
303
                        logger.Debugf("machine %q not ready: %v", id, machineStatus.Status)
 
304
                }
 
305
 
 
306
        }
 
307
        return changed, nil
 
308
}
 
309
 
 
310
func inStrings(t string, ss []string) bool {
 
311
        for _, s := range ss {
 
312
                if s == t {
 
313
                        return true
 
314
                }
 
315
        }
 
316
        return false
 
317
}
 
318
 
 
319
func (w *pgWorker) apiPublishInfo() ([][]network.HostPort, []instance.Id, error) {
 
320
        servers := make([][]network.HostPort, 0, len(w.machineTrackers))
 
321
        instanceIds := make([]instance.Id, 0, len(w.machineTrackers))
 
322
        for _, m := range w.machineTrackers {
 
323
                if len(m.APIHostPorts()) == 0 {
 
324
                        continue
 
325
                }
 
326
                instanceId, err := m.stm.InstanceId()
 
327
                if err != nil {
 
328
                        return nil, nil, err
 
329
                }
 
330
                instanceIds = append(instanceIds, instanceId)
 
331
                servers = append(servers, m.APIHostPorts())
 
332
 
 
333
        }
 
334
        return servers, instanceIds, nil
 
335
}
 
336
 
 
337
// peerGroupInfo collates current session information about the
 
338
// mongo peer group with information from state machines.
 
339
func (w *pgWorker) peerGroupInfo() (*peerGroupInfo, error) {
 
340
        session := w.st.MongoSession()
 
341
        info := &peerGroupInfo{}
 
342
        var err error
 
343
        status, err := session.CurrentStatus()
 
344
        if err != nil {
 
345
                return nil, fmt.Errorf("cannot get replica set status: %v", err)
 
346
        }
 
347
        info.statuses = status.Members
 
348
        info.members, err = session.CurrentMembers()
 
349
        if err != nil {
 
350
                return nil, fmt.Errorf("cannot get replica set members: %v", err)
 
351
        }
 
352
        info.machineTrackers = w.machineTrackers
 
353
 
 
354
        spaceName, err := w.getMongoSpace(mongoAddresses(info.machineTrackers))
 
355
        if err != nil {
 
356
                return nil, err
 
357
        }
 
358
        info.mongoSpace = spaceName
 
359
 
 
360
        return info, nil
 
361
}
 
362
 
 
363
func mongoAddresses(machines map[string]*machineTracker) [][]network.Address {
 
364
        addresses := make([][]network.Address, len(machines))
 
365
        i := 0
 
366
        for _, m := range machines {
 
367
                for _, hp := range m.MongoHostPorts() {
 
368
                        addresses[i] = append(addresses[i], hp.Address)
 
369
                }
 
370
                i++
 
371
        }
 
372
        return addresses
 
373
}
 
374
 
 
375
// getMongoSpace updates info with the space that Mongo servers should exist in.
 
376
func (w *pgWorker) getMongoSpace(addrs [][]network.Address) (network.SpaceName, error) {
 
377
        unset := network.SpaceName("")
 
378
 
 
379
        stateInfo, err := w.st.ControllerInfo()
 
380
        if err != nil {
 
381
                return unset, errors.Annotate(err, "cannot get state server info")
 
382
        }
 
383
 
 
384
        switch stateInfo.MongoSpaceState {
 
385
        case state.MongoSpaceUnknown:
 
386
                if !w.providerSupportsSpaces {
 
387
                        err := w.st.SetMongoSpaceState(state.MongoSpaceUnsupported)
 
388
                        if err != nil {
 
389
                                return unset, errors.Annotate(err, "cannot set Mongo space state")
 
390
                        }
 
391
                        return unset, nil
 
392
                }
 
393
 
 
394
                // We want to find a space that contains all Mongo servers so we can
 
395
                // use it to look up the IP address of each Mongo server to be used
 
396
                // to set up the peer group.
 
397
                spaceStats := generateSpaceStats(addrs)
 
398
                if spaceStats.LargestSpaceContainsAll == false {
 
399
                        err := w.st.SetMongoSpaceState(state.MongoSpaceInvalid)
 
400
                        if err != nil {
 
401
                                return unset, errors.Annotate(err, "cannot set Mongo space state")
 
402
                        }
 
403
                        logger.Warningf("couldn't find a space containing all peer group machines")
 
404
                        return unset, nil
 
405
                } else {
 
406
                        spaceName, err := w.st.SetOrGetMongoSpaceName(spaceStats.LargestSpace)
 
407
                        if err != nil {
 
408
                                return unset, errors.Annotate(err, "error setting/getting Mongo space")
 
409
                        }
 
410
                        return spaceName, nil
 
411
                }
 
412
 
 
413
        case state.MongoSpaceValid:
 
414
                space, err := w.st.Space(stateInfo.MongoSpaceName)
 
415
                if err != nil {
 
416
                        return unset, errors.Annotate(err, "looking up space")
 
417
                }
 
418
                return network.SpaceName(space.Name()), nil
 
419
        }
 
420
 
 
421
        return unset, nil
 
422
}
 
423
 
 
424
// replicaSetError holds an error returned as a result
 
425
// of calling replicaset.Set. As this is expected to fail
 
426
// in the normal course of things, it needs special treatment.
 
427
type replicaSetError struct {
 
428
        error
 
429
}
 
430
 
 
431
// updateReplicaset sets the current replica set members, and applies the
 
432
// given voting status to machines in the state.
 
433
func (w *pgWorker) updateReplicaset() error {
 
434
        info, err := w.peerGroupInfo()
 
435
        if err != nil {
 
436
                return errors.Annotate(err, "cannot get peergrouper info")
 
437
        }
 
438
        members, voting, err := desiredPeerGroup(info)
 
439
        if err != nil {
 
440
                return fmt.Errorf("cannot compute desired peer group: %v", err)
 
441
        }
 
442
        if members != nil {
 
443
                logger.Debugf("desired peer group members: %#v", members)
 
444
        } else {
 
445
                logger.Debugf("no change in desired peer group (voting %#v)", voting)
 
446
        }
 
447
 
 
448
        // We cannot change the HasVote flag of a machine in state at exactly
 
449
        // the same moment as changing its voting status in the replica set.
 
450
        //
 
451
        // Thus we need to be careful that a machine which is actually a voting
 
452
        // member is not seen to not have a vote, because otherwise
 
453
        // there is nothing to prevent the machine being removed.
 
454
        //
 
455
        // To avoid this happening, we make sure when we call SetReplicaSet,
 
456
        // that the voting status of machines is the union of both old
 
457
        // and new voting machines - that is the set of HasVote machines
 
458
        // is a superset of all the actual voting machines.
 
459
        //
 
460
        // Only after the call has taken place do we reset the voting status
 
461
        // of the machines that have lost their vote.
 
462
        //
 
463
        // If there's a crash, the voting status may not reflect the
 
464
        // actual voting status for a while, but when things come
 
465
        // back on line, it will be sorted out, as desiredReplicaSet
 
466
        // will return the actual voting status.
 
467
        //
 
468
        // Note that we potentially update the HasVote status of the machines even
 
469
        // if the members have not changed.
 
470
        var added, removed []*machineTracker
 
471
        for m, hasVote := range voting {
 
472
                switch {
 
473
                case hasVote && !m.stm.HasVote():
 
474
                        added = append(added, m)
 
475
                case !hasVote && m.stm.HasVote():
 
476
                        removed = append(removed, m)
 
477
                }
 
478
        }
 
479
        if err := setHasVote(added, true); err != nil {
 
480
                return errors.Annotate(err, "cannot set HasVote added")
 
481
        }
 
482
        if members != nil {
 
483
                if err := w.st.MongoSession().Set(members); err != nil {
 
484
                        // We've failed to set the replica set, so revert back
 
485
                        // to the previous settings.
 
486
                        if err1 := setHasVote(added, false); err1 != nil {
 
487
                                logger.Errorf("cannot revert machine voting after failure to change replica set: %v", err1)
 
488
                        }
 
489
                        return &replicaSetError{err}
 
490
                }
 
491
                logger.Infof("successfully changed replica set to %#v", members)
 
492
        }
 
493
        if err := setHasVote(removed, false); err != nil {
 
494
                return errors.Annotate(err, "cannot set HasVote removed")
 
495
        }
 
496
        return nil
 
497
}
 
498
 
 
499
// setHasVote sets the HasVote status of all the given
 
500
// machines to hasVote.
 
501
func setHasVote(ms []*machineTracker, hasVote bool) error {
 
502
        if len(ms) == 0 {
 
503
                return nil
 
504
        }
 
505
        logger.Infof("setting HasVote=%v on machines %v", hasVote, ms)
 
506
        for _, m := range ms {
 
507
                if err := m.stm.SetHasVote(hasVote); err != nil {
 
508
                        return fmt.Errorf("cannot set voting status of %q to %v: %v", m.Id(), hasVote, err)
 
509
                }
 
510
        }
 
511
        return nil
 
512
}
 
513
 
 
514
// allSpaceStats holds a SpaceStats for both API and Mongo machines
 
515
type allSpaceStats struct {
 
516
        APIMachines   spaceStats
 
517
        MongoMachines spaceStats
 
518
}
 
519
 
 
520
// SpaceStats holds information useful when choosing which space to pick an
 
521
// address from.
 
522
type spaceStats struct {
 
523
        SpaceRefCount           map[network.SpaceName]int
 
524
        LargestSpace            network.SpaceName
 
525
        LargestSpaceSize        int
 
526
        LargestSpaceContainsAll bool
 
527
}
 
528
 
 
529
// generateSpaceStats takes a list of machine addresses and returns information
 
530
// about what spaces are referenced by those machines.
 
531
func generateSpaceStats(addresses [][]network.Address) spaceStats {
 
532
        var stats spaceStats
 
533
        stats.SpaceRefCount = make(map[network.SpaceName]int)
 
534
 
 
535
        for i := range addresses {
 
536
                for _, addr := range addresses[i] {
 
537
                        v := stats.SpaceRefCount[addr.SpaceName]
 
538
                        v++
 
539
                        stats.SpaceRefCount[addr.SpaceName] = v
 
540
 
 
541
                        if v > stats.LargestSpaceSize {
 
542
                                stats.LargestSpace = addr.SpaceName
 
543
                                stats.LargestSpaceSize = v
 
544
                        }
 
545
                }
 
546
        }
 
547
 
 
548
        stats.LargestSpaceContainsAll = stats.LargestSpaceSize == len(addresses)
 
549
 
 
550
        return stats
 
551
}