~juju/juju-core/1.10

« back to all changes in this revision

Viewing changes to state/topology.go

  • Committer: Roger Peppe
  • Date: 2012-09-19 13:40:41 UTC
  • mto: This revision was merged to the branch mainline in revision 527.
  • Revision ID: roger.peppe@canonical.com-20120919134041-1rt4cg7ox52pt4os
mstate: rename to state

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
package state
2
 
 
3
 
import (
4
 
        "errors"
5
 
        "fmt"
6
 
        "launchpad.net/goyaml"
7
 
        "launchpad.net/gozk/zookeeper"
8
 
        "launchpad.net/juju-core/charm"
9
 
        "sort"
10
 
)
11
 
 
12
 
// The protocol version, which is stored in the /topology node under
13
 
// the "version" key. The protocol version should *only* be updated
14
 
// when we know that a version is in fact actually incompatible.
15
 
const topologyVersion = 1
16
 
 
17
 
// topoTopology is used to marshal and unmarshal the content
18
 
// of the /topology node in ZooKeeper.
19
 
type topoTopology struct {
20
 
        Version   int
21
 
        Machines  map[string]*topoMachine
22
 
        Services  map[string]*topoService
23
 
        Relations map[string]*topoRelation
24
 
}
25
 
 
26
 
// topoMachine represents the machine data within the /topology
27
 
// node in ZooKeeper.
28
 
type topoMachine struct {
29
 
}
30
 
 
31
 
// topoService represents the service data within the /topology
32
 
// node in ZooKeeper.
33
 
type topoService struct {
34
 
        Name  string
35
 
        Units map[string]*topoUnit
36
 
}
37
 
 
38
 
// topoUnit represents the unit data within the /topology
39
 
// node in ZooKeeper.
40
 
type topoUnit struct {
41
 
        Machine   string
42
 
        Principal string
43
 
}
44
 
 
45
 
// topoRelation represents the relation data within the
46
 
// /topology node in ZooKeeper.
47
 
type topoRelation struct {
48
 
        Interface string
49
 
        Scope     charm.RelationScope
50
 
        Endpoints []topoEndpoint
51
 
}
52
 
 
53
 
// topoEndpoint represents the data of one
54
 
// endpoint of a relation within the /topology
55
 
// node in ZooKeeper.
56
 
type topoEndpoint struct {
57
 
        Service      string
58
 
        RelationRole RelationRole "relation-role"
59
 
        RelationName string       "relation-name"
60
 
}
61
 
 
62
 
func (u *topoUnit) isPrincipal() bool {
63
 
        return u.Principal == ""
64
 
}
65
 
 
66
 
// check verifies that r is a proper relation.
67
 
func (r *topoRelation) check() error {
68
 
        if len(r.Interface) == 0 {
69
 
                return fmt.Errorf("relation interface is empty")
70
 
        }
71
 
        if len(r.Endpoints) == 0 {
72
 
                return fmt.Errorf("relation has no services")
73
 
        }
74
 
        for _, endpoint := range r.Endpoints {
75
 
                if endpoint.Service == "" {
76
 
                        return fmt.Errorf("relation has service with empty key")
77
 
                }
78
 
                if endpoint.RelationName == "" {
79
 
                        return fmt.Errorf("relation has %s endpoint with empty relation name", endpoint.RelationRole)
80
 
                }
81
 
                counterRole := endpoint.RelationRole.counterpartRole()
82
 
                if !r.hasEndpointWithRole(counterRole) {
83
 
                        return fmt.Errorf("relation has %s but no %s", endpoint.RelationRole, counterRole)
84
 
                }
85
 
        }
86
 
        if len(r.Endpoints) > 2 {
87
 
                return fmt.Errorf("relation with mixed peer, provider, and requirer roles")
88
 
        }
89
 
        return nil
90
 
}
91
 
 
92
 
// hasEndpointWithRole checks if the relation has a service with the given role.
93
 
func (r *topoRelation) hasEndpointWithRole(role RelationRole) bool {
94
 
        for _, endpoint := range r.Endpoints {
95
 
                if endpoint.RelationRole == role {
96
 
                        return true
97
 
                }
98
 
        }
99
 
        return false
100
 
}
101
 
 
102
 
