~rogpeppe/juju-core/azure

« back to all changes in this revision

Viewing changes to state/unit.go

[r=wallyworld] New assignment policy is AssignCleanEmpty

Add container support to AssignClean(Empty) policies and make
the default policy AssignCleanEmpty. This allows manual deployment
of charms into existing clean and empty containers/instances, with
the fallback that a new container/instance is created if none are
available. The system behaves the same as previously if a simple
bootstrap and deploy is performed, but if add-machine is used to
create some containers/instances, then those are no longer ignored.

https://codereview.appspot.com/11019044/

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
        "labix.org/v2/mgo/txn"
16
16
 
17
17
        "launchpad.net/juju-core/charm"
 
18
        "launchpad.net/juju-core/constraints"
18
19
        "launchpad.net/juju-core/errors"
19
20
        "launchpad.net/juju-core/instance"
20
21
        "launchpad.net/juju-core/state/api/params"
713
714
 
714
715
var (
715
716
        machineNotAliveErr = stderrors.New("machine is not alive")
 
717
        machineNotCleanErr = stderrors.New("machine is dirty")
716
718
        unitNotAliveErr    = stderrors.New("unit is not alive")
717
719
        alreadyAssignedErr = stderrors.New("unit is already assigned to a machine")
718
720
        inUseErr           = stderrors.New("machine is not unused")
809
811
        return u.assignToMachine(m, false)
810
812
}
811
813
 
812
 
// AssignToNewMachine assigns the unit to a new machine, with constraints
813
 
// determined according to the service and environment constraints at the
814
 
// time of unit creation.
815
 
