~sidnei/juju-core/lxc-clone-with-overlayfs

« back to all changes in this revision

Viewing changes to rpc/client.go

  • Committer: Roger Peppe
  • Date: 2013-05-12 07:06:19 UTC
  • mto: This revision was merged to the branch mainline in revision 1216.
  • Revision ID: roger.peppe@canonical.com-20130512070619-hbplw4d380deocf2
rpc: refactor to be symmetrical

Show diffs side-by-side

added added

removed removed

Lines of Context:
5
5
 
6
6
import (
7
7
        "errors"
8
 
        "fmt"
9
 
        "io"
10
8
        "launchpad.net/juju-core/log"
11
 
        "sync"
12
9
)
13
10
 
14
11
var ErrShutdown = errors.New("connection is shut down")
15
12
 
16
 
// A ClientCodec implements writing of RPC requests and reading of RPC
17
 
// responses for the client side of an RPC session.  The client calls
18
 
// WriteRequest to write a request to the connection and calls
19
 
// ReadResponseHeader and ReadResponseBody in pairs to read responses.
20
 
// The client calls Close when finished with the connection.
21
 
// The params argument to WriteRequest will always be of struct
22
 
// type; the result argument to ReadResponseBody will always be
23
 
// a non-nil pointer to a struct.
24
 
type ClientCodec interface {
25
 
        WriteRequest(req *Request, params interface{}) error
26
 
        ReadResponseHeader(resp *Response) error
27
 
        ReadResponseBody(result interface{}) error
28
 
        Close() error
29
 
}
30
 
 
31
 
// Client represents an RPC Client.  There may be multiple outstanding
32
 
// Calls associated with a single Client, and a Client may be used by
33
 
// multiple goroutines simultaneously.
34
 
type Client struct {
35
 
        sending sync.Mutex
36
 
        codec   ClientCodec
37
 
        request Request
38
 
 
39
 
        mutex    sync.Mutex // protects the following fields
40
 
        reqId    uint64
41
 
        pending  map[uint64]*Call
42
 
        closing  bool
43
 
        shutdown bool
44
 
}
45
 
 
46
 
// NewClient returns a new Client to handle requests to the set of
47
 
// services at the other end of the connection.  The given codec is used
48
 
// to encode requests and decode responses.
49
 
func NewClientWithCodec(codec ClientCodec) *Client {
50
 
        client := &Client{
51
 
                codec:   codec,
52
 
                pending: make(map[uint64]*Call),
53
 
        }
54
 
        go client.input()
55
 
        return client
56
 
}
57
 
 
58
13
// Call represents an active RPC.
59
14
type Call struct {
60
15
        Type     string
66
21
        Done     chan *Call
67
22
}
68
23
 
69
 
// ServerError represents an error returned from an RPC server.
70
 
type ServerError struct {
 
24
// RequestError represents an error returned from an RPC request.
 
25
type RequestError struct {
71
26
        Message string
72
27
        Code    string
73
28
}
74
29
 
75
 
func (e *ServerError) Error() string {
76
 
        m := "server error: " + e.Message
 
30
func (e *RequestError) Error() string {
 
31
        m := "request error: " + e.Message
77
32
        if e.Code != "" {
78
33
                m += " (" + e.Code + ")"
79
34
        }
80
35
        return m
81
36
}
82
37
 
83
 
func (e *ServerError) ErrorCode() string {
 
38
func (e *RequestError) ErrorCode() string {
84
39
        return e.Code
85
40
}
86
41
 
87
 
func (client *Client) Close() error {
88
 
        client.mutex.Lock()
89
 
        if client.shutdown || client.closing {
90
 
                client.mutex.Unlock()
91
 
                return ErrShutdown
92
 
        }
93
 
        client.closing = true
94
 
        client.mutex.Unlock()
95
 
        return client.codec.Close()
96
 
}
97
 
 
98
 
func (client *Client) send(call *Call) {
99
 
        client.sending.Lock()
100
 
        defer client.sending.Unlock()
101
 
 
102
 
        // Register this call.
103
 
        client.mutex.Lock()
104
 
        if client.shutdown || client.closing {
105
 
                call.Error = ErrShutdown
106
 
                client.mutex.Unlock()
107
 
                call.done()
108
 
                return
109
 
        }
110
 
        client.reqId++
111
 
        reqId := client.reqId
112
 
        client.pending[reqId] = call
113
 
        client.mutex.Unlock()
114
 
 
115
 
        // Encode and send the request.
116
 
        client.request = Request{
117
 
                RequestId: reqId,
118
 
                Type:      call.Type,
119
 
                Id:        call.Id,
120
 
                Request:   call.Request,
121
 
        }
122
 
        params := call.Params
123
 
        if params == nil {
124
 
                params = struct{}{}
125
 
        }
126
 
        if err := client.codec.WriteRequest(&client.request, params); err != nil {
127
 
                client.mutex.Lock()
128
 
                call = client.pending[reqId]
129
 
                delete(client.pending, reqId)
130
 
                client.mutex.Unlock()
131
 
                if call != nil {
132
 
                        call.Error = err
133
 
                        call.done()
134
 
                }
135
 
        }
136
 
}
137
 
 
138
 
func (client *Client) readBody(resp interface{}) error {
139
 
        if resp == nil {
140
 
                resp = &struct{}{}
141
 
        }
142
 
        err := client.codec.ReadResponseBody(resp)
143
 
        if err != nil {
144
 
                err = fmt.Errorf("error reading body: %v", err)
145
 
        }
146
 
        return err
147
 
}
148
 
 
149
 
func (client *Client) input() {
150
 
        var err error
151
 
        var response Response
152
 
        for err == nil {
153
 
                response = Response{}
154
 
                err = client.codec.ReadResponseHeader(&response)
155
 
                if err != nil {
156
 
                        break
157
 
                }
158
 
                reqId := response.RequestId
159
 
                client.mutex.Lock()
160
 
                call := client.pending[reqId]
161
 
                delete(client.pending, reqId)
162
 
                client.mutex.Unlock()
163
 
 
164
 
                switch {
165
 
                case call == nil:
166
 
                        // We've got no pending call. That usually means that
167
 
                        // WriteRequest partially failed, and call was already
168
 
                        // removed; response is a server telling us about an
169
 
                        // error reading request body. We should still attempt
170
 
                        // to read error body, but there's no one to give it to.
171
 
                        err = client.readBody(nil)
172
 
                case response.Error != "":
173
 
                        // We've got an error response. Give this to the request;
174
 
                        // any subsequent requests will get the ReadResponseBody
175
 
                        // error if there is one.
176
 
                        call.Error = &ServerError{
177
 
                                Message: response.Error,
178
 
                                Code:    response.ErrorCode,
179
 
                        }
180
 
                        err = client.readBody(nil)
181
 
                        call.done()
182
 
                default:
183
 
                        err = client.readBody(call.Response)
184
 
                        call.done()
185
 
                }
186
 
        }
187
 
        // Terminate pending calls.
188
 
        client.sending.Lock()
189
 
        client.mutex.Lock()
190
 
        client.shutdown = true
191
 
        closing := client.closing
192
 
        if err == io.EOF {
193
 
                if closing {
194
 
                        err = ErrShutdown
195
 
                } else {
196
 
                        err = io.ErrUnexpectedEOF
197
 
                }
198
 
        }
199
 
        for _, call := range client.pending {
200
 
                call.Error = err
201
 
                call.done()
202
 
        }
203
 
        client.pending = nil
204
 
        client.mutex.Unlock()
205
 
        client.sending.Unlock()
206
 
        if err != io.EOF && !closing {
207
 
                log.Errorf("rpc: client protocol error: %v", err)
208
 
        }
209
 
}
210
 
 
211
 
func (call *Call) done() {
212
 
        select {
213
 
        case call.Done <- call:
214
 
                // ok
215
 
        default:
216
 
                // We don't want to block here.  It is the caller's responsibility to make
217
 
                // sure the channel has enough buffer space. See comment in Go().
218
 
                log.Warningf("rpc: discarding Call reply due to insufficient Done chan capacity")
219
 
        }
220
 
}
221
 
 
222
42
// Call invokes the named action on the object of the given type with
223
43
// the given id.  The returned values will be stored in response, which
224
44
// should be a pointer.  If the action fails remotely, the returned
225
 
// error will be of type ServerError.
226
 
// The params value may be nil if no parameters are provided;
227
 
// the response value may be nil to indicate that any result
228
 
// should be discarded.
229
 
func (c *Client) Call(objType, id, action string, params, response interface{}) error {
230
 
        call := <-c.Go(objType, id, action, params, response, make(chan *Call, 1)).Done
 
45
// error will be of type RequestError.  The params value may be nil if
 
46
// no parameters are provided; the response value may be nil to indicate
 
47
// that any result should be discarded.
 
48
func (conn *Conn) Call(objType, id, action string, params, response interface{}) error {
 
49
        call := <-conn.Go(objType, id, action, params, response, make(chan *Call, 1)).Done
231
50
        return call.Error
232
51
}
233
52
 
235
54
// the invocation.  The done channel will signal when the call is complete by returning
236
55
// the same Call object.  If done is nil, Go will allocate a new channel.
237
56
// If non-nil, done must be buffered or Go will deliberately panic.
238
 
func (c *Client) Go(objType, id, request string, args, response interface{}, done chan *Call) *Call {
 
57
func (conn *Conn) Go(objType, id, request string, args, response interface{}, done chan *Call) *Call {
239
58
        if done == nil {
240
 
                done = make(chan *Call, 10) // buffered.
 
59
                done = make(chan *Call, 1)
241
60
        } else {
242
61
                // If caller passes done != nil, it must arrange that
243
62
                // done has enough buffer for the number of simultaneous
255
74
                Response: response,
256
75
                Done:     done,
257
76
        }
258
 
        c.send(call)
 
77
        conn.send(call)
259
78
        return call
260
79
}
 
80
 
 
81
func (conn *Conn) send(call *Call) {
 
82
        conn.sending.Lock()
 
83
        defer conn.sending.Unlock()
 
84
 
 
85
        // Register this call.
 
86
        conn.clientMutex.Lock()
 
87
        if conn.shutdown || conn.closing {
 
88
                call.Error = ErrShutdown
 
89
                conn.clientMutex.Unlock()
 
90
                call.done()
 
91
                return
 
92
        }
 
93
        conn.reqId++
 
94
        reqId := conn.reqId
 
95
        conn.clientPending[reqId] = call
 
96
        conn.clientMutex.Unlock()
 
97
 
 
98
        // Encode and send the request.
 
99
        hdr := &Header{
 
100
                RequestId: reqId,
 
101
                Type:      call.Type,
 
102
                Id:        call.Id,
 
103
                Request:   call.Request,
 
104
        }
 
105
        params := call.Params
 
106
        if params == nil {
 
107
                params = struct{}{}
 
108
        }
 
109
        if err := conn.codec.WriteMessage(hdr, params); err != nil {
 
110
                conn.clientMutex.Lock()
 
111
                call = conn.clientPending[reqId]
 
112
                delete(conn.clientPending, reqId)
 
113
                conn.clientMutex.Unlock()
 
114
                if call != nil {
 
115
                        call.Error = err
 
116
                        call.done()
 
117
                }
 
118
        }
 
119
}
 
120
 
 
121
func (conn *Conn) handleResponse(hdr *Header) error {
 
122
        reqId := hdr.RequestId
 
123
        conn.clientMutex.Lock()
 
124
        call := conn.clientPending[reqId]
 
125
        delete(conn.clientPending, reqId)
 
126
        conn.clientMutex.Unlock()
 
127
 
 
128
        var err error
 
129
        switch {
 
130
        case call == nil:
 
131
                // We've got no pending call. That usually means that
 
132
                // WriteHeader partially failed, and call was already
 
133
                // removed; response is a server telling us about an
 
134
                // error reading request body. We should still attempt
 
135
                // to read error body, but there's no one to give it to.
 
136
                err = conn.readBody(nil, false)
 
137
        case hdr.Error != "":
 
138
                // We've got an error response. Give this to the request;
 
139
                // any subsequent requests will get the ReadResponseBody
 
140
                // error if there is one.
 
141
                call.Error = &RequestError{
 
142
                        Message: hdr.Error,
 
143
                        Code:    hdr.ErrorCode,
 
144
                }
 
145
                err = conn.readBody(nil, false)
 
146
                call.done()
 
147
        default:
 
148
                err = conn.readBody(call.Response, false)
 
149
                call.done()
 
150
        }
 
151
        return err
 
152
}
 
153
 
 
154
func (call *Call) done() {
 
155
        select {
 
156
        case call.Done <- call:
 
157
                // ok
 
158
        default:
 
159
                // We don't want to block here.  It is the caller's responsibility to make
 
160
                // sure the channel has enough buffer space. See comment in Go().
 
161
                log.Errorf("rpc: discarding Call reply due to insufficient Done chan capacity")
 
162
        }
 
163
}