1
// Copyright 2016 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
4
package stringforwarder
8
// StringForwarder is a goroutine-safe type that pipes messages from
9
// its Forward() method, sending them to callback. The send will not be
10
// blocked by the callback, but will instead discard messages if there
11
// is an incomplete callback in progress. The number of discarded messages
12
// is tracked and returned when the forwarder is stopped.
13
type StringForwarder struct {
21
// New returns a new StringForwarder that sends messages to the callback
22
// function, dropping messages if the receiver has not yet consumed the
24
func New(callback func(string)) *StringForwarder {
26
// Nothing to forward to, so no need to start the loop().
27
// We'll just count the discarded messages.
28
return &StringForwarder{stopped: true}
30
forwarder := &StringForwarder{}
31
forwarder.cond = sync.NewCond(&forwarder.mu)
32
go forwarder.loop(callback)
36
// Forward sends the message to be processed by the callback function,
37
// discarding the message if another message is currently being processed.
38
// The number of discarded messages is recorded for reporting by the Stop
41
// Forward is safe to call from multiple goroutines at once.
42
// Note that if this StringForwarder was created with a nil callback, all
43
// messages will be discarded.
44
func (f *StringForwarder) Forward(msg string) {
46
if f.stopped || f.current != nil {
55
// Stop cleans up the goroutine running behind StringForwarder and returns the
56
// count of discarded messages. Stop is thread-safe and may be called multiple
57
// times - after the first time, it simply returns the current discard count.
58
func (f *StringForwarder) Stop() uint64 {
65
count = f.discardCount
70
// loop invokes forwarded messages with the given callback until stopped.
71
func (f *StringForwarder) loop(callback func(string)) {
75
for !f.stopped && f.current == nil {
81
f.invokeCallback(callback, *f.current)
86
// invokeCallback invokes the given callback with a message,
87
// unlocking the forwarder's mutex for the duration of the
89
func (f *StringForwarder) invokeCallback(callback func(string), msg string) {