7
"launchpad.net/gozk/zookeeper"
8
"launchpad.net/juju-core/charm"
// topologyVersion is the protocol version, which is stored in the
// /topology node under the "version" key. The protocol version should
// *only* be updated when we know that a version is in fact actually
// incompatible.
const topologyVersion = 1
17
// topoTopology is used to marshal and unmarshal the content
18
// of the /topology node in ZooKeeper.
19
type topoTopology struct {
21
Machines map[string]*topoMachine
22
Services map[string]*topoService
23
Relations map[string]*topoRelation
// topoMachine represents the machine data within the /topology
// node in ZooKeeper. It has no fields: a machine is recorded by
// the presence of its key in topoTopology.Machines.
type topoMachine struct {
}
31
// topoService represents the service data within the /topology
33
type topoService struct {
35
Units map[string]*topoUnit
// topoUnit represents the unit data within the /topology
// node in ZooKeeper.
type topoUnit struct {
	// Machine is the key of the machine the unit is assigned to,
	// or "" when the unit is unassigned.
	Machine string
	// Principal is the key of the unit's principal unit, or ""
	// when the unit is itself a principal.
	Principal string
}
45
// topoRelation represents the relation data within the
46
// /topology node in ZooKeeper.
47
type topoRelation struct {
49
Scope charm.RelationScope
50
Endpoints []topoEndpoint
53
// topoEndpoint represents the data of one
54
// endpoint of a relation within the /topology
56
type topoEndpoint struct {
58
RelationRole RelationRole "relation-role"
59
RelationName string "relation-name"
62
func (u *topoUnit) isPrincipal() bool {
63
return u.Principal == ""
66
// check verifies that r is a proper relation.
67
func (r *topoRelation) check() error {
68
if len(r.Interface) == 0 {
69
return fmt.Errorf("relation interface is empty")
71
if len(r.Endpoints) == 0 {
72
return fmt.Errorf("relation has no services")
74
for _, endpoint := range r.Endpoints {
75
if endpoint.Service == "" {
76
return fmt.Errorf("relation has service with empty key")
78
if endpoint.RelationName == "" {
79
return fmt.Errorf("relation has %s endpoint with empty relation name", endpoint.RelationRole)
81
counterRole := endpoint.RelationRole.counterpartRole()
82
if !r.hasEndpointWithRole(counterRole) {
83
return fmt.Errorf("relation has %s but no %s", endpoint.RelationRole, counterRole)
86
if len(r.Endpoints) > 2 {
87
return fmt.Errorf("relation with mixed peer, provider, and requirer roles")
92
// hasEndpointWithRole checks if the relation has a service with the given role.
93
func (r *topoRelation) hasEndpointWithRole(role RelationRole) bool {
94
for _, endpoint := range r.Endpoints {
95
if endpoint.RelationRole == role {
102
// topology is an internal helper that handles the content
103
// of the /topology node in ZooKeeper.
104
type topology struct {
105
topology *topoTopology
108
// readTopology connects ZooKeeper, retrieves the data as YAML,
109
// parses it and returns it.
110
func readTopology(zk *zookeeper.Conn) (*topology, error) {
111
yaml, _, err := zk.Get(zkTopologyPath)
113
if zookeeper.IsError(err, zookeeper.ZNONODE) {
114
// No topology node, so return empty topology.
115
return parseTopology("")
119
return parseTopology(yaml)
122
// dump returns the topology as YAML.
123
func (t *topology) dump() (string, error) {
124
yaml, err := goyaml.Marshal(t.topology)
128
return string(yaml), nil
131
// Version returns the version of the topology.
132
func (t *topology) Version() int {
133
return t.topology.Version
136
// AddMachine adds a new machine to the topology.
137
func (t *topology) AddMachine(key string) error {
138
if t.topology.Machines == nil {
139
t.topology.Machines = make(map[string]*topoMachine)
140
} else if t.HasMachine(key) {
141
return fmt.Errorf("attempted to add duplicated machine %q", key)
143
t.topology.Machines[key] = &topoMachine{}
147
// RemoveMachine removes the machine with key from the topology.
148
func (t *topology) RemoveMachine(key string) error {
149
ok, err := t.MachineHasUnits(key)
154
return fmt.Errorf("cannot remove machine %q while units are assigned", key)
156
// Machine exists and has no units, so remove it.
157
delete(t.topology.Machines, key)
161
// MachineKeys returns all machine keys.
162
func (t *topology) MachineKeys() []string {
164
for key, _ := range t.topology.Machines {
165
keys = append(keys, key)
171
// HasMachine returns whether a machine with key exists.
172
func (t *topology) HasMachine(key string) bool {
173
return t.topology.Machines[key] != nil
176
// MachineHasUnits returns whether the machine with key has any units assigned to it.
177
func (t *topology) MachineHasUnits(key string) (bool, error) {
178
_, err := t.machine(key)
182
for _, service := range t.topology.Services {
183
for _, unit := range service.Units {
184
if unit.Machine == key {
192
// AddService adds a new service to the topology.
193
func (t *topology) AddService(key, name string) error {
194
if t.topology.Services == nil {
195
t.topology.Services = make(map[string]*topoService)
197
if t.HasService(key) {
198
return fmt.Errorf("attempted to add duplicated service %q", key)
200
if _, err := t.ServiceKey(name); err == nil {
201
return fmt.Errorf("service name %q already in use", name)
203
t.topology.Services[key] = &topoService{
205
Units: make(map[string]*topoUnit),
210
// RemoveService removes a service from the topology.
211
func (t *topology) RemoveService(key string) error {
212
if _, err := t.service(key); err != nil {
215
relations, err := t.RelationsForService(key)
219
if len(relations) > 0 {
220
return fmt.Errorf("cannot remove service %q with active relations", key)
222
delete(t.topology.Services, key)
226
// HasService returns true if a service with the given key exists.
227
func (t *topology) HasService(key string) bool {
228
return t.topology.Services[key] != nil
231
// ServiceKey returns the key of the service with the given name.
232
func (t *topology) ServiceKey(name string) (string, error) {
233
for key, svc := range t.topology.Services {
234
if svc.Name == name {
238
return "", fmt.Errorf("service with name %q not found", name)
241
// ServiceKeys returns all service keys.
242
func (t *topology) ServiceKeys() []string {
244
for key, _ := range t.topology.Services {
245
keys = append(keys, key)
251
// ServiceName returns the name of the service with the given key.
252
func (t *topology) ServiceName(key string) (string, error) {
253
if svc, ok := t.topology.Services[key]; ok {
256
return "", fmt.Errorf("service with key %q not found", key)
259
// HasUnit returns true if a unit with given service and unit keys exists.
260
func (t *topology) HasUnit(unitKey string) bool {
261
_, _, err := t.serviceAndUnit(unitKey)
265
// AddUnit adds a new unit to the topology.
266
func (t *topology) AddUnit(unitKey, principalKey string) error {
267
serviceKey, err := serviceKeyForUnitKey(unitKey)
271
svc, err := t.service(serviceKey)
275
if _, ok := svc.Units[unitKey]; ok {
276
return fmt.Errorf("unit %q already in use", unitKey)
278
if principalKey != "" {
279
_, pUnit, err := t.serviceAndUnit(principalKey)
283
if !pUnit.isPrincipal() {
284
return fmt.Errorf("cannot add unit %q subordinate to subordinate unit %q", unitKey, principalKey)
287
svc.Units[unitKey] = &topoUnit{
288
Principal: principalKey,
293
// RemoveUnit removes a unit from a service.
294
func (t *topology) RemoveUnit(unitKey string) error {
295
svc, _, err := t.serviceAndUnit(unitKey)
299
delete(svc.Units, unitKey)
303
// UnitKeys returns the unit keys for all units of
304
// the service with the given service key, in alphabetical
306
func (t *topology) UnitKeys(serviceKey string) ([]string, error) {
307
svc, err := t.service(serviceKey)
312
for key, _ := range svc.Units {
313
keys = append(keys, key)
319
// UnitName returns the name of the unit with the given key.
320
func (t *topology) UnitName(unitKey string) (string, error) {
321
svc, _, err := t.serviceAndUnit(unitKey)
325
return fmt.Sprintf("%s/%d", svc.Name, keySeq(unitKey)), nil
// unitNotAssigned indicates that a unit is not assigned to a machine.
var unitNotAssigned = errors.New("unit not assigned to machine")
331
// UnitMachineKey returns the key of an assigned machine of the unit.
332
// If no machine is assigned, the error unitNotAssigned will be returned.
333
func (t *topology) UnitMachineKey(unitKey string) (string, error) {
334
_, unit, err := t.serviceAndUnit(unitKey)
338
// Find the machine key from the unit's principal if it has one.
339
if !unit.isPrincipal() {
340
_, unit, err = t.serviceAndUnit(unit.Principal)
342
return "", fmt.Errorf("cannot find principal unit: %v", err)
345
if unit.Machine == "" {
346
return "", unitNotAssigned
348
return unit.Machine, nil
351
// AssignUnitToMachine assigns a unit and its subordinates to a machine.
352
// It is an error to reassign a unit that is already assigned, and it is
353
// an error to assign a unit of a subordinate service directly to a
355
func (t *topology) AssignUnitToMachine(unitKey, machineKey string) error {
356
_, unit, err := t.serviceAndUnit(unitKey)
360
_, err = t.machine(machineKey)
364
if !unit.isPrincipal() {
365
return errors.New("cannot assign subordinate units directly to machines")
367
if unit.Machine != "" {
368
return fmt.Errorf("unit %q already assigned to machine %q", unitKey, unit.Machine)
370
unit.Machine = machineKey
374
// UnassignUnitFromMachine unassigns the unit and its subordinates
375
// from their current machine.
376
func (t *topology) UnassignUnitFromMachine(unitKey string) error {
377
_, unit, err := t.serviceAndUnit(unitKey)
381
if unit.Machine == "" {
382
return fmt.Errorf("unit %q not assigned to a machine", unitKey)
388
// UnitsForMachine returns the keys of any units that
389
// have been assigned to the machine, in alphabetical order.
390
func (t *topology) UnitsForMachine(machineKey string) []string {
392
principals := make(map[string]bool)
393
for _, svc := range t.topology.Services {
394
for key, u := range svc.Units {
395
if u.isPrincipal() && u.Machine == machineKey {
396
keys = append(keys, key)
397
principals[key] = true
401
// Add all subordinate units
402
for _, svc := range t.topology.Services {
403
for key, u := range svc.Units {
404
if !u.isPrincipal() && principals[u.Principal] {
405
keys = append(keys, key)
413
// Relation returns the relation with key from the topology.
414
func (t *topology) Relation(key string) (*topoRelation, error) {
415
if t.topology.Relations == nil || t.topology.Relations[key] == nil {
416
return nil, fmt.Errorf("relation %q does not exist", key)
418
return t.topology.Relations[key], nil
421
// AddRelation adds a new relation with the given key and relation data.
422
func (t *topology) AddRelation(relationKey string, relation *topoRelation) error {
423
if t.topology.Relations == nil {
424
t.topology.Relations = make(map[string]*topoRelation)
426
_, ok := t.topology.Relations[relationKey]
428
return fmt.Errorf("relation key %q already in use", relationKey)
430
// Check if the relation definition and the service keys are valid.
431
if err := relation.check(); err != nil {
434
for _, endpoint := range relation.Endpoints {
435
if _, err := t.service(endpoint.Service); err != nil {
439
t.topology.Relations[relationKey] = relation
443
// RelationKeys returns the keys for all relations in the topology.
444
func (t *topology) RelationKeys() []string {
446
for key, _ := range t.topology.Relations {
447
keys = append(keys, key)
453
// RemoveRelation removes the relation with key from the topology.
454
func (t *topology) RemoveRelation(key string) error {
455
if _, err := t.relation(key); err != nil {
458
delete(t.topology.Relations, key)
462
// HasRelation returns true if a relation exists with the supplied key.
463
func (t *topology) HasRelation(key string) bool {
464
_, ok := t.topology.Relations[key]
468
// RelationsForService returns all relations that the service
469
// with key is part of.
470
func (t *topology) RelationsForService(key string) (map[string]*topoRelation, error) {
471
if _, err := t.service(key); err != nil {
474
relations := make(map[string]*topoRelation)
475
for relationKey, relation := range t.topology.Relations {
476
for _, endpoint := range relation.Endpoints {
477
if endpoint.Service == key {
478
relations[relationKey] = relation
483
return relations, nil
// noRelationFound indicates that an attempt to look up a relation failed.
var noRelationFound = errors.New("relation does not exist")
489
// RelationKey returns the key for the relation established between the
490
// provided endpoints. If no matching relation is found, noRelationFound
492
func (t *topology) RelationKey(endpoints ...RelationEndpoint) (string, error) {
493
switch len(endpoints) {
497
if endpoints[0].Interface != endpoints[1].Interface {
498
return "", noRelationFound
501
return "", fmt.Errorf("illegal number of relation endpoints provided")
503
serviceEndpoint := map[string]RelationEndpoint{}
504
for _, endpoint := range endpoints {
505
serviceKey, err := t.ServiceKey(endpoint.ServiceName)
507
return "", noRelationFound
509
serviceEndpoint[serviceKey] = endpoint
511
for relationKey, relation := range t.topology.Relations {
512
if relation.Interface != endpoints[0].Interface {
515
if len(relation.Endpoints) != len(endpoints) {
519
for _, tendpoint := range relation.Endpoints {
520
endpoint, ok := serviceEndpoint[tendpoint.Service]
521
if !ok || tendpoint.RelationName != endpoint.RelationName {
527
// All endpoints tested positive.
528
return relationKey, nil
531
return "", noRelationFound
534
// machine returns the machine with the given key.
535
func (t *topology) machine(machineKey string) (*topoMachine, error) {
536
if m, ok := t.topology.Machines[machineKey]; ok {
539
return nil, fmt.Errorf("machine with key %q not found", machineKey)
542
// service returns the service for the given key.
543
func (t *topology) service(serviceKey string) (*topoService, error) {
544
if svc, ok := t.topology.Services[serviceKey]; ok {
547
return nil, fmt.Errorf("service with key %q not found", serviceKey)
550
// serviceAndUnit returns the service and unit for the given unit key.
551
func (t *topology) serviceAndUnit(unitKey string) (*topoService, *topoUnit, error) {
552
serviceKey, err := serviceKeyForUnitKey(unitKey)
556
svc, err := t.service(serviceKey)
560
if unit, ok := svc.Units[unitKey]; ok {
561
return svc, unit, nil
563
return nil, nil, fmt.Errorf("unit with key %q not found", unitKey)
566
// relation returns the relation for the given key
567
func (t *topology) relation(relationKey string) (*topoRelation, error) {
568
if t, ok := t.topology.Relations[relationKey]; ok {
571
return nil, fmt.Errorf("relation with key %q not found", relationKey)
574
// parseTopology returns the topology represented by yaml.
575
func parseTopology(yaml string) (*topology, error) {
576
t := &topology{topology: &topoTopology{Version: topologyVersion}}
577
if err := goyaml.Unmarshal([]byte(yaml), t.topology); err != nil {
580
if t.topology.Version != topologyVersion {
581
return nil, fmt.Errorf("incompatible topology versions: got %d, want %d",
582
t.topology.Version, topologyVersion)
587
// retryTopologyChange tries to change the topology with f.
588
// This function can read and modify the topology instance,
589
// and after it returns the modified topology will be
590
// persisted into the /topology node. Note that this f must
591
// have no side-effects, since it may be called multiple times
592
// depending on conflict situations.
593
func retryTopologyChange(zk *zookeeper.Conn, f func(t *topology) error) error {
594
change := func(yaml string, stat *zookeeper.Stat) (string, error) {
596
it := &topology{topology: &topoTopology{Version: 1}}
598
if it, err = parseTopology(yaml); err != nil {
602
// Apply the passed function.
603
if err = f(it); err != nil {
608
return zk.RetryChange(zkTopologyPath, 0, zkPermAll, change)