func (u *Unit) AssignToNewMachine() (err error) {
816
 
        defer assignContextf(&err, u, "new machine")
817
 
        if u.doc.Principal != "" {
818
 
                return fmt.Errorf("unit is a subordinate")
819
 
        }
820
 
        // Get the ops necessary to create a new machine, and the machine doc that
821
 
        // will be added with those operations (which includes the machine id).
822
 
        cons, err := readConstraints(u.st, u.globalKey())
823
 
        if errors.IsNotFoundError(err) {
824
 
                // Lack of constraints indicates lack of unit.
825
 
                return errors.NotFoundf("unit")
826
 
        } else if err != nil {
827
 
                return err
828
 
        }
829
 
        var containerType instance.ContainerType
830
 
        // Configure to create a new container if required.
831
 
        if cons.Container != nil && *cons.Container != "" && *cons.Container != instance.NONE {
832
 
                containerType = *cons.Container
833
 
        }
834
 
        params := &AddMachineParams{
835
 
                Series:        u.doc.Series,
836
 
                ContainerType: containerType,
837
 
                Jobs:          []MachineJob{JobHostUnits},
838
 
        }
 
814
// assignToNewMachine assigns the unit to a machine created according to the supplied params,
 
815
// with the supplied constraints.
 
816
func (u *Unit) assignToNewMachine(params *AddMachineParams, cons constraints.Value) (err error) {
839
817
        ops, instData, containerParams, err := u.st.addMachineContainerOps(params, cons)
840
818
        if err != nil {
841
819
                return err
842
820
        }
843
821
        mdoc := &machineDoc{
844
822
                Series:        u.doc.Series,
845
 
                ContainerType: string(containerType),
 
823
                ContainerType: string(params.ContainerType),
846
824
                Jobs:          []MachineJob{JobHostUnits},
847
825
                Principals:    []string{u.doc.Name},
848
826
                Clean:         false,
853
831
        }
854
832
        ops = append(ops, machineOps...)
855
833
        isUnassigned := D{{"machineid", ""}}
 
834
        asserts := append(isAliveDoc, isUnassigned...)
 
835
        // Ensure the host machine is really clean.
 
836
        if params.ParentId != "" {
 
837
                ops = append(ops, txn.Op{
 
838
                        C:      u.st.machines.Name,
 
839
                        Id:     params.ParentId,
 
840
                        Assert: D{{"clean", true}},
 
841
                }, txn.Op{
 
842
                        C:      u.st.containerRefs.Name,
 
843
                        Id:     params.ParentId,
 
844
                        Assert: D{hasNoContainersTerm},
 
845
                })
 
846
        }
856
847
        ops = append(ops, txn.Op{
857
848
                C:      u.st.units.Name,
858
849
                Id:     u.doc.Name,
859
 
                Assert: append(isAliveDoc, isUnassigned...),
 
850
                Assert: asserts,
860
851
                Update: D{{"$set", D{{"machineid", mdoc.Id}}}},
861
852
        })
862
853
        err = u.st.runTransaction(ops)
871
862
        // reasons that the transaction could have been aborted are:
872
863
        //  * the unit is no longer alive
873
864
        //  * the unit has been assigned to a different machine
 
865
        //  * the parent machine we want to create a container on was clean but became dirty
874
866
        unit, err := u.st.Unit(u.Name())
875
867
        if err != nil {
876
868
                return err
881
873
        case unit.doc.MachineId != "":
882
874
                return alreadyAssignedErr
883
875
        }
 
876
        if params.ParentId != "" {
 
877
                m, err := u.st.Machine(params.ParentId)
 
878
                if err != nil {
 
879
                        return err
 
880
                }
 
881
                if !m.Clean() {
 
882
                        return machineNotCleanErr
 
883
                }
 
884
                containers, err := m.Containers()
 
885
                if err != nil {
 
886
                        return err
 
887
                }
 
888
                if len(containers) > 0 {
 
889
                        return machineNotCleanErr
 
890
                }
 
891
        }
884
892
        // Other error condition not considered.
885
893
        return fmt.Errorf("unknown error")
886
894
}
887
895
 
 
896
// constraints is a helper function to return a unit's deployment constraints.
 
897
func (u *Unit) constraints() (*constraints.Value, error) {
 
898
        cons, err := readConstraints(u.st, u.globalKey())
 
899
        if errors.IsNotFoundError(err) {
 
900
                // Lack of constraints indicates lack of unit.
 
901
                return nil, errors.NotFoundf("unit")
 
902
        } else if err != nil {
 
903
                return nil, err
 
904
        }
 
905
        return &cons, nil
 
906
}
 
907
 
 
908
// AssignToNewMachineOrContainer assigns the unit to a new machine, with constraints
 
909
// determined according to the service and environment constraints at the time of unit creation.
 
910
// If a container is required, a clean, empty machine instance is required on which to create
 
911
// the container. An existing clean, empty instance is first searched for, and if not found,
 
912
// a new one is created.
 
913
func (u *Unit) AssignToNewMachineOrContainer() (err error) {
 
914
        defer assignContextf(&err, u, "new machine or container")
 
915
        if u.doc.Principal != "" {
 
916
                return fmt.Errorf("unit is a subordinate")
 
917
        }
 
918
        cons, err := u.constraints()
 
919
        if err != nil {
 
920
                return err
 
921
        }
 
922
        if !cons.HasContainer() {
 
923
                return u.AssignToNewMachine()
 
924
        }
 
925
 
 
926
        // Find a clean, empty machine on which to create a container.
 
927
        var host machineDoc
 
928
        hostCons := *cons
 
929
        noContainer := instance.NONE
 
930
        hostCons.Container = &noContainer
 
931
        query, err := u.findCleanMachineQuery(true, &hostCons)
 
932
        if err != nil {
 
933
                return err
 
934
        }
 
935
        err = query.One(&host)
 
936
        if err == mgo.ErrNotFound {
 
937
                // No existing clean, empty machine so create a new one.
 
938
                // The container constraint will be used by AssignToNewMachine to create the required container.
 
939
                return u.AssignToNewMachine()
 
940
        } else if err != nil {
 
941
                return err
 
942
        }
 
943
        params := &AddMachineParams{
 
944
                Series:        u.doc.Series,
 
945
                ParentId:      host.Id,
 
946
                ContainerType: *cons.Container,
 
947
                Jobs:          []MachineJob{JobHostUnits},
 
948
        }
 
949
        err = u.assignToNewMachine(params, *cons)
 
950
        if err == machineNotCleanErr {
 
951
                // The clean machine was used before we got a chance to use it so just
 
952
                // stick the unit on a new machine.
 
953
                return u.AssignToNewMachine()
 
954
        }
 
955
        return err
 
956
}
 
957
 
 
958
// AssignToNewMachine assigns the unit to a new machine, with constraints
 
959
// determined according to the service and environment constraints at the
 
960
// time of unit creation.
 
961
func (u *Unit) AssignToNewMachine() (err error) {
 
962
        defer assignContextf(&err, u, "new machine")
 
963
        if u.doc.Principal != "" {
 
964
                return fmt.Errorf("unit is a subordinate")
 
965
        }
 
966
        // Get the ops necessary to create a new machine, and the machine doc that
 
967
        // will be added with those operations (which includes the machine id).
 
968
        cons, err := u.constraints()
 
969
        if err != nil {
 
970
                return err
 
971
        }
 
972
        var containerType instance.ContainerType
 
973
        // Configure to create a new container if required.
 
974
        if cons.HasContainer() {
 
975
                containerType = *cons.Container
 
976
        }
 
977
        params := &AddMachineParams{
 
978
                Series:        u.doc.Series,
 
979
                ContainerType: containerType,
 
980
                Jobs:          []MachineJob{JobHostUnits},
 
981
        }
 
982
        err = u.assignToNewMachine(params, *cons)
 
983
        return err
 
984
}
 
985
 
888
986
var noCleanMachines = stderrors.New("all eligible machines in use")
889
987
 
890
988
// AssignToCleanMachine assigns u to a machine which is marked as clean. A machine
907
1005
        return u.assignToCleanMaybeEmptyMachine(true)
908
1006
}
909
1007
 
910
 
// assignToCleanMaybeEmptyMachine implements AssignToCleanMachine and AssignToCleanEmptyMachine.
911
 
