10
8
"launchpad.net/juju-core/log"
14
11
var ErrShutdown = errors.New("connection is shut down")
16
// A ClientCodec implements writing of RPC requests and reading of RPC
17
// responses for the client side of an RPC session. The client calls
18
// WriteRequest to write a request to the connection and calls
19
// ReadResponseHeader and ReadResponseBody in pairs to read responses.
20
// The client calls Close when finished with the connection.
21
// The params argument to WriteRequest will always be of struct
22
// type; the result argument to ReadResponseBody will always be
23
// a non-nil pointer to a struct.
24
type ClientCodec interface {
25
WriteRequest(req *Request, params interface{}) error
26
ReadResponseHeader(resp *Response) error
27
ReadResponseBody(result interface{}) error
31
// Client represents an RPC Client. There may be multiple outstanding
32
// Calls associated with a single Client, and a Client may be used by
33
// multiple goroutines simultaneously.
39
mutex sync.Mutex // protects the following fields
41
pending map[uint64]*Call
46
// NewClientWithCodec returns a new Client to handle requests to the set of
47
// services at the other end of the connection. The given codec is used
48
// to encode requests and decode responses.
49
func NewClientWithCodec(codec ClientCodec) *Client {
52
pending: make(map[uint64]*Call),
58
13
// Call represents an active RPC.
69
// ServerError represents an error returned from an RPC server.
70
type ServerError struct {
24
// RequestError represents an error returned from an RPC request.
25
type RequestError struct {
75
func (e *ServerError) Error() string {
76
m := "server error: " + e.Message
30
func (e *RequestError) Error() string {
31
m := "request error: " + e.Message
78
33
m += " (" + e.Code + ")"
83
func (e *ServerError) ErrorCode() string {
38
func (e *RequestError) ErrorCode() string {
87
func (client *Client) Close() error {
89
if client.shutdown || client.closing {
95
return client.codec.Close()
98
func (client *Client) send(call *Call) {
100
defer client.sending.Unlock()
102
// Register this call.
104
if client.shutdown || client.closing {
105
call.Error = ErrShutdown
106
client.mutex.Unlock()
111
reqId := client.reqId
112
client.pending[reqId] = call
113
client.mutex.Unlock()
115
// Encode and send the request.
116
client.request = Request{
120
Request: call.Request,
122
params := call.Params
126
if err := client.codec.WriteRequest(&client.request, params); err != nil {
128
call = client.pending[reqId]
129
delete(client.pending, reqId)
130
client.mutex.Unlock()
138
func (client *Client) readBody(resp interface{}) error {
142
err := client.codec.ReadResponseBody(resp)
144
err = fmt.Errorf("error reading body: %v", err)
149
func (client *Client) input() {
151
var response Response
153
response = Response{}
154
err = client.codec.ReadResponseHeader(&response)
158
reqId := response.RequestId
160
call := client.pending[reqId]
161
delete(client.pending, reqId)
162
client.mutex.Unlock()
166
// We've got no pending call. That usually means that
167
// WriteRequest partially failed, and call was already
168
// removed; response is a server telling us about an
169
// error reading request body. We should still attempt
170
// to read error body, but there's no one to give it to.
171
err = client.readBody(nil)
172
case response.Error != "":
173
// We've got an error response. Give this to the request;
174
// any subsequent requests will get the ReadResponseBody
175
// error if there is one.
176
call.Error = &ServerError{
177
Message: response.Error,
178
Code: response.ErrorCode,
180
err = client.readBody(nil)
183
err = client.readBody(call.Response)
187
// Terminate pending calls.
188
client.sending.Lock()
190
client.shutdown = true
191
closing := client.closing
196
err = io.ErrUnexpectedEOF
199
for _, call := range client.pending {
204
client.mutex.Unlock()
205
client.sending.Unlock()
206
if err != io.EOF && !closing {
207
log.Errorf("rpc: client protocol error: %v", err)
211
func (call *Call) done() {
213
case call.Done <- call:
216
// We don't want to block here. It is the caller's responsibility to make
217
// sure the channel has enough buffer space. See comment in Go().
218
log.Warningf("rpc: discarding Call reply due to insufficient Done chan capacity")
222
42
// Call invokes the named action on the object of the given type with
223
43
// the given id. The returned values will be stored in response, which
224
44
// should be a pointer. If the action fails remotely, the returned
225
// error will be of type *ServerError.
226
// The params value may be nil if no parameters are provided;
227
// the response value may be nil to indicate that any result
228
// should be discarded.
229
func (c *Client) Call(objType, id, action string, params, response interface{}) error {
230
call := <-c.Go(objType, id, action, params, response, make(chan *Call, 1)).Done
45
// error will be of type *RequestError. The params value may be nil if
46
// no parameters are provided; the response value may be nil to indicate
47
// that any result should be discarded.
48
func (conn *Conn) Call(objType, id, action string, params, response interface{}) error {
49
call := <-conn.Go(objType, id, action, params, response, make(chan *Call, 1)).Done
235
54
// the invocation. The done channel will signal when the call is complete by returning
236
55
// the same Call object. If done is nil, Go will allocate a new channel.
237
56
// If non-nil, done must be buffered or Go will deliberately panic.
238
func (c *Client) Go(objType, id, request string, args, response interface{}, done chan *Call) *Call {
57
func (conn *Conn) Go(objType, id, request string, args, response interface{}, done chan *Call) *Call {
240
done = make(chan *Call, 10) // buffered.
59
done = make(chan *Call, 1)
242
61
// If caller passes done != nil, it must arrange that
243
62
// done has enough buffer for the number of simultaneous
255
74
Response: response,
81
func (conn *Conn) send(call *Call) {
83
defer conn.sending.Unlock()
85
// Register this call.
86
conn.clientMutex.Lock()
87
if conn.shutdown || conn.closing {
88
call.Error = ErrShutdown
89
conn.clientMutex.Unlock()
95
conn.clientPending[reqId] = call
96
conn.clientMutex.Unlock()
98
// Encode and send the request.
103
Request: call.Request,
105
params := call.Params
109
if err := conn.codec.WriteMessage(hdr, params); err != nil {
110
conn.clientMutex.Lock()
111
call = conn.clientPending[reqId]
112
delete(conn.clientPending, reqId)
113
conn.clientMutex.Unlock()
121
func (conn *Conn) handleResponse(hdr *Header) error {
122
reqId := hdr.RequestId
123
conn.clientMutex.Lock()
124
call := conn.clientPending[reqId]
125
delete(conn.clientPending, reqId)
126
conn.clientMutex.Unlock()
131
// We've got no pending call. That usually means that
132
// WriteMessage partially failed, and call was already
133
// removed; response is a server telling us about an
134
// error reading request body. We should still attempt
135
// to read error body, but there's no one to give it to.
136
err = conn.readBody(nil, false)
137
case hdr.Error != "":
138
// We've got an error response. Give this to the request;
139
// any subsequent requests will get the ReadResponseBody
140
// error if there is one.
141
call.Error = &RequestError{
145
err = conn.readBody(nil, false)
148
err = conn.readBody(call.Response, false)
154
func (call *Call) done() {
156
case call.Done <- call:
159
// We don't want to block here. It is the caller's responsibility to make
160
// sure the channel has enough buffer space. See comment in Go().
161
log.Errorf("rpc: discarding Call reply due to insufficient Done chan capacity")