1
// Copyright 2014 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
10
"github.com/juju/errors"
11
"github.com/juju/loggo"
12
"github.com/juju/replicaset"
14
"github.com/juju/juju/instance"
15
"github.com/juju/juju/network"
16
"github.com/juju/juju/state"
17
"github.com/juju/juju/status"
18
"github.com/juju/juju/worker"
19
"github.com/juju/juju/worker/catacomb"
22
var logger = loggo.GetLogger("juju.worker.peergrouper")
24
type stateInterface interface {
25
Machine(id string) (stateMachine, error)
26
WatchControllerInfo() state.NotifyWatcher
27
WatchControllerStatusChanges() state.StringsWatcher
28
ControllerInfo() (*state.ControllerInfo, error)
29
MongoSession() mongoSession
30
Space(id string) (SpaceReader, error)
31
SetOrGetMongoSpaceName(spaceName network.SpaceName) (network.SpaceName, error)
32
SetMongoSpaceState(mongoSpaceState state.MongoSpaceStates) error
35
type stateMachine interface {
37
InstanceId() (instance.Id, error)
38
Status() (status.StatusInfo, error)
40
Watch() state.NotifyWatcher
43
SetHasVote(hasVote bool) error
44
APIHostPorts() []network.HostPort
45
MongoHostPorts() []network.HostPort
48
type mongoSession interface {
49
CurrentStatus() (*replicaset.Status, error)
50
CurrentMembers() ([]replicaset.Member, error)
51
Set([]replicaset.Member) error
54
type publisherInterface interface {
55
// publish publishes information about the given controllers
56
// to whomsoever it may concern. When it is called there
57
// is no guarantee that any of the information has actually changed.
58
publishAPIServers(apiServers [][]network.HostPort, instanceIds []instance.Id) error
62
// If we fail to set the mongo replica set members,
63
// we start retrying with the following interval,
64
// before exponentially backing off with each further
66
initialRetryInterval = 2 * time.Second
68
// maxRetryInterval holds the maximum interval
69
// between retry attempts.
70
maxRetryInterval = 5 * time.Minute
72
// pollInterval holds the interval at which the replica set
73
// members will be updated even in the absence of changes
74
// to State. This enables us to make changes to members
75
// that are triggered by changes to member status.
76
pollInterval = 1 * time.Minute
79
// pgWorker is a worker which watches the controller machines in state
80
// as well as the MongoDB replicaset configuration, adding and
81
// removing controller machines as they change or are added and
83
type pgWorker struct {
84
catacomb catacomb.Catacomb
86
// st represents the State. It is an interface so we can swap
87
// out the implementation during testing.
90
// machineChanges receives events from the machineTrackers when
91
// controller machines change in ways that are relevant to the
93
machineChanges chan struct{}
95
// machineTrackers holds the workers which track the machines we
96
// are currently watching (all the controller machines).
97
machineTrackers map[string]*machineTracker
99
// publisher holds the implementation of the API
100
// address publisher.
101
publisher publisherInterface
103
providerSupportsSpaces bool
106
// New returns a new worker that maintains the mongo replica set
107
// with respect to the given state.
108
func New(st *state.State, supportsSpaces bool) (worker.Worker, error) {
109
cfg, err := st.ControllerConfig()
115
mongoPort: cfg.StatePort(),
116
apiPort: cfg.APIPort(),
118
return newWorker(shim, newPublisher(st), supportsSpaces)
121
func newWorker(st stateInterface, pub publisherInterface, supportsSpaces bool) (worker.Worker, error) {
124
machineChanges: make(chan struct{}),
125
machineTrackers: make(map[string]*machineTracker),
127
providerSupportsSpaces: supportsSpaces,
129
err := catacomb.Invoke(catacomb.Plan{
134
return nil, errors.Trace(err)
139
func (w *pgWorker) Kill() {
143
func (w *pgWorker) Wait() error {
144
return w.catacomb.Wait()
147
func (w *pgWorker) loop() error {
148
controllerChanges, err := w.watchForControllerChanges()
150
return errors.Trace(err)
153
var updateChan <-chan time.Time
154
retryInterval := initialRetryInterval
158
case <-w.catacomb.Dying():
159
return w.catacomb.ErrDying()
160
case <-controllerChanges:
161
changed, err := w.updateControllerMachines()
163
return errors.Trace(err)
166
// A controller machine was added or removed, update
167
// the replica set immediately.
168
// TODO(fwereade): 2016-03-17 lp:1558657
169
updateChan = time.After(0)
172
case <-w.machineChanges:
173
// One of the controller machines changed, update the
174
// replica set immediately.
175
// TODO(fwereade): 2016-03-17 lp:1558657
176
updateChan = time.After(0)
180
servers, instanceIds, err := w.apiPublishInfo()
182
return fmt.Errorf("cannot get API server info: %v", err)
184
if err := w.publisher.publishAPIServers(servers, instanceIds); err != nil {
185
logger.Errorf("cannot publish API server addresses: %v", err)
188
if err := w.updateReplicaset(); err != nil {
189
if _, isReplicaSetError := err.(*replicaSetError); !isReplicaSetError {
192
logger.Errorf("cannot set replicaset: %v", err)
196
// Update the replica set members occasionally
197
// to keep them up to date with the current
198
// replica set member statuses.
199
// TODO(fwereade): 2016-03-17 lp:1558657
200
updateChan = time.After(pollInterval)
201
retryInterval = initialRetryInterval
203
// TODO(fwereade): 2016-03-17 lp:1558657
204
updateChan = time.After(retryInterval)
206
if retryInterval > maxRetryInterval {
207
retryInterval = maxRetryInterval
215
// watchForControllerChanges starts two watchers pertaining to changes
216
// to the controllers, returning a channel which will receive events
217
// if either watcher fires.
218
func (w *pgWorker) watchForControllerChanges() (<-chan struct{}, error) {
219
controllerInfoWatcher := w.st.WatchControllerInfo()
220
if err := w.catacomb.Add(controllerInfoWatcher); err != nil {
221
return nil, errors.Trace(err)
224
controllerStatusWatcher := w.st.WatchControllerStatusChanges()
225
if err := w.catacomb.Add(controllerStatusWatcher); err != nil {
226
return nil, errors.Trace(err)
229
out := make(chan struct{})
233
case <-w.catacomb.Dying():
235
case <-controllerInfoWatcher.Changes():
237
case <-controllerStatusWatcher.Changes():
245
// updateControllerMachines updates the peergrouper's current list of
246
// controller machines, as well as starting and stopping trackers for
247
// them as they are added and removed.
248
func (w *pgWorker) updateControllerMachines() (bool, error) {
249
info, err := w.st.ControllerInfo()
251
return false, fmt.Errorf("cannot get controller info: %v", err)
254
logger.Debugf("controller machines in state: %#v", info.MachineIds)
257
// Stop machine goroutines that no longer correspond to controller
259
for _, m := range w.machineTrackers {
260
if !inStrings(m.Id(), info.MachineIds) {
262
delete(w.machineTrackers, m.Id())
267
// Start machines with no watcher
268
for _, id := range info.MachineIds {
269
if _, ok := w.machineTrackers[id]; ok {
272
logger.Debugf("found new machine %q", id)
273
stm, err := w.st.Machine(id)
275
if errors.IsNotFound(err) {
276
// If the machine isn't found, it must have been
277
// removed and will soon enough be removed
278
// from the controller list. This will probably
279
// never happen, but we'll code defensively anyway.
280
logger.Warningf("machine %q from controller list not found", id)
283
return false, fmt.Errorf("cannot get machine %q: %v", id, err)
286
// Don't add the machine unless it is "Started"
287
machineStatus, err := stm.Status()
289
return false, errors.Annotatef(err, "cannot get status for machine %q", id)
291
if machineStatus.Status == status.StatusStarted {
292
logger.Debugf("machine %q has started, adding it to peergrouper list", id)
293
tracker, err := newMachineTracker(stm, w.machineChanges)
295
return false, errors.Trace(err)
297
if err := w.catacomb.Add(tracker); err != nil {
298
return false, errors.Trace(err)
300
w.machineTrackers[id] = tracker
303
logger.Debugf("machine %q not ready: %v", id, machineStatus.Status)
310
// inStrings reports whether t is present in ss.
func inStrings(t string, ss []string) bool {
	for _, s := range ss {
		if s == t {
			return true
		}
	}
	return false
}
func (w *pgWorker) apiPublishInfo() ([][]network.HostPort, []instance.Id, error) {
320
servers := make([][]network.HostPort, 0, len(w.machineTrackers))
321
instanceIds := make([]instance.Id, 0, len(w.machineTrackers))
322
for _, m := range w.machineTrackers {
323
if len(m.APIHostPorts()) == 0 {
326
instanceId, err := m.stm.InstanceId()
330
instanceIds = append(instanceIds, instanceId)
331
servers = append(servers, m.APIHostPorts())
334
return servers, instanceIds, nil
337
// peerGroupInfo collates current session information about the
338
// mongo peer group with information from state machines.
339
func (w *pgWorker) peerGroupInfo() (*peerGroupInfo, error) {
340
session := w.st.MongoSession()
341
info := &peerGroupInfo{}
343
status, err := session.CurrentStatus()
345
return nil, fmt.Errorf("cannot get replica set status: %v", err)
347
info.statuses = status.Members
348
info.members, err = session.CurrentMembers()
350
return nil, fmt.Errorf("cannot get replica set members: %v", err)
352
info.machineTrackers = w.machineTrackers
354
spaceName, err := w.getMongoSpace(mongoAddresses(info.machineTrackers))
358
info.mongoSpace = spaceName
363
func mongoAddresses(machines map[string]*machineTracker) [][]network.Address {
364
addresses := make([][]network.Address, len(machines))
366
for _, m := range machines {
367
for _, hp := range m.MongoHostPorts() {
368
addresses[i] = append(addresses[i], hp.Address)
375
// getMongoSpace updates info with the space that Mongo servers should exist in.
376
func (w *pgWorker) getMongoSpace(addrs [][]network.Address) (network.SpaceName, error) {
377
unset := network.SpaceName("")
379
stateInfo, err := w.st.ControllerInfo()
381
return unset, errors.Annotate(err, "cannot get state server info")
384
switch stateInfo.MongoSpaceState {
385
case state.MongoSpaceUnknown:
386
if !w.providerSupportsSpaces {
387
err := w.st.SetMongoSpaceState(state.MongoSpaceUnsupported)
389
return unset, errors.Annotate(err, "cannot set Mongo space state")
394
// We want to find a space that contains all Mongo servers so we can
395
// use it to look up the IP address of each Mongo server to be used
396
// to set up the peer group.
397
spaceStats := generateSpaceStats(addrs)
398
if spaceStats.LargestSpaceContainsAll == false {
399
err := w.st.SetMongoSpaceState(state.MongoSpaceInvalid)
401
return unset, errors.Annotate(err, "cannot set Mongo space state")
403
logger.Warningf("couldn't find a space containing all peer group machines")
406
spaceName, err := w.st.SetOrGetMongoSpaceName(spaceStats.LargestSpace)
408
return unset, errors.Annotate(err, "error setting/getting Mongo space")
410
return spaceName, nil
413
case state.MongoSpaceValid:
414
space, err := w.st.Space(stateInfo.MongoSpaceName)
416
return unset, errors.Annotate(err, "looking up space")
418
return network.SpaceName(space.Name()), nil
424
// replicaSetError holds an error returned as a result
// of calling replicaset.Set. As this is expected to fail
// in the normal course of things, it needs special treatment.
// The embedded error supplies the Error method; callers detect this
// case with a type assertion (see loop).
type replicaSetError struct {
	error
}
// updateReplicaset sets the current replica set members, and applies the
432
// given voting status to machines in the state.
433
func (w *pgWorker) updateReplicaset() error {
434
info, err := w.peerGroupInfo()
436
return errors.Annotate(err, "cannot get peergrouper info")
438
members, voting, err := desiredPeerGroup(info)
440
return fmt.Errorf("cannot compute desired peer group: %v", err)
443
logger.Debugf("desired peer group members: %#v", members)
445
logger.Debugf("no change in desired peer group (voting %#v)", voting)
448
// We cannot change the HasVote flag of a machine in state at exactly
449
// the same moment as changing its voting status in the replica set.
451
// Thus we need to be careful that a machine which is actually a voting
452
// member is not seen to not have a vote, because otherwise
453
// there is nothing to prevent the machine being removed.
455
// To avoid this happening, we make sure when we call SetReplicaSet,
456
// that the voting status of machines is the union of both old
457
// and new voting machines - that is the set of HasVote machines
458
// is a superset of all the actual voting machines.
460
// Only after the call has taken place do we reset the voting status
461
// of the machines that have lost their vote.
463
// If there's a crash, the voting status may not reflect the
464
// actual voting status for a while, but when things come
465
// back on line, it will be sorted out, as desiredReplicaSet
466
// will return the actual voting status.
468
// Note that we potentially update the HasVote status of the machines even
469
// if the members have not changed.
470
var added, removed []*machineTracker
471
for m, hasVote := range voting {
473
case hasVote && !m.stm.HasVote():
474
added = append(added, m)
475
case !hasVote && m.stm.HasVote():
476
removed = append(removed, m)
479
if err := setHasVote(added, true); err != nil {
480
return errors.Annotate(err, "cannot set HasVote added")
483
if err := w.st.MongoSession().Set(members); err != nil {
484
// We've failed to set the replica set, so revert back
485
// to the previous settings.
486
if err1 := setHasVote(added, false); err1 != nil {
487
logger.Errorf("cannot revert machine voting after failure to change replica set: %v", err1)
489
return &replicaSetError{err}
491
logger.Infof("successfully changed replica set to %#v", members)
493
if err := setHasVote(removed, false); err != nil {
494
return errors.Annotate(err, "cannot set HasVote removed")
499
// setHasVote sets the HasVote status of all the given
500
// machines to hasVote.
501
func setHasVote(ms []*machineTracker, hasVote bool) error {
505
logger.Infof("setting HasVote=%v on machines %v", hasVote, ms)
506
for _, m := range ms {
507
if err := m.stm.SetHasVote(hasVote); err != nil {
508
return fmt.Errorf("cannot set voting status of %q to %v: %v", m.Id(), hasVote, err)
514
// allSpaceStats holds a SpaceStats for both API and Mongo machines
515
type allSpaceStats struct {
516
APIMachines spaceStats
517
MongoMachines spaceStats
520
// SpaceStats holds information useful when choosing which space to pick an
522
type spaceStats struct {
523
SpaceRefCount map[network.SpaceName]int
524
LargestSpace network.SpaceName
526
LargestSpaceContainsAll bool
529
// generateSpaceStats takes a list of machine addresses and returns information
530
// about what spaces are referenced by those machines.
531
func generateSpaceStats(addresses [][]network.Address) spaceStats {
533
stats.SpaceRefCount = make(map[network.SpaceName]int)
535
for i := range addresses {
536
for _, addr := range addresses[i] {
537
v := stats.SpaceRefCount[addr.SpaceName]
539
stats.SpaceRefCount[addr.SpaceName] = v
541
if v > stats.LargestSpaceSize {
542
stats.LargestSpace = addr.SpaceName
543
stats.LargestSpaceSize = v
548
stats.LargestSpaceContainsAll = stats.LargestSpaceSize == len(addresses)