1
// launchpad.net/juju/state
3
// Copyright (c) 2011-2012 Canonical Ltd.
10
"launchpad.net/gozk/zookeeper"
14
// The protocol version, which is stored in the /topology node under
15
// the "version" key. The protocol version should *only* be updated
16
// when we know that a version is in fact incompatible.
17
const topologyVersion = 1 // parseTopology rejects a topology whose Version differs from this value.
19
// zkTopology is used to marshal and unmarshal the content
20
// of the /topology node in ZooKeeper.
21
type zkTopology struct {
23
Services map[string]*zkService
24
UnitSequence map[string]int "unit-sequence"
27
// zkService represents the service data within the /topology
29
type zkService struct {
31
Units map[string]*zkUnit
34
// zkUnit represents the unit data within the /topology
41
// topology is an internal helper that handles the content
42
// of the /topology node in ZooKeeper.
43
type topology struct {
47
// readTopology retrieves the /topology node from ZooKeeper as YAML,
48
// parses it and returns it.
49
func readTopology(zk *zookeeper.Conn) (*topology, error) {
50
yaml, _, err := zk.Get("/topology")
54
return parseTopology(yaml)
57
// dump returns the topology as YAML.
58
func (t *topology) dump() (string, error) {
59
topologyYaml, err := goyaml.Marshal(t.topology)
63
return string(topologyYaml), nil
66
// version returns the version of the topology.
67
func (t *topology) version() int {
68
return t.topology.Version
71
// hasService returns true if a service with the given key exists.
72
func (t *topology) hasService(key string) bool {
73
return t.topology.Services[key] != nil
76
// serviceKey returns the key of the service with the given name.
77
func (t *topology) serviceKey(name string) (string, error) {
78
for key, svc := range t.topology.Services {
83
return "", fmt.Errorf("service with name %q cannot be found", name)
86
// hasUnit returns true if a unit with the given service and unit keys exists.
87
func (t *topology) hasUnit(serviceKey, unitKey string) bool {
88
if t.hasService(serviceKey) {
89
return t.topology.Services[serviceKey].Units[unitKey] != nil
94
// addUnit adds a new unit and returns the sequence number. This
95
// sequence number will be increased monotonically for each service.
96
func (t *topology) addUnit(serviceKey, unitKey string) (int, error) {
97
if err := t.assertService(serviceKey); err != nil {
100
// Check if unit key is unused.
101
for key, svc := range t.topology.Services {
102
if _, ok := svc.Units[unitKey]; ok {
103
return -1, fmt.Errorf("unit %q already in use in servie %q", unitKey, key)
106
// Add unit and increase sequence number.
107
svc := t.topology.Services[serviceKey]
108
sequenceNo := t.topology.UnitSequence[svc.Name]
109
svc.Units[unitKey] = &zkUnit{Sequence: sequenceNo}
110
t.topology.UnitSequence[svc.Name] += 1
111
return sequenceNo, nil
114
// removeUnit removes a unit from a service.
115
func (t *topology) removeUnit(serviceKey, unitKey string) error {
116
if err := t.assertUnit(serviceKey, unitKey); err != nil {
119
delete(t.topology.Services[serviceKey].Units, unitKey)
123
// unitKeys returns the unit keys for all units of
124
// the service with the given service key in alphabetical order.
125
func (t *topology) unitKeys(serviceKey string) ([]string, error) {
126
if err := t.assertService(serviceKey); err != nil {
130
svc := t.topology.Services[serviceKey]
131
for key, _ := range svc.Units {
132
keys = append(keys, key)
138
// unitName returns the name of a unit by its service key and its own key.
139
func (t *topology) unitName(serviceKey, unitKey string) (string, error) {
140
if err := t.assertUnit(serviceKey, unitKey); err != nil {
143
svc := t.topology.Services[serviceKey]
144
unit := svc.Units[unitKey]
145
return fmt.Sprintf("%s/%d", svc.Name, unit.Sequence), nil
148
// unitKeyFromSequence returns the key of a unit by its service key
149
// and its sequence number.
150
func (t *topology) unitKeyFromSequence(serviceKey string, sequenceNo int) (string, error) {
151
if err := t.assertService(serviceKey); err != nil {
154
svc := t.topology.Services[serviceKey]
155
for key, unit := range svc.Units {
156
if unit.Sequence == sequenceNo {
160
return "", fmt.Errorf("unit with sequence number %d cannot be found", sequenceNo)
163
// unitMachineKey returns the key of the machine assigned to the unit. An empty
164
// key means there is no machine assigned.
165
func (t *topology) unitMachineKey(serviceKey, unitKey string) (string, error) {
166
if err := t.assertUnit(serviceKey, unitKey); err != nil {
169
unit := t.topology.Services[serviceKey].Units[unitKey]
170
return unit.Machine, nil
173
// unassignUnitFromMachine unassigns the unit from its current machine.
174
func (t *topology) unassignUnitFromMachine(serviceKey, unitKey string) error {
175
if err := t.assertUnit(serviceKey, unitKey); err != nil {
178
unit := t.topology.Services[serviceKey].Units[unitKey]
179
if unit.Machine == "" {
180
return fmt.Errorf("unit %q in service %q is not assigned to a machine", unitKey, serviceKey)
186
// assertService checks if a service exists.
187
func (t *topology) assertService(serviceKey string) error {
188
if _, ok := t.topology.Services[serviceKey]; !ok {
189
return fmt.Errorf("service with key %q cannot be found", serviceKey)
194
// assertUnit checks if the service exists and contains the unit.
195
func (t *topology) assertUnit(serviceKey, unitKey string) error {
196
if err := t.assertService(serviceKey); err != nil {
199
svc := t.topology.Services[serviceKey]
200
if _, ok := svc.Units[unitKey]; !ok {
201
return fmt.Errorf("unit with key %q cannot be found", unitKey)
206
// parseTopology returns the topology represented by yaml.
207
func parseTopology(yaml string) (*topology, error) {
208
t := &topology{topology: &zkTopology{Version: topologyVersion}}
209
if err := goyaml.Unmarshal([]byte(yaml), t.topology); err != nil {
212
if t.topology.Version != topologyVersion {
213
return nil, fmt.Errorf("incompatible topology versions: got %d, want %d", t.topology.Version, topologyVersion)
218
// retryTopologyChange tries to change the topology with f.
219
// The function f can read and modify the topology instance,
220
// and after it returns the modified topology will be
221
// persisted into the /topology node. Note that f must
222
// have no side-effects, since it may be called multiple times
223
// depending on conflict situations.
224
func retryTopologyChange(zk *zookeeper.Conn, f func(t *topology) error) error {
225
change := func(yaml string, stat *zookeeper.Stat) (string, error) {
227
it := &topology{topology: &zkTopology{Version: 1}}
229
if it, err = parseTopology(yaml); err != nil {
233
// Apply the passed function.
234
if err = f(it); err != nil {
239
return zk.RetryChange("/topology", 0, zookeeper.WorldACL(zookeeper.PERM_ALL), change)