~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/worker/uniter/remotestate/relationunits.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2015 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package remotestate
 
5
 
 
6
import (
 
7
        "github.com/juju/errors"
 
8
 
 
9
        "github.com/juju/juju/watcher"
 
10
        "github.com/juju/juju/worker"
 
11
        "github.com/juju/juju/worker/catacomb"
 
12
)
 
13
 
 
14
type relationUnitsWatcher struct {
 
15
        catacomb   catacomb.Catacomb
 
16
        relationId int
 
17
        changes    watcher.RelationUnitsChannel
 
18
        out        chan<- relationUnitsChange
 
19
}
 
20
 
 
21
type relationUnitsChange struct {
 
22
        relationId int
 
23
        watcher.RelationUnitsChange
 
24
}
 
25
 
 
26
// newRelationUnitsWatcher creates a new worker that takes values from the
 
27
// supplied watcher's Changes chan, annotates them with the supplied relation
 
28
// id, and delivers then on the supplied out chan.
 
29
//
 
30
// The caller releases responsibility for stopping the supplied watcher and
 
31
// waiting for errors, *whether or not this method succeeds*.
 
32
func newRelationUnitsWatcher(
 
33
        relationId int,
 
34
        watcher watcher.RelationUnitsWatcher,
 
35
        out chan<- relationUnitsChange,
 
36
) (*relationUnitsWatcher, error) {
 
37
        ruw := &relationUnitsWatcher{
 
38
                relationId: relationId,
 
39
                changes:    watcher.Changes(),
 
40
                out:        out,
 
41
        }
 
42
        err := catacomb.Invoke(catacomb.Plan{
 
43
                Site: &ruw.catacomb,
 
44
                Work: ruw.loop,
 
45
                Init: []worker.Worker{watcher},
 
46
        })
 
47
        if err != nil {
 
48
                return nil, errors.Trace(err)
 
49
        }
 
50
        return ruw, nil
 
51
}
 
52
 
 
53
// Kill is part of the worker.Worker interface.
 
54
func (w *relationUnitsWatcher) Kill() {
 
55
        w.catacomb.Kill(nil)
 
56
}
 
57
 
 
58
// Wait is part of the worker.Worker interface.
 
59
func (w *relationUnitsWatcher) Wait() error {
 
60
        return w.catacomb.Wait()
 
61
}
 
62
 
 
63
func (w *relationUnitsWatcher) loop() error {
 
64
        for {
 
65
                select {
 
66
                case <-w.catacomb.Dying():
 
67
                        return w.catacomb.ErrDying()
 
68
                case change, ok := <-w.changes:
 
69
                        if !ok {
 
70
                                return errors.New("watcher closed channel")
 
71
                        }
 
72
                        select {
 
73
                        case <-w.catacomb.Dying():
 
74
                                return w.catacomb.ErrDying()
 
75
                        case w.out <- relationUnitsChange{w.relationId, change}:
 
76
                        }
 
77
                }
 
78
        }
 
79
}