// topology is an internal helper that handles the content
103
 
// of the /topology node in ZooKeeper.
104
 
type topology struct {
105
 
        topology *topoTopology
106
 
}
107
 
 
108
 
// readTopology connects ZooKeeper, retrieves the data as YAML,
109
 
// parses it and returns it.
110
 
func readTopology(zk *zookeeper.Conn) (*topology, error) {
111
 
        yaml, _, err := zk.Get(zkTopologyPath)
112
 
        if err != nil {
113
 
                if zookeeper.IsError(err, zookeeper.ZNONODE) {
114
 
                        // No topology node, so return empty topology.
115
 
                        return parseTopology("")
116
 
                }
117
 
                return nil, err
118
 
        }
119
 
        return parseTopology(yaml)
120
 
}
121
 
 
122
 
// dump returns the topology as YAML.
123
 
func (t *topology) dump() (string, error) {
124
 
        yaml, err := goyaml.Marshal(t.topology)
125
 
        if err != nil {
126
 
                return "", err
127
 
        }
128
 
        return string(yaml), nil
129
 
}
130
 
 
131
 
// Version returns the version of the topology.
132
 
func (t *topology) Version() int {
133
 
        return t.topology.Version
134
 
}
135
 
 
136
 
// AddMachine adds a new machine to the topology.
137
 
func (t *topology) AddMachine(key string) error {
138
 
        if t.topology.Machines == nil {
139
 
                t.topology.Machines = make(map[string]*topoMachine)
140
 
        } else if t.HasMachine(key) {
141
 
                return fmt.Errorf("attempted to add duplicated machine %q", key)
142
 
        }
143
 
        t.topology.Machines[key] = &topoMachine{}
144
 
        return nil
145
 
}
146
 
 
147
 
// RemoveMachine removes the machine with key from the topology.
148
 
func (t *topology) RemoveMachine(key string) error {
149
 
        ok, err := t.MachineHasUnits(key)
150
 
        if err != nil {
151
 
                return err
152
 
        }
153
 
        if ok {
154
 
                return fmt.Errorf("cannot remove machine %q while units are assigned", key)
155
 
        }
156
 
        // Machine exists and has no units, so remove it.
157
 
        delete(t.topology.Machines, key)
158
 
        return nil
159
 
}
160
 
 
161
 
// MachineKeys returns all machine keys.
162
 
func (t *topology) MachineKeys() []string {
163
 
        keys := []string{}
164
 
        for key, _ := range t.topology.Machines {
165
 
                keys = append(keys, key)
166
 
        }
167
 
        sort.Strings(keys)
168
 
        return keys
169
 
}
170
 
 
171
 
// HasMachine returns whether a machine with key exists.
172
 
func (t *topology) HasMachine(key string) bool {
173
 
        return t.topology.Machines[key] != nil
174
 
}
175
 
 
176
 
// MachineHasUnits returns whether the machine with key has any units assigned to it.
177
 
func (t *topology) MachineHasUnits(key string) (bool, error) {
178
 
        _, err := t.machine(key)
179
 
        if err != nil {
180
 
                return false, err
181
 
        }
182
 
        for _, service := range t.topology.Services {
183
 
                for _, unit := range service.Units {
184
 
                        if unit.Machine == key {
185
 
                                return true, nil
186
 
                        }
187
 
                }
188
 
        }
189
 
        return false, nil
190
 
}
191
 
 
192
 
// AddService adds a new service to the topology.
193
 
func (t *topology) AddService(key, name string) error {
194
 
        if t.topology.Services == nil {
195
 
                t.topology.Services = make(map[string]*topoService)
196
 
        }
197
 
        if t.HasService(key) {
198
 
                return fmt.Errorf("attempted to add duplicated service %q", key)
199
 
        }
200
 
        if _, err := t.ServiceKey(name); err == nil {
201
 
                return fmt.Errorf("service name %q already in use", name)
202
 
        }
203
 
        t.topology.Services[key] = &topoService{
204
 
                Name:  name,
205
 
                Units: make(map[string]*topoUnit),
206
 
        }
207
 
        return nil
208
 
}
209
 
 
210
 
