~frankban/juju-core/ann-internals

« back to all changes in this revision

Viewing changes to state/apiserver/server.go

  • Committer: Roger Peppe
  • Date: 2013-03-06 14:44:51 UTC
  • mfrom: (956.3.5 237-clientserver)
  • Revision ID: roger.peppe@canonical.com-20130306144451-1dt7d3j2nbp8zfd4
state/apiserver: new package

This is necessary to break the import cycle that happens
when the api server imports launchpad.net/juju.

This change is purely mechanical - no logic changes.

R=dimitern, TheMue
CC=
https://codereview.appspot.com/7499043

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
package apiserver
 
2
 
 
3
import (
 
4
        "code.google.com/p/go.net/websocket"
 
5
        "crypto/tls"
 
6
        "encoding/json"
 
7
        "io"
 
8
        "launchpad.net/juju-core/log"
 
9
        "launchpad.net/juju-core/rpc"
 
10
        "launchpad.net/juju-core/state"
 
11
        "launchpad.net/tomb"
 
12
        "net"
 
13
        "net/http"
 
14
        "sync"
 
15
)
 
16
 
 
17
type Server struct {
 
18
        tomb   tomb.Tomb
 
19
        wg     sync.WaitGroup
 
20
        state  *state.State
 
21
        addr   net.Addr
 
22
        rpcSrv *rpc.Server
 
23
}
 
24
 
 
25
// Serve serves the given state by accepting requests
 
26
// on the given listener, using the given certificate
 
27
// and key (in PEM format) for authentication.
 
28
func NewServer(s *state.State, addr string, cert, key []byte) (*Server, error) {
 
29
        lis, err := net.Listen("tcp", addr)
 
30
        if err != nil {
 
31
                return nil, err
 
32
        }
 
33
        log.Printf("state/api: listening on %q", addr)
 
34
        tlsCert, err := tls.X509KeyPair(cert, key)
 
35
        if err != nil {
 
36
                return nil, err
 
37
        }
 
38
        srv := &Server{
 
39
                state: s,
 
40
                addr:  lis.Addr(),
 
41
        }
 
42
        srv.rpcSrv, err = rpc.NewServer(&srvRoot{}, serverError)
 
43
        lis = tls.NewListener(lis, &tls.Config{
 
44
                Certificates: []tls.Certificate{tlsCert},
 
45
        })
 
46
        go srv.run(lis)
 
47
        return srv, nil
 
48
}
 
49
 
 
50
// Dead returns a channel that signals when the server has exited.
 
51
func (srv *Server) Dead() <-chan struct{} {
 
52
        return srv.tomb.Dead()
 
53
}
 
54
 
 
55
// Stop stops the server and returns when all requests that
 
56
// it is running have completed.
 
57
func (srv *Server) Stop() error {
 
58
        srv.tomb.Kill(nil)
 
59
        return srv.tomb.Wait()
 
60
}
 
61
 
 
62
func (srv *Server) run(lis net.Listener) {
 
63
        defer srv.tomb.Done()
 
64
        defer srv.wg.Wait() // wait for any outstanding requests to complete.
 
65
        srv.wg.Add(1)
 
66
        go func() {
 
67
                <-srv.tomb.Dying()
 
68
                lis.Close()
 
69
                srv.wg.Done()
 
70
        }()
 
71
        handler := websocket.Handler(func(conn *websocket.Conn) {
 
72
                srv.wg.Add(1)
 
73
                defer srv.wg.Done()
 
74
                // If we've got to this stage and the tomb is still
 
75
                // alive, we know that any tomb.Kill must occur after we
 
76
                // have called wg.Add, so we avoid the possibility of a
 
77
                // handler goroutine running after Stop has returned.
 
78
                if srv.tomb.Err() != tomb.ErrStillAlive {
 
79
                        return
 
80
                }
 
81
                if err := srv.serveConn(conn); err != nil {
 
82
                        log.Printf("state/api: error serving RPCs: %v", err)
 
83
                }
 
84
        })
 
85
        // The error from http.Serve is not interesting.
 
86
        http.Serve(lis, handler)
 
87
}
 
