~rogpeppe/juju-core/azure

« back to all changes in this revision

Viewing changes to rpc/client.go

  • Committer: William Reade
  • Date: 2012-01-20 21:32:53 UTC
  • mto: This revision was merged to the branch mainline in revision 47.
  • Revision ID: fwereade@gmail.com-20120120213253-csks5e12opl8t1rq
hefty rearrangement, few actual changes

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
// Copyright 2012, 2013 Canonical Ltd.
2
 
// Licensed under the AGPLv3, see LICENCE file for details.
3
 
 
4
 
package rpc
5
 
 
6
 
import (
7
 
        "errors"
8
 
 
9
 
        "launchpad.net/juju-core/log"
10
 
)
11
 
 
12
 
var ErrShutdown = errors.New("connection is shut down")
13
 
 
14
 
// Call represents an active RPC.
15
 
type Call struct {
16
 
        Type     string
17
 
        Id       string
18
 
        Request  string
19
 
        Params   interface{}
20
 
        Response interface{}
21
 
        Error    error
22
 
        Done     chan *Call
23
 
}
24
 
 
25
 
// RequestError represents an error returned from an RPC request.
26
 
type RequestError struct {
27
 
        Message string
28
 
        Code    string
29
 
}
30
 
 
31
 
func (e *RequestError) Error() string {
32
 
        m := "request error: " + e.Message
33
 
        if e.Code != "" {
34
 
                m += " (" + e.Code + ")"
35
 
        }
36
 
        return m
37
 
}
38
 
 
39
 
func (e *RequestError) ErrorCode() string {
40
 
        return e.Code
41
 
}
42
 
 
43
 
func (conn *Conn) send(call *Call) {
44
 
        conn.sending.Lock()
45
 
        defer conn.sending.Unlock()
46
 
 
47
 
        // Register this call.
48
 
        conn.mutex.Lock()
49
 
        if conn.dead == nil {
50
 
                panic("rpc: call made when connection not started")
51
 
        }
52
 
        if conn.closing || conn.shutdown {
53
 
                call.Error = ErrShutdown
54
 
                conn.mutex.Unlock()
55
 
                call.done()
56
 
                return
57
 
        }
58
 
        conn.reqId++
59
 
        reqId := conn.reqId
60
 
        conn.clientPending[reqId] = call
61
 
        conn.mutex.Unlock()
62
 
 
63
 
        // Encode and send the request.
64
 
        hdr := &Header{
65
 
                RequestId: reqId,
66
 
                Type:      call.Type,
67
 
                Id:        call.Id,
68
 
                Request:   call.Request,
69
 
        }
70
 
        params := call.Params
71
 
        if params == nil {
72
 
                params = struct{}{}
73
 
        }
74
 
        if err := conn.codec.WriteMessage(hdr, params); err != nil {
75
 
                conn.mutex.Lock()
76
 
                call = conn.clientPending[reqId]
77
 
                delete(conn.clientPending, reqId)
78
 
                conn.mutex.Unlock()
79
 
                if call != nil {
80
 
                        call.Error = err
81
 
                        call.done()
82
 
                }
83
 
        }
84
 
}
85
 
 
86
 
func (conn *Conn) handleResponse(hdr *Header) error {
87
 
        reqId := hdr.RequestId
88
 
        conn.mutex.Lock()
89
 
        call := conn.clientPending[reqId]
90
 
        delete(conn.clientPending, reqId)
91
 
        conn.mutex.Unlock()
92
 
 
93
 
        var err error
94
 
        switch {
95
 
        case call == nil:
96
 
                // We've got no pending call. That usually means that
97
 
                // WriteHeader partially failed, and call was already
98
 
                // removed; response is a server telling us about an
99
 
                // error reading request body. We should still attempt
100
 
                // to read error body, but there's no one to give it to.
101
 
                err = conn.readBody(nil, false)
102
 
        case hdr.Error != "":
103
 
                // We've got an error response. Give this to the request;
104
 
                // any subsequent requests will get the ReadResponseBody
105
 
                // error if there is one.
106
 
                call.Error = &RequestError{
107
 
                        Message: hdr.Error,
108
 
                        Code:    hdr.ErrorCode,
109
 
                }
110
 
                err = conn.readBody(nil, false)
111
 
                call.done()
112
 
        default:
113
 
                err = conn.readBody(call.Response, false)
114
 
                call.done()
115
 
        }
116
 
        return err
117
 
}
118
 
 
119
 
func (call *Call) done() {
120
 
        select {
121
 
        case call.Done <- call:
122
 
                // ok
123
 
        default:
124
 
                // We don't want to block here.  It is the caller's responsibility to make
125
 
                // sure the channel has enough buffer space. See comment in Go().
126
 
                log.Errorf("rpc: discarding Call reply due to insufficient Done chan capacity")
127
 
        }
128
 
}
129
 
 
130
 
// Call invokes the named action on the object of the given type with
131
 
// the given id.  The returned values will be stored in response, which
132
 
// should be a pointer.  If the action fails remotely, the returned
133
 
// error will be of type RequestError.  The params value may be nil if
134
 
// no parameters are provided; the response value may be nil to indicate
135
 
// that any result should be discarded.
136
 
func (conn *Conn) Call(objType, id, action string, params, response interface{}) error {
137
 
        call := <-conn.Go(objType, id, action, params, response, make(chan *Call, 1)).Done
138
 
        return call.Error
139
 
}
140
 
 
141
 
// Go invokes the request asynchronously.  It returns the Call structure representing
142
 
// the invocation.  The done channel will signal when the call is complete by returning
143
 
// the same Call object.  If done is nil, Go will allocate a new channel.
144
 
// If non-nil, done must be buffered or Go will deliberately panic.
145
 
func (conn *Conn) Go(objType, id, request string, args, response interface{}, done chan *Call) *Call {
146
 
        if done == nil {
147
 
                done = make(chan *Call, 1)
148
 
        } else {
149
 
                // If caller passes done != nil, it must arrange that
150
 
                // done has enough buffer for the number of simultaneous
151
 
                // RPCs that will be using that channel.  If the channel
152
 
                // is totally unbuffered, it's best not to run at all.
153
 
                if cap(done) == 0 {
154
 
                        panic("launchpad.net/juju-core/rpc: done channel is unbuffered")
155
 
                }
156
 
        }
157
 
        call := &Call{
158
 
                Type:     objType,
159
 
                Id:       id,
160
 
                Request:  request,
161
 
                Params:   args,
162
 
                Response: response,
163
 
                Done:     done,
164
 
        }
165
 
        conn.send(call)
166
 
        return call
167
 
}