// RemoveService removes a service from the topology.
211
 
func (t *topology) RemoveService(key string) error {
212
 
        if _, err := t.service(key); err != nil {
213
 
                return err
214
 
        }
215
 
        relations, err := t.RelationsForService(key)
216
 
        if err != nil {
217
 
                return err
218
 
        }
219
 
        if len(relations) > 0 {
220
 
                return fmt.Errorf("cannot remove service %q with active relations", key)
221
 
        }
222
 
        delete(t.topology.Services, key)
223
 
        return nil
224
 
}
225
 
 
226
 
// HasService returns true if a service with the given key exists.
227
 
func (t *topology) HasService(key string) bool {
228
 
        return t.topology.Services[key] != nil
229
 
}
230
 
 
231
 
// ServiceKey returns the key of the service with the given name.
232
 
func (t *topology) ServiceKey(name string) (string, error) {
233
 
        for key, svc := range t.topology.Services {
234
 
                if svc.Name == name {
235
 
                        return key, nil
236
 
                }
237
 
        }
238
 
        return "", fmt.Errorf("service with name %q not found", name)
239
 
}
240
 
 
241
 
// ServiceKeys returns all service keys.
242
 
func (t *topology) ServiceKeys() []string {
243
 
        keys := []string{}
244
 
        for key, _ := range t.topology.Services {
245
 
                keys = append(keys, key)
246
 
        }
247
 
        sort.Strings(keys)
248
 
        return keys
249
 
}
250
 
 
251
 
// ServiceName returns the name of the service with the given key.
252
 
func (t *topology) ServiceName(key string) (string, error) {
253
 
        if svc, ok := t.topology.Services[key]; ok {
254
 
                return svc.Name, nil
255
 
        }
256
 
        return "", fmt.Errorf("service with key %q not found", key)
257
 
}
258
 
 
259
 
// HasUnit returns true if a unit with given service and unit keys exists.
260
 
func (t *topology) HasUnit(unitKey string) bool {
261
 
        _, _, err := t.serviceAndUnit(unitKey)
262
 
        return err == nil
263
 
}
264
 
 
265
 
// AddUnit adds a new unit to the topology.
266
 
func (t *topology) AddUnit(unitKey, principalKey string) error {
267
 
        serviceKey, err := serviceKeyForUnitKey(unitKey)
268
 
        if err != nil {
269
 
                return err
270
 
        }
271
 
        svc, err := t.service(serviceKey)
272
 
        if err != nil {
273
 
                return err
274
 
        }
275
 
        if _, ok := svc.Units[unitKey]; ok {
276
 
                return fmt.Errorf("unit %q already in use", unitKey)
277
 
        }
278
 
        if principalKey != "" {
279
 
                _, pUnit, err := t.serviceAndUnit(principalKey)
280
 
                if err != nil {
281
 
                        return err
282
 
                }
283
 
                if !pUnit.isPrincipal() {
284
 
                        return fmt.Errorf("cannot add unit %q subordinate to subordinate unit %q", unitKey, principalKey)
285
 
                }
286
 
        }
287
 
        svc.Units[unitKey] = &topoUnit{
288
 
                Principal: principalKey,
289
 
        }
290
 
        return nil
291
 
}
292
 
 
293
 
// RemoveUnit removes a unit from a service.
294
 
func (t *topology) RemoveUnit(unitKey string) error {
295
 
        svc, _, err := t.serviceAndUnit(unitKey)
296
 
        if err != nil {
297
 
                return err
298
 
        }
299
 
        delete(svc.Units, unitKey)
300
 
        return nil
301
 
}
302
 
 
303
 
// UnitKeys returns the unit keys for all units of
304
 
// the service with the given service key, in alphabetical
305
 
// order.
306
 
func (t *topology) UnitKeys(serviceKey string) ([]string, error) {
307
 
        svc, err := t.service(serviceKey)
308
 
        if err != nil {
309
 
                return nil, err
310
 
        }
311
 
        keys := []string{}
312
 
        for key, _ := range svc.Units {
313
 
                keys = append(keys, key)
314
 
        }
315
 
        sort.Strings(keys)
316
 
        return keys, nil
317
 
}
318
 
 
319
 
