~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/utils/stringforwarder/stringforwarder.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2016 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
package stringforwarder
 
5
 
 
6
import "sync"
 
7
 
 
8
// StringForwarder is a goroutine-safe type that pipes messages from the
 
9
// its Forward() method, sending them to callback.  The send will not be
 
10
// blocked by the callback, but will instead discard messages if there
 
11
// is an incomplete callback in progress. The number of discarded messages
 
12
// is tracked and returned when the forwarder is stopped.
 
13
type StringForwarder struct {
 
14
        mu           sync.Mutex
 
15
        cond         *sync.Cond
 
16
        current      *string
 
17
        stopped      bool
 
18
        discardCount uint64
 
19
}
 
20
 
 
21
// New returns a new StringForwarder that sends messages to the callback,
 
22
// function, dropping messages if the receiver has not yet consumed the
 
23
// last message.
 
24
func New(callback func(string)) *StringForwarder {
 
25
        if callback == nil {
 
26
                // Nothing to forward to, so no need to start the loop().
 
27
                // We'll just count the discardCount.
 
28
                return &StringForwarder{stopped: true}
 
29
        }
 
30
        forwarder := &StringForwarder{}
 
31
        forwarder.cond = sync.NewCond(&forwarder.mu)
 
32
        go forwarder.loop(callback)
 
33
        return forwarder
 
34
}
 
35
 
 
36
// Forward sends the message to be processed by the callback function,
 
37
// discarding the message if another message is currently being processed.
 
38
// The number of discarded messages is recorded for reporting by the Stop
 
39
// method.
 
40
//
 
41
// Forward is safe to call from multiple goroutines at once.
 
42
// Note that if this StringForwarder was created with a nil callback, all
 
43
// messages will be discarded.
 
44
func (f *StringForwarder) Forward(msg string) {
 
45
        f.mu.Lock()
 
46
        if f.stopped || f.current != nil {
 
47
                f.discardCount++
 
48
        } else {
 
49
                f.current = &msg
 
50
                f.cond.Signal()
 
51
        }
 
52
        f.mu.Unlock()
 
53
}
 
54
 
 
55
// Stop cleans up the goroutine running behind StringForwarder and returns the
 
56
// count of discarded messages. Stop is thread-safe and may be called multiple
 
57
// times - after the first time, it simply returns the current discard count.
 
58
func (f *StringForwarder) Stop() uint64 {
 
59
        var count uint64
 
60
        f.mu.Lock()
 
61
        if !f.stopped {
 
62
                f.stopped = true
 
63
                f.cond.Signal()
 
64
        }
 
65
        count = f.discardCount
 
66
        f.mu.Unlock()
 
67
        return count
 
68
}
 
69
 
 
70
// loop invokes forwarded messages with the given callback until stopped.
 
71
func (f *StringForwarder) loop(callback func(string)) {
 
72
        f.mu.Lock()
 
73
        defer f.mu.Unlock()
 
74
        for {
 
75
                for !f.stopped && f.current == nil {
 
76
                        f.cond.Wait()
 
77
                }
 
78
                if f.current == nil {
 
79
                        return
 
80
                }
 
81
                f.invokeCallback(callback, *f.current)
 
82
                f.current = nil
 
83
        }
 
84
}
 
85
 
 
86
// invokeCallback invokes the given callback with a message,
 
87
// unlocking the forwarder's mutex for the duration of the
 
88
// callback.
 
89
func (f *StringForwarder) invokeCallback(callback func(string), msg string) {
 
90
        f.mu.Unlock()
 
91
        defer f.mu.Lock()
 
92
        callback(msg)
 
93
}