88
 
 
89
// Addr returns the address that the server is listening on.
 
90
func (srv *Server) Addr() string {
 
91
        return srv.addr.String()
 
92
}
 
93
 
 
94
func (srv *Server) serveConn(conn *websocket.Conn) error {
 
95
        msgs := make(chan serverReq)
 
96
        go readRequests(conn, msgs)
 
97
        defer func() {
 
98
                conn.Close()
 
99
                // Wait for readRequests to see the closed connection and quit.
 
100
                for _ = range msgs {
 
101
                }
 
102
        }()
 
103
        return srv.rpcSrv.ServeCodec(&serverCodec{
 
104
                srv:  srv,
 
105
                conn: conn,
 
106
                msgs: msgs,
 
107
        }, newStateServer(srv, conn))
 
108
}
 
109
 
 
110
var logRequests = true
 
111
 
 
112
func readRequests(conn *websocket.Conn, c chan<- serverReq) {
 
113
        defer close(c)
 
114
        var req serverReq
 
115
        for {
 
116
                var err error
 
117
                req = serverReq{} // avoid any potential cross-message contamination.
 
118
                if logRequests {
 
119
                        var m json.RawMessage
 
120
                        err = websocket.JSON.Receive(conn, &m)
 
121
                        if err == nil {
 
122
                                log.Debugf("api: <- %s", m)
 
123
                                err = json.Unmarshal(m, &req)
 
124
                        } else {
 
125
                                log.Debugf("api: <- error: %v", err)
 
126
                        }
 
127
                } else {
 
128
                        err = websocket.JSON.Receive(conn, &req)
 
129
                }
 
130
                if err == io.EOF {
 
131
                        break
 
132
                }
 
133
                if err != nil {
 
134
                        log.Printf("api: error receiving request: %v", err)
 
135
                        break
 
136
                }
 
137
                c <- req
 
138
        }
 
139
}
 
140
 
 
141
type serverReq struct {
 
142
        RequestId uint64
 
143
        Type      string
 
144
        Id        string
 
145
        Request   string
 
146
        Params    json.RawMessage
 
147
}
 
148
 
 
149
type serverResp struct {
 
150
        RequestId uint64
 
151
        Error     string      `json:",omitempty"`
 
152
        ErrorCode string      `json:",omitempty"`
 
153
        Response  interface{} `json:",omitempty"`
 
154
}
 
155
 
 
156
type serverCodec struct {
 
157
        srv  *Server
 
158
        conn *websocket.Conn
 
159
        msgs <-chan serverReq
 
160
        req  serverReq
 
161
}
 
162
 
 
163
func (c *serverCodec) ReadRequestHeader(req *rpc.Request) error {
 
164
        var ok bool
 
165
        select {
 
166
        case c.req, ok = <-c.msgs:
 
167
                if !ok {
 
168
                        return io.EOF
 
169
                }
 
170
        case <-c.srv.tomb.Dying():
 
171
                return io.EOF
 
172
        }
 
173
        req.RequestId = c.req.RequestId
 
174
        req.Type = c.req.Type
 
175
        req.Id = c.req.Id
 
176
        req.Request = c.req.Request
 
177
        return nil
 
178
}
 
179
 
 
180
func (c *serverCodec) ReadRequestBody(body interface{}) error {
 
181
        if body == nil {
 
182
                return nil
 
183
        }
 
184
        return json.Unmarshal(c.req.Params, body)
 
185
}
 
186
 
 
187
func (c *serverCodec) WriteResponse(resp *rpc.Response, body interface{}) error {
 
188
        r := &serverResp{
 
189
                RequestId: resp.RequestId,
 
190
                Error:     resp.Error,
 
191
                ErrorCode: resp.ErrorCode,
 
192
                Response:  body,
 
193
        }
 
194
        if logRequests {
 
195
                data, err := json.Marshal(r)
 
196
                if err != nil {
 
197
                        log.Debugf("api: -> marshal error: %v", err)
 
198
                        return err
 
199
                }
 
200
                log.Debugf("api: -> %s", data)
 
201
        }
 
202
        return websocket.JSON.Send(c.conn, r)
 
203
}