1
// Copyright 2016 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
8
// Observer can be implemented to find out about requests occurring in
9
// an RPC conn, for example to print requests for logging
10
// purposes. The calls should not block or interact with the Conn
11
// object as that can cause delays to the RPC server or deadlock.
12
type Observer interface {
14
// ServerRequest informs the Observer of a request made
15
// to the Conn. If the request was not recognized or there was
16
// an error reading the body, body will be nil.
18
// ServerRequest is called just before the server method
20
ServerRequest(hdr *Header, body interface{})
22
// ServerReply informs the RequestNotifier of a reply sent to a
23
// server request. The given Request gives details of the call
24
// that was made; the given Header and body are the header and
25
// body sent as reply.
27
// ServerReply is called just before the reply is written.
28
ServerReply(req Request, hdr *Header, body interface{})
31
// NewObserverMultiplexer returns a new ObserverMultiplexer
32
// with the provided RequestNotifiers.
33
func NewObserverMultiplexer(rpcObservers ...Observer) *ObserverMultiplexer {
34
return &ObserverMultiplexer{
35
rpcObservers: rpcObservers,
39
// ObserverMultiplexer multiplexes calls to an arbitrary number of
41
type ObserverMultiplexer struct {
42
rpcObservers []Observer
45
// ServerReply implements Observer.
46
func (m *ObserverMultiplexer) ServerReply(req Request, hdr *Header, body interface{}) {
47
mapConcurrent(func(n Observer) { n.ServerReply(req, hdr, body) }, m.rpcObservers)
50
// ServerRequest implements Observer.
51
func (m *ObserverMultiplexer) ServerRequest(hdr *Header, body interface{}) {
52
mapConcurrent(func(n Observer) { n.ServerRequest(hdr, body) }, m.rpcObservers)
55
// mapConcurrent calls fn on all observers concurrently and then waits
56
// for all calls to exit before returning.
57
func mapConcurrent(fn func(Observer), requestNotifiers []Observer) {
59
wg.Add(len(requestNotifiers))
62
for _, n := range requestNotifiers {
63
go func(notifier Observer) {