// UnitName returns the name of the unit with the given key.
320
 
func (t *topology) UnitName(unitKey string) (string, error) {
321
 
        svc, _, err := t.serviceAndUnit(unitKey)
322
 
        if err != nil {
323
 
                return "", err
324
 
        }
325
 
        return fmt.Sprintf("%s/%d", svc.Name, keySeq(unitKey)), nil
326
 
}
327
 
 
328
 
// unitNotAssigned indicates that a unit is not assigned to a machine.
329
 
var unitNotAssigned = errors.New("unit not assigned to machine")
330
 
 
331
 
// UnitMachineKey returns the key of an assigned machine of the unit.
332
 
// If no machine is assigned, the error unitNotAssigned will be returned.
333
 
func (t *topology) UnitMachineKey(unitKey string) (string, error) {
334
 
        _, unit, err := t.serviceAndUnit(unitKey)
335
 
        if err != nil {
336
 
                return "", err
337
 
        }
338
 
        // Find the machine key from the unit's principal if it has one.
339
 
        if !unit.isPrincipal() {
340
 
                _, unit, err = t.serviceAndUnit(unit.Principal)
341
 
                if err != nil {
342
 
                        return "", fmt.Errorf("cannot find principal unit: %v", err)
343
 
                }
344
 
        }
345
 
        if unit.Machine == "" {
346
 
                return "", unitNotAssigned
347
 
        }
348
 
        return unit.Machine, nil
349
 
}
350
 
 
351
 
// AssignUnitToMachine assigns a unit and its subordinates to a machine.
352
 
// It is an error to reassign a unit that is already assigned, and it is
353
 
// an error to assign a unit of a subordinate service directly to a
354
 
// machine.
355
 
func (t *topology) AssignUnitToMachine(unitKey, machineKey string) error {
356
 
        _, unit, err := t.serviceAndUnit(unitKey)
357
 
        if err != nil {
358
 
                return err
359
 
        }
360
 
        _, err = t.machine(machineKey)
361
 
        if err != nil {
362
 
                return err
363
 
        }
364
 
        if !unit.isPrincipal() {
365
 
                return errors.New("cannot assign subordinate units directly to machines")
366
 
        }
367
 
        if unit.Machine != "" {
368
 
                return fmt.Errorf("unit %q already assigned to machine %q", unitKey, unit.Machine)
369
 
        }
370
 
        unit.Machine = machineKey
371
 
        return nil
372
 
}
373
 
 
374
 
// UnassignUnitFromMachine unassigns the unit and its subordinates
375
 
// from their current machine.
376
 
func (t *topology) UnassignUnitFromMachine(unitKey string) error {
377
 
        _, unit, err := t.serviceAndUnit(unitKey)
378
 
        if err != nil {
379
 
                return err
380
 
        }
381
 
        if unit.Machine == "" {
382
 
                return fmt.Errorf("unit %q not assigned to a machine", unitKey)
383
 
        }
384
 
        unit.Machine = ""
385
 
        return nil
386
 
}
387
 
 
388
 
// UnitsForMachine returns the keys of any units that
389
 
// have been assigned to the machine, in alphabetical order.
390
 
func (t *topology) UnitsForMachine(machineKey string) []string {
391
 
        var keys []string
392
 
        principals := make(map[string]bool)
393
 
        for _, svc := range t.topology.Services {
394
 
                for key, u := range svc.Units {
395
 
                        if u.isPrincipal() && u.Machine == machineKey {
396
 
                                keys = append(keys, key)
397
 
                                principals[key] = true
398
 
                        }
399
 
                }
400
 
        }
401
 
        // Add all subordinate units
402
 
        for _, svc := range t.topology.Services {
403
 
                for key, u := range svc.Units {
404
 
                        if !u.isPrincipal() && principals[u.Principal] {
405
 
                                keys = append(keys, key)
406
 
                        }
407
 
                }
408
 
        }
409
 
        sort.Strings(keys)
410
 
        return keys
411
 
}
412
 
 
413
 
// Relation returns the relation with key from the topology.
414
 
