~themue/pyjuju/go-state-auth

« back to all changes in this revision

Viewing changes to state/topology.go

  • Committer: Frank Mueller
  • Date: 2012-01-12 08:05:19 UTC
  • mfrom: (32.1.7 go-initial-state-adding)
  • Revision ID: frank.mueller@canonical.com-20120112080519-ytw2ap2o1ogbe018
Initial adding of the state model to the Go port of juju

As a first step in adding the Go port the state can be opened and first
operations ond Service and Unit can be done. Also needed first topology
functionality is part of this branch.

R=
CC=
https://codereview.appspot.com/5502043

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// launchpad.net/juju/state
 
2
//
 
3
// Copyright (c) 2011-2012 Canonical Ltd.
 
4
 
 
5
package state
 
6
 
 
7
import (
 
8
        "fmt"
 
9
        "launchpad.net/goyaml"
 
10
        "launchpad.net/gozk/zookeeper"
 
11
        "sort"
 
12
)
 
13
 
 
14
// The protocol version, which is stored in the /topology node under
 
15
// the "version" key. The protocol version should *only* be updated
 
16
// when we know that a version is in fact actually incompatible.
 
17
const topologyVersion = 1
 
18
 
 
19
// zkTopology is used to marshal and unmarshal the content
 
20
// of the /topology node in ZooKeeper.
 
21
type zkTopology struct {
 
22
        Version      int
 
23
        Services     map[string]*zkService
 
24
        UnitSequence map[string]int "unit-sequence"
 
25
}
 
26
 
 
27
// zkService represents the service data within the /topology
 
28
// node in ZooKeeper.
 
29
type zkService struct {
 
30
        Name  string
 
31
        Units map[string]*zkUnit
 
32
}
 
33
 
 
34
// zkUnit represents the unit data within the /topology
 
35
// node in ZooKeeper.
 
36
type zkUnit struct {
 
37
        Sequence int
 
38
        Machine  string
 
39
}
 
40
 
 
41
// topology is an internal helper that handles the content
 
42
// of the /topology node in ZooKeeper.
 
43
type topology struct {
 
44
        topology *zkTopology
 
45
}
 
46
 
 
47
// readTopology connects ZooKeeper, retrieves the data as YAML,
 
48
// parses it and returns it.
 
49
func readTopology(zk *zookeeper.Conn) (*topology, error) {
 
50
        yaml, _, err := zk.Get("/topology")
 
51
        if err != nil {
 
52
                return nil, err
 
53
        }
 
54
        return parseTopology(yaml)
 
55
}
 
56
 
 
57
// dump returns the topology as YAML.
 
58
func (t *topology) dump() (string, error) {
 
59
        topologyYaml, err := goyaml.Marshal(t.topology)
 
60
        if err != nil {
 
61
                return "", err
 
62
        }
 
63
        return string(topologyYaml), nil
 
64
}
 
65
 
 
66
// version returns the version of the topology.
 
67
func (t *topology) version() int {
 
68
        return t.topology.Version
 
69
}
 
70
 
 
71
// hasService returns true if a service with the given key exists.
 
72
func (t *topology) hasService(key string) bool {
 
73
        return t.topology.Services[key] != nil
 
74
}
 
75
 
 
76
// serviceKey returns the key of the service with the given name.
 
77
func (t *topology) serviceKey(name string) (string, error) {
 
78
        for key, svc := range t.topology.Services {
 
79
                if svc.Name == name {
 
80
                        return key, nil
 
81
                }
 
82
        }
 
83
        return "", fmt.Errorf("service with name %q cannot be found", name)
 
84
}
 
85
 
 
86
// hasUnit returns true if a unit with given service and unit keys exists.
 
87
func (t *topology) hasUnit(serviceKey, unitKey string) bool {
 
88
        if t.hasService(serviceKey) {
 
89
                return t.topology.Services[serviceKey].Units[unitKey] != nil
 
90
        }
 
91
        return false
 
92
}
 
93
 
 
94
// addUnit adds a new unit and returns the sequence number. This
 
95
// sequence number will be increased monotonically for each service.
 
96
func (t *topology) addUnit(serviceKey, unitKey string) (int, error) {
 
97
        if err := t.assertService(serviceKey); err != nil {
 
98
                return -1, err
 
99
        }
 
100
        // Check if unit key is unused.
 
101
        for key, svc := range t.topology.Services {
 
102
                if _, ok := svc.Units[unitKey]; ok {
 
103
                        return -1, fmt.Errorf("unit %q already in use in servie %q", unitKey, key)
 
104
                }
 
105
        }
 
106
        // Add unit and increase sequence number.
 
107
        svc := t.topology.Services[serviceKey]
 
108
        sequenceNo := t.topology.UnitSequence[svc.Name]
 
109
        svc.Units[unitKey] = &zkUnit{Sequence: sequenceNo}
 
110
        t.topology.UnitSequence[svc.Name] += 1
 
111
        return sequenceNo, nil
 
112
}
 
113
 
 
114
// removeUnit removes a unit from a service.
 
115
func (t *topology) removeUnit(serviceKey, unitKey string) error {
 
116
        if err := t.assertUnit(serviceKey, unitKey); err != nil {
 
117
                return err
 
118
        }
 
119
        delete(t.topology.Services[serviceKey].Units, unitKey)
 
120
        return nil
 
121
}
 
122
 
 
123
// unitKeys returns the unit keys for all units of
 
