~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/mongoupgrader/worker.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2015 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package mongoupgrader
 
5
 
 
6
import (
 
7
        "net"
 
8
        "strconv"
 
9
 
 
10
        "github.com/juju/errors"
 
11
 
 
12
        "github.com/juju/juju/mongo"
 
13
        "github.com/juju/juju/state"
 
14
        "github.com/juju/juju/worker"
 
15
        "github.com/juju/replicaset"
 
16
)
 
17
 
 
18
// StopMongo represents a function that can issue a stop
 
19
// to a running mongo service.
 
20
type StopMongo func(mongo.Version, bool) error
 
21
 
 
22
// New returns a worker or err in case of failure.
 
23
// this worker takes care of watching the state of machine's upgrade
 
24
// mongo information and change agent conf accordingly.
 
25
func New(st *state.State, machineID string, maybeStopMongo StopMongo) (worker.Worker, error) {
 
26
        upgradeWorker := func(stopch <-chan struct{}) error {
 
27
                return upgradeMongoWatcher(st, stopch, machineID, maybeStopMongo)
 
28
        }
 
29
        return worker.NewSimpleWorker(upgradeWorker), nil
 
30
}
 
31
 
 
32
func upgradeMongoWatcher(st *state.State, stopch <-chan struct{}, machineID string, maybeStopMongo StopMongo) error {
 
33
        m, err := st.Machine(machineID)
 
34
        if err != nil {
 
35
                return errors.Annotatef(err, "cannot start watcher for machine %q", machineID)
 
36
        }
 
37
        watch := m.Watch()
 
38
        defer func() {
 
39
                watch.Kill()
 
40
                watch.Wait()
 
41
        }()
 
42
 
 
43
        for {
 
44
                select {
 
45
                case <-watch.Changes():
 
46
                        if err := m.Refresh(); err != nil {
 
47
                                return errors.Annotate(err, "cannot refresh machine information")
 
48
                        }
 
49
                        if !m.IsManager() {
 
50
                                continue
 
51
                        }
 
52
                        expectedVersion, err := m.StopMongoUntilVersion()
 
53
                        if err != nil {
 
54
                                return errors.Annotate(err, "cannot obtain minimum version of mongo")
 
55
                        }
 
56
                        if expectedVersion == mongo.Mongo24 {
 
57
                                continue
 
58
                        }
 
59
                        var isMaster bool
 
60
                        isMaster, err = mongo.IsMaster(st.MongoSession(), m)
 
61
                        if err != nil {
 
62
                                return errors.Annotatef(err, "cannot determine if machine %q is master", machineID)
 
63
                        }
 
64
 
 
65
                        err = maybeStopMongo(expectedVersion, isMaster)
 
66
                        if err != nil {
 
67
                                return errors.Annotate(err, "cannot determine if mongo must be stopped")
 
68
                        }
 
69
                        if !isMaster {
 
70
                                addrs := make([]string, len(m.Addresses()))
 
71
                                ssi, err := st.StateServingInfo()
 
72
                                if err != nil {
 
73
                                        return errors.Annotate(err, "cannot obtain state serving info to stop mongo")
 
74
                                }
 
75
                                for i, addr := range m.Addresses() {
 
76
                                        addrs[i] = net.JoinHostPort(addr.Value, strconv.Itoa(ssi.StatePort))
 
77
                                }
 
78
                                if err := replicaset.Remove(st.MongoSession(), addrs...); err != nil {
 
79
                                        return errors.Annotatef(err, "cannot remove %q from replicaset", m.Id())
 
80
                                }
 
81
                                if err := m.SetStopMongoUntilVersion(mongo.Mongo24); err != nil {
 
82
                                        return errors.Annotate(err, "cannot reset stop mongo flag")
 
83
                                }
 
84
                        }
 
85
                case <-stopch:
 
86
                        return nil
 
87
                }
 
88
        }
 
89
}