func (t *topology) Relation(key string) (*topoRelation, error) {
415
 
        if t.topology.Relations == nil || t.topology.Relations[key] == nil {
416
 
                return nil, fmt.Errorf("relation %q does not exist", key)
417
 
        }
418
 
        return t.topology.Relations[key], nil
419
 
}
420
 
 
421
 
// AddRelation adds a new relation with the given key and relation data.
422
 
func (t *topology) AddRelation(relationKey string, relation *topoRelation) error {
423
 
        if t.topology.Relations == nil {
424
 
                t.topology.Relations = make(map[string]*topoRelation)
425
 
        }
426
 
        _, ok := t.topology.Relations[relationKey]
427
 
        if ok {
428
 
                return fmt.Errorf("relation key %q already in use", relationKey)
429
 
        }
430
 
        // Check if the relation definition and the service keys are valid.
431
 
        if err := relation.check(); err != nil {
432
 
                return err
433
 
        }
434
 
        for _, endpoint := range relation.Endpoints {
435
 
                if _, err := t.service(endpoint.Service); err != nil {
436
 
                        return err
437
 
                }
438
 
        }
439
 
        t.topology.Relations[relationKey] = relation
440
 
        return nil
441
 
}
442
 
 
443
 
// RelationKeys returns the keys for all relations in the topology.
444
 
func (t *topology) RelationKeys() []string {
445
 
        keys := []string{}
446
 
        for key, _ := range t.topology.Relations {
447
 
                keys = append(keys, key)
448
 
        }
449
 
        sort.Strings(keys)
450
 
        return keys
451
 
}
452
 
 
453
 
// RemoveRelation removes the relation with key from the topology.
454
 
func (t *topology) RemoveRelation(key string) error {
455
 
        if _, err := t.relation(key); err != nil {
456
 
                return err
457
 
        }
458
 
        delete(t.topology.Relations, key)
459
 
        return nil
460
 
}
461
 
 
462
 
// HasRelation returns true if a relation exists with the supplied key.
463
 
func (t *topology) HasRelation(key string) bool {
464
 
        _, ok := t.topology.Relations[key]
465
 
        return ok
466
 
}
467
 
 
468
 
// RelationsForService returns all relations that the service
469
 
// with key is part of.
470
 
func (t *topology) RelationsForService(key string) (map[string]*topoRelation, error) {
471
 
        if _, err := t.service(key); err != nil {
472
 
                return nil, err
473
 
        }
474
 
        relations := make(map[string]*topoRelation)
475
 
        for relationKey, relation := range t.topology.Relations {
476
 
                for _, endpoint := range relation.Endpoints {
477
 
                        if endpoint.Service == key {
478
 
                                relations[relationKey] = relation
479
 
                                break
480
 
                        }
481
 
                }
482
 
        }
483
 
        return relations, nil
484
 
}
485
 
 
486
 
// noRelationFound indicates that an attempt to look up a relation failed.
487
 
var noRelationFound = errors.New("relation does not exist")
488
 
 
489
 
// RelationKey returns the key for the relation established between the
490
 
// provided endpoints. If no matching relation is found, noRelationFound
491
 
// will be return.
492
 
func (t *topology) RelationKey(endpoints ...RelationEndpoint) (string, error) {
493
 
        switch len(endpoints) {
494
 
        case 1:
495
 
                // Just pass.
496
 
        case 2:
497
 
                if endpoints[0].Interface != endpoints[1].Interface {
498
 
                        return "", noRelationFound
499
 
                }
500
 
        default:
501
 
                return "", fmt.Errorf("illegal number of relation endpoints provided")
502
 
        }
503
 
        serviceEndpoint := map[string]RelationEndpoint{}
504
 
        for _, endpoint := range endpoints {
505
 
                serviceKey, err := t.ServiceKey(endpoint.ServiceName)
506
 
                if err != nil {
507
 
                        return "", noRelationFound
508
 
                }
509
 
                serviceEndpoint[serviceKey] = endpoint
510
 
        }
511
 
        for relationKey, relation := range t.topology.Relations {
512
 
                if relation.Interface != endpoints[0].Interface {
513
 
                        continue
514
 
                }
515
 
                if len(relation.Endpoints) != len(endpoints) {
516
 
                        continue
517
 
                }
518
 
                found := true
519
 
                for _, tendpoint := range relation.Endpoints {
520
 
                        endpoint, ok := serviceEndpoint[tendpoint.Service]
521
 
                        if !ok || tendpoint.RelationName != endpoint.RelationName {
522
 
                                found = false
523
 
                                break
524
 
                        }
525
 
                }
526
 
                if found {
527
 
                        // All endpoints tested positive.
528
 
                        return relationKey, nil
529
 
                }
530
 
        }
531
 
        return "", noRelationFound
532
 
}
533
 
 
534
 