124
// the service with the given service key in alphabetical order.
 
125
func (t *topology) unitKeys(serviceKey string) ([]string, error) {
 
126
        if err := t.assertService(serviceKey); err != nil {
 
127
                return nil, err
 
128
        }
 
129
        keys := []string{}
 
130
        svc := t.topology.Services[serviceKey]
 
131
        for key, _ := range svc.Units {
 
132
                keys = append(keys, key)
 
133
        }
 
134
        sort.Strings(keys)
 
135
        return keys, nil
 
136
}
 
137
 
 
138
// unitName returns the name of a unit by its service key and its own key.
 
139
func (t *topology) unitName(serviceKey, unitKey string) (string, error) {
 
140
        if err := t.assertUnit(serviceKey, unitKey); err != nil {
 
141
                return "", err
 
142
        }
 
143
        svc := t.topology.Services[serviceKey]
 
144
        unit := svc.Units[unitKey]
 
145
        return fmt.Sprintf("%s/%d", svc.Name, unit.Sequence), nil
 
146
}
 
147
 
 
148
// unitKeyFromSequence returns the key of a unit by the its service key
 
149
// and its sequence number.
 
150
func (t *topology) unitKeyFromSequence(serviceKey string, sequenceNo int) (string, error) {
 
151
        if err := t.assertService(serviceKey); err != nil {
 
152
                return "", err
 
153
        }
 
154
        svc := t.topology.Services[serviceKey]
 
155
        for key, unit := range svc.Units {
 
156
                if unit.Sequence == sequenceNo {
 
157
                        return key, nil
 
158
                }
 
159
        }
 
160
        return "", fmt.Errorf("unit with sequence number %d cannot be found", sequenceNo)
 
161
}
 
162
 
 
163
// unitMachineKey returns the key of an assigned machine of the unit. An empty
 
164
// key means there is no machine assigned.
 
165
func (t *topology) unitMachineKey(serviceKey, unitKey string) (string, error) {
 
166
        if err := t.assertUnit(serviceKey, unitKey); err != nil {
 
167
                return "", err
 
168
        }
 
169
        unit := t.topology.Services[serviceKey].Units[unitKey]
 
170
        return unit.Machine, nil
 
171
}
 
172
 
 
173
// unassignUnitFromMachine unassigns the unit from its current machine.
 
174
func (t *topology) unassignUnitFromMachine(serviceKey, unitKey string) error {
 
175
        if err := t.assertUnit(serviceKey, unitKey); err != nil {
 
176
                return err
 
177
        }
 
178
        unit := t.topology.Services[serviceKey].Units[unitKey]
 
179
        if unit.Machine == "" {
 
180
                return fmt.Errorf("unit %q in service %q is not assigned to a machine", unitKey, serviceKey)
 
181
        }
 
182
        unit.Machine = ""
 
183
        return nil
 
184
}
 
185
 
 
186
// assertService checks if a service exists.
 
187
func (t *topology) assertService(serviceKey string) error {
 
188
        if _, ok := t.topology.Services[serviceKey]; !ok {
 
189
                return fmt.Errorf("service with key %q cannot be found", serviceKey)
 
190
        }
 
191
        return nil
 
192
}
 
193
 
 
194
// assertUnit checks if a service with a unit exists.
 
195
func (t *topology) assertUnit(serviceKey, unitKey string) error {
 
196
        if err := t.assertService(serviceKey); err != nil {
 
197
                return err
 
198
        }
 
199
        svc := t.topology.Services[serviceKey]
 
200
        if _, ok := svc.Units[unitKey]; !ok {
 
201
                return fmt.Errorf("unit with key %q cannot be found", unitKey)
 
202
        }
 
203
        return nil
 
204
}
 
205
 
 
206
// parseTopology returns the topology represented by yaml.
 
207
func parseTopology(yaml string) (*topology, error) {
 
208
        t := &topology{topology: &zkTopology{Version: topologyVersion}}
 
209
        if err := goyaml.Unmarshal([]byte(yaml), t.topology); err != nil {
 
210
                return nil, err
 
211
        }
 
212
        if t.topology.Version != topologyVersion {
 
213
                return nil, fmt.Errorf("incompatible topology versions: got %d, want %d", t.topology.Version, topologyVersion)
 
214
        }
 
215
        return t, nil
 
216
}
 
217
 
 
218
// retryTopologyChange tries to change the topology with f.
 
219
// This function can read and modify the topology instance, 
 
220
// and after it returns the modified topology will be
 
221
// persisted into the /topology node. Note that this f must
 
222
// have no side-effects, since it may be called multiple times
 
223
// depending on conflict situations.
 
224
func retryTopologyChange(zk *zookeeper.Conn, f func(t *topology) error) error {
 
225
        change := func(yaml string, stat *zookeeper.Stat) (string, error) {
 
226
                var err error
 
227
                it := &topology{topology: &zkTopology{Version: 1}}
 
228
                if yaml != "" {
 
229
                        if it, err = parseTopology(yaml); err != nil {
 
230
                                return "", err
 
231
                        }
 
232
                }
 
233
                // Apply the passed function.
 
234
                if err = f(it); err != nil {
 
235
                        return "", err
 
236
                }
 
237
                return it.dump()
 
238
        }
 
239
        return zk.RetryChange("/topology", 0, zookeeper.WorldACL(zookeeper.PERM_ALL), change)
 
240
}