1
// Copyright 2015 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
7
"github.com/juju/errors"
9
"github.com/juju/juju/watcher"
10
"github.com/juju/juju/worker"
11
"github.com/juju/juju/worker/catacomb"
14
type relationUnitsWatcher struct {
15
catacomb catacomb.Catacomb
17
changes watcher.RelationUnitsChannel
18
out chan<- relationUnitsChange
21
type relationUnitsChange struct {
23
watcher.RelationUnitsChange
26
// newRelationUnitsWatcher creates a new worker that takes values from the
27
// supplied watcher's Changes chan, annotates them with the supplied relation
28
// id, and delivers then on the supplied out chan.
30
// The caller releases responsibility for stopping the supplied watcher and
31
// waiting for errors, *whether or not this method succeeds*.
32
func newRelationUnitsWatcher(
34
watcher watcher.RelationUnitsWatcher,
35
out chan<- relationUnitsChange,
36
) (*relationUnitsWatcher, error) {
37
ruw := &relationUnitsWatcher{
38
relationId: relationId,
39
changes: watcher.Changes(),
42
err := catacomb.Invoke(catacomb.Plan{
45
Init: []worker.Worker{watcher},
48
return nil, errors.Trace(err)
53
// Kill is part of the worker.Worker interface.
54
func (w *relationUnitsWatcher) Kill() {
58
// Wait is part of the worker.Worker interface.
59
func (w *relationUnitsWatcher) Wait() error {
60
return w.catacomb.Wait()
63
func (w *relationUnitsWatcher) loop() error {
66
case <-w.catacomb.Dying():
67
return w.catacomb.ErrDying()
68
case change, ok := <-w.changes:
70
return errors.New("watcher closed channel")
73
case <-w.catacomb.Dying():
74
return w.catacomb.ErrDying()
75
case w.out <- relationUnitsChange{w.relationId, change}: