4
"code.google.com/p/go.net/websocket"
8
"launchpad.net/juju-core/log"
9
"launchpad.net/juju-core/rpc"
10
"launchpad.net/juju-core/state"
25
// NewServer returns a new Server that serves the given state by
26
// accepting requests on a TCP listener opened on addr, using the given
27
// certificate and key (in PEM format) for TLS authentication.
28
func NewServer(s *state.State, addr string, cert, key []byte) (*Server, error) {
29
lis, err := net.Listen("tcp", addr)
33
log.Printf("state/api: listening on %q", addr)
34
// Build the TLS certificate from the PEM-encoded cert and key.
tlsCert, err := tls.X509KeyPair(cert, key)
42
srv.rpcSrv, err = rpc.NewServer(&srvRoot{}, serverError)
43
// Wrap the TCP listener in a TLS listener configured with tlsCert.
lis = tls.NewListener(lis, &tls.Config{
44
Certificates: []tls.Certificate{tlsCert},
50
// Dead returns a channel that signals when the server has exited.
// The channel is obtained from the server's tomb.
51
func (srv *Server) Dead() <-chan struct{} {
52
return srv.tomb.Dead()
55
// Stop stops the server and returns when all requests that
56
// it is running have completed. The error returned is the
// result of waiting on the server's tomb.
57
func (srv *Server) Stop() error {
59
return srv.tomb.Wait()
62
// run serves websocket connections accepted from lis over HTTP.
// Each connection is handed to serveConn, and any error serveConn
// returns is logged. When run returns, the deferred srv.wg.Wait
// blocks until outstanding requests have completed.
func (srv *Server) run(lis net.Listener) {
64
defer srv.wg.Wait() // wait for any outstanding requests to complete.
71
handler := websocket.Handler(func(conn *websocket.Conn) {
74
// If we've got to this stage and the tomb is still
75
// alive, we know that any tomb.Kill must occur after we
76
// have called wg.Add, so we avoid the possibility of a
77
// handler goroutine running after Stop has returned.
78
if srv.tomb.Err() != tomb.ErrStillAlive {
81
if err := srv.serveConn(conn); err != nil {
82
log.Printf("state/api: error serving RPCs: %v", err)
85
// The error from http.Serve is not interesting.
86
http.Serve(lis, handler)
89
// Addr returns the address that the server is listening on,
// in the string form produced by srv.addr.String.
90
func (srv *Server) Addr() string {
91
return srv.addr.String()
94
// serveConn serves RPC requests for a single websocket connection.
// It starts a readRequests goroutine for conn with a fresh serverReq
// channel, then runs the RPC server over a serverCodec, passing the
// value from newStateServer(srv, conn) as the second ServeCodec
// argument. It returns the error from ServeCodec.
// NOTE(review): presumably the serverCodec is given msgs so that
// ReadRequestHeader can receive the requests — confirm.
func (srv *Server) serveConn(conn *websocket.Conn) error {
95
msgs := make(chan serverReq)
96
go readRequests(conn, msgs)
99
// Wait for readRequests to see the closed connection and quit.
103
return srv.rpcSrv.ServeCodec(&serverCodec{
107
}, newStateServer(srv, conn))
110
// logRequests is a package-level switch that defaults to true.
// NOTE(review): presumably it selects whether readRequests logs the
// raw request messages it receives — confirm against its use.
var logRequests = true
112
// readRequests receives JSON messages from conn and decodes them
// into serverReq values. Two receive paths are visible: one reads
// the raw message so that it (or the unmarshal error) can be
// debug-logged, the other decodes directly into the request.
// Receive errors are logged.
// NOTE(review): presumably each decoded request is sent on c and
// the function returns once receiving fails — confirm.
func readRequests(conn *websocket.Conn, c chan<- serverReq) {
117
req = serverReq{} // avoid any potential cross-message contamination.
119
// Receive the undecoded message first so that it can be logged.
var m json.RawMessage
120
err = websocket.JSON.Receive(conn, &m)
122
log.Debugf("api: <- %s", m)
123
err = json.Unmarshal(m, &req)
125
log.Debugf("api: <- error: %v", err)
128
// Decode straight into the request, without logging the raw message.
err = websocket.JSON.Receive(conn, &req)
134
log.Printf("api: error receiving request: %v", err)
141
// serverReq holds a request as decoded from the JSON message
// received from a client connection.
type serverReq struct {
146
// Params holds the still-encoded request parameters;
// ReadRequestBody unmarshals it into the body value.
Params json.RawMessage
149
// serverResp holds the fields of a response message.
// Error, ErrorCode and Response are left out of the JSON
// encoding when they are empty.
// NOTE(review): presumably this is the value WriteResponse builds
// and sends to the client — confirm.
type serverResp struct {
151
Error string `json:",omitempty"`
152
ErrorCode string `json:",omitempty"`
153
Response interface{} `json:",omitempty"`
156
// serverCodec carries the per-connection state used by the
// ReadRequestHeader, ReadRequestBody and WriteResponse methods.
// serveConn passes a serverCodec to the RPC server's ServeCodec.
type serverCodec struct {
159
// msgs is the channel from which ReadRequestHeader receives
// incoming requests.
msgs <-chan serverReq
163
// ReadRequestHeader waits for either the next request on c.msgs
// or the server's tomb to start dying. The received request is
// stored in c.req, and its RequestId, Type and Request fields are
// copied into req.
func (c *serverCodec) ReadRequestHeader(req *rpc.Request) error {
166
case c.req, ok = <-c.msgs:
170
case <-c.srv.tomb.Dying():
173
req.RequestId = c.req.RequestId
174
req.Type = c.req.Type
176
req.Request = c.req.Request
180
// ReadRequestBody unmarshals the JSON-encoded Params of the request
// held in c.req into body, returning any error from json.Unmarshal.
func (c *serverCodec) ReadRequestBody(body interface{}) error {
184
return json.Unmarshal(c.req.Params, body)
187
// WriteResponse sends the response message r to the client as JSON
// over the websocket connection and returns the error from the send.
// The message carries the RequestId and ErrorCode taken from resp.
func (c *serverCodec) WriteResponse(resp *rpc.Response, body interface{}) error {
189
RequestId: resp.RequestId,
191
ErrorCode: resp.ErrorCode,
195
// This marshalled copy is used only for the debug log lines below;
// the message is sent by passing r itself to websocket.JSON.Send.
data, err := json.Marshal(r)
197
log.Debugf("api: -> marshal error: %v", err)
200
log.Debugf("api: -> %s", data)
202
return websocket.JSON.Send(c.conn, r)