func (u *Unit) assignToCleanMaybeEmptyMachine(requireEmpty bool) (m *Machine, err error) {
 
1008
var hasContainerTerm = bson.DocElem{
 
1009
        "$and", []D{
 
1010
                {{"children", D{{"$not", D{{"$size", 0}}}}}},
 
1011
                {{"children", D{{"$exists", true}}}},
 
1012
        }}
 
1013
 
 
1014
var hasNoContainersTerm = bson.DocElem{
 
1015
        "$or", []D{
 
1016
                {{"children", D{{"$size", 0}}}},
 
1017
                {{"children", D{{"$exists", false}}}},
 
1018
        }}
 
1019
 
 
1020
// findCleanMachineQuery returns a Mongo query to find clean (and possibly empty) machines with
 
1021
// characteristics matching the specified constraints.
 
1022
func (u *Unit) findCleanMachineQuery(requireEmpty bool, cons *constraints.Value) (*mgo.Query, error) {
 
1023
        // TODO(wallyworld) - add support for constraints besides just container type
 
1024
        var containerType instance.ContainerType
 
1025
        if cons.Container != nil {
 
1026
                containerType = *cons.Container
 
1027
        }
912
1028
        // Select all machines that can accept principal units and are clean.
913
1029
        var containerRefs []machineContainers
914
1030
        // If we need empty machines, first build up a list of machine ids which have containers
915
1031
        // so we can exclude those.
916
1032
        if requireEmpty {
917
 
                err = u.st.containerRefs.Find(D{
918
 
                        {"$and", []D{
919
 
                                {{
920
 
                                        "children",
921
 
                                        D{{"$not", D{{"$size", 0}}}},
922
 
                                }},
923
 
                                {{
924
 
                                        "children",
925
 
                                        D{{"$exists", true}},
926
 
                                }},
927
 
                        },
928
 
                        }},
929
 
                ).Select(bson.M{"_id": 1}).All(&containerRefs)
 
1033
                err := u.st.containerRefs.Find(D{hasContainerTerm}).Select(bson.M{"_id": 1}).All(&containerRefs)
930
1034
                if err != nil {
931
1035
                        return nil, err
932
1036
                }
935
1039
        for i, cref := range containerRefs {
936
1040
                machinesWithContainers[i] = cref.Id
937
1041
        }
938
 
        query := u.st.machines.Find(D{
 
1042
        terms := D{
939
1043
                {"life", Alive},
940
1044
                {"series", u.doc.Series},
941
 
                {"jobs", JobHostUnits},
942
 
                {"clean", D{{"$ne", false}}},
 
1045
                {"jobs", []MachineJob{JobHostUnits}},
 
1046
                {"clean", true},
943
1047
                {"_id", D{{"$nin", machinesWithContainers}}},
944
 
        })
 
1048
        }
 
1049
        // Add the container filter term if necessary.
 
1050
        if containerType == instance.NONE {
 
1051
                terms = append(terms, bson.DocElem{"containertype", ""})
 
1052
        } else if containerType != "" {
 
1053
                terms = append(terms, bson.DocElem{"containertype", string(containerType)})
 
1054
        }
 
1055
        return u.st.machines.Find(terms), nil
 
1056
}
 
1057
 
 
1058
// assignToCleanMaybeEmptyMachine implements AssignToCleanMachine and AssignToCleanEmptyMachine.
 
1059
// A 'machine' may be a machine instance or container depending on the service constraints.
 
1060
func (u *Unit) assignToCleanMaybeEmptyMachine(requireEmpty bool) (m *Machine, err error) {
 
1061
        context := "clean"
 
1062
        if requireEmpty {
 
1063
                context += ", empty"
 
1064
        }
 
1065
        context += " machine"
 
1066
 
 
1067
        if u.doc.Principal != "" {
 
1068
                err = fmt.Errorf("unit is a subordinate")
 
1069
                assignContextf(&err, u, context)
 
1070
                return nil, err
 
1071
        }
 
1072
 
 
1073
        // Get the unit constraints to see what deployment requirements we have to adhere to.
 
1074
        cons, err := u.constraints()
 
1075
        if err != nil {
 
1076
                assignContextf(&err, u, context)
 
1077
                return nil, err
 
1078
        }
 
1079
        query, err := u.findCleanMachineQuery(requireEmpty, cons)
 
1080
        if err != nil {
 
1081
                assignContextf(&err, u, context)
 
1082
                return nil, err
 
1083
        }
945
1084
 
946
1085
        // TODO use Batch(1). See https://bugs.launchpad.net/mgo/+bug/1053509
947
1086
        // TODO(rog) Fix so this is more efficient when there are concurrent uses.
950
1089
        // middle.
951
1090
        iter := query.Batch(2).Prefetch(0).Iter()
952
1091
        var mdoc machineDoc
953
 
        context := "clean"
954
 
        if requireEmpty {
955
 
                context += ", empty"
956
 
        }
957
 
        context += " machine"
958
1092
        for iter.Next(&mdoc) {
959
1093
                m := newMachine(u.st, &mdoc)
960
1094
                err := u.assignToMachine(m, true)