// machine returns the machine with the given key.
535
 
func (t *topology) machine(machineKey string) (*topoMachine, error) {
536
 
        if m, ok := t.topology.Machines[machineKey]; ok {
537
 
                return m, nil
538
 
        }
539
 
        return nil, fmt.Errorf("machine with key %q not found", machineKey)
540
 
}
541
 
 
542
 
// service returns the service for the given key.
543
 
func (t *topology) service(serviceKey string) (*topoService, error) {
544
 
        if svc, ok := t.topology.Services[serviceKey]; ok {
545
 
                return svc, nil
546
 
        }
547
 
        return nil, fmt.Errorf("service with key %q not found", serviceKey)
548
 
}
549
 
 
550
 
// serviceAndUnit returns the service and unit for the given unit key.
551
 
func (t *topology) serviceAndUnit(unitKey string) (*topoService, *topoUnit, error) {
552
 
        serviceKey, err := serviceKeyForUnitKey(unitKey)
553
 
        if err != nil {
554
 
                return nil, nil, err
555
 
        }
556
 
        svc, err := t.service(serviceKey)
557
 
        if err != nil {
558
 
                return nil, nil, err
559
 
        }
560
 
        if unit, ok := svc.Units[unitKey]; ok {
561
 
                return svc, unit, nil
562
 
        }
563
 
        return nil, nil, fmt.Errorf("unit with key %q not found", unitKey)
564
 
}
565
 
 
566
 
// relation returns the relation for the given key
567
 
func (t *topology) relation(relationKey string) (*topoRelation, error) {
568
 
        if t, ok := t.topology.Relations[relationKey]; ok {
569
 
                return t, nil
570
 
        }
571
 
        return nil, fmt.Errorf("relation with key %q not found", relationKey)
572
 
}
573
 
 
574
 
// parseTopology returns the topology represented by yaml.
575
 
func parseTopology(yaml string) (*topology, error) {
576
 
        t := &topology{topology: &topoTopology{Version: topologyVersion}}
577
 
        if err := goyaml.Unmarshal([]byte(yaml), t.topology); err != nil {
578
 
                return nil, err
579
 
        }
580
 
        if t.topology.Version != topologyVersion {
581
 
                return nil, fmt.Errorf("incompatible topology versions: got %d, want %d",
582
 
                        t.topology.Version, topologyVersion)
583
 
        }
584
 
        return t, nil
585
 
}
586
 
 
587
 
// retryTopologyChange tries to change the topology with f.
588
 
// This function can read and modify the topology instance, 
589
 
// and after it returns the modified topology will be
590
 
// persisted into the /topology node. Note that this f must
591
 
// have no side-effects, since it may be called multiple times
592
 
// depending on conflict situations.
593
 
func retryTopologyChange(zk *zookeeper.Conn, f func(t *topology) error) error {
594
 
        change := func(yaml string, stat *zookeeper.Stat) (string, error) {
595
 
                var err error
596
 
                it := &topology{topology: &topoTopology{Version: 1}}
597
 
                if yaml != "" {
598
 
                        if it, err = parseTopology(yaml); err != nil {
599
 
                                return "", err
600
 
                        }
601
 
                }
602
 
                // Apply the passed function.
603
 
                if err = f(it); err != nil {
604
 
                        return "", err
605
 
                }
606
 
                return it.dump()
607
 
        }
608
 
        return zk.RetryChange(zkTopologyPath, 0, zkPermAll, change)
609
 
}