~juju-qa/ubuntu/xenial/juju/2.0-rc2

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/apiserver/apiserver.go

  • Committer: Nicholas Skaggs
  • Date: 2016-09-30 14:39:30 UTC
  • mfrom: (1.8.1)
  • Revision ID: nicholas.skaggs@canonical.com-20160930143930-vwwhrefh6ftckccy
import upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
        "strings"
12
12
        "sync"
13
13
        "sync/atomic"
14
 
        "time"
15
14
 
16
15
        "github.com/bmizerany/pat"
17
16
        "github.com/juju/errors"
18
17
        "github.com/juju/loggo"
19
18
        "github.com/juju/utils"
 
19
        "github.com/juju/utils/clock"
 
20
        "golang.org/x/crypto/acme/autocert"
20
21
        "golang.org/x/net/websocket"
21
22
        "gopkg.in/juju/names.v2"
22
23
        "gopkg.in/macaroon-bakery.v1/httpbakery"
41
42
// Server holds the server side of the API.
42
43
type Server struct {
43
44
        tomb              tomb.Tomb
 
45
        clock             clock.Clock
44
46
        wg                sync.WaitGroup
45
47
        state             *state.State
46
48
        statePool         *state.StatePool
56
58
        lastConnectionID  uint64
57
59
        newObserver       observer.ObserverFactory
58
60
        connCount         int64
 
61
        certChanged       <-chan params.StateServingInfo
 
62
        tlsConfig         *tls.Config
 
63
 
 
64
        // mu guards the fields below it.
 
65
        mu sync.Mutex
 
66
 
 
67
        // cert holds the current certificate used for tls.Config.
 
68
        cert *tls.Certificate
 
69
 
 
70
        // certDNSNames holds the DNS names associated with cert.
 
71
        certDNSNames []string
59
72
}
60
73
 
61
74
// LoginValidator functions are used to decide whether login requests
65
78
 
66
79
// ServerConfig holds parameters required to set up an API server.
67
80
type ServerConfig struct {
68
 
        Cert        []byte
69
 
        Key         []byte
 
81
        Clock       clock.Clock
 
82
        Cert        string
 
83
        Key         string
70
84
        Tag         names.Tag
71
85
        DataDir     string
72
86
        LogDir      string
73
87
        Validator   LoginValidator
74
 
        CertChanged chan params.StateServingInfo
 
88
        CertChanged <-chan params.StateServingInfo
75
89
 
76
90
        // NewObserver is a function which will return an observer. This
77
91
        // is used per-connection to instantiate a new observer to be
83
97
}
84
98
 
85
99
func (c *ServerConfig) Validate() error {
 
100
        if c.Clock == nil {
 
101
                return errors.NotValidf("missing Clock")
 
102
        }
86
103
        if c.NewObserver == nil {
87
104
                return errors.NotValidf("missing NewObserver")
88
105
        }
90
107
        return nil
91
108
}
92
109
 
93
 
// changeCertListener wraps a TLS net.Listener.
94
 
// It allows connection handshakes to be
95
 
// blocked while the TLS certificate is updated.
96
 
type changeCertListener struct {
97
 
        net.Listener
98
 
        tomb tomb.Tomb
99
 
 
100
 
        // A mutex used to block accept operations.
101
 
        m sync.Mutex
102
 
 
103
 
        // A channel used to pass in new certificate information.
104
 
        certChanged <-chan params.StateServingInfo
105
 
 
106
 
        // The config to update with any new certificate.
107
 
        config *tls.Config
108
 
}
109
 
 
110
 
func newChangeCertListener(lis net.Listener, certChanged <-chan params.StateServingInfo, config *tls.Config) *changeCertListener {
111
 
        cl := &changeCertListener{
112
 
                Listener:    lis,
113
 
                certChanged: certChanged,
114
 
                config:      config,
115
 
        }
116
 
        go func() {
117
 
                defer cl.tomb.Done()
118
 
                cl.tomb.Kill(cl.processCertChanges())
119
 
        }()
120
 
        return cl
121
 
}
122
 
 
123
 
// Accept waits for and returns the next connection to the listener.
124
 
func (cl *changeCertListener) Accept() (net.Conn, error) {
125
 
        conn, err := cl.Listener.Accept()
126
 
        if err != nil {
127
 
                return nil, err
128
 
        }
129
 
        cl.m.Lock()
130
 
        defer cl.m.Unlock()
131
 
 
132
 
        // make a copy of cl.config so that update certificate does not mutate
133
 
        // the config passed to the tls.Server for this conn.
134
 
        config := *cl.config
135
 
        return tls.Server(conn, &config), nil
136
 
}
137
 
 
138
 
// Close closes the listener.
139
 
func (cl *changeCertListener) Close() error {
140
 
        cl.tomb.Kill(nil)
141
 
        return cl.Listener.Close()
142
 
}
143
 
 
144
 
// processCertChanges receives new certificate information and
145
 
// calls a method to update the listener's certificate.
146
 
func (cl *changeCertListener) processCertChanges() error {
147
 
        for {
148
 
                select {
149
 
                case info := <-cl.certChanged:
150
 
                        if info.Cert != "" {
151
 
                                cl.updateCertificate([]byte(info.Cert), []byte(info.PrivateKey))
152
 
                        }
153
 
                case <-cl.tomb.Dying():
154
 
                        return tomb.ErrDying
155
 
                }
156
 
        }
157
 
}
158
 
 
159
 
// updateCertificate generates a new TLS certificate and assigns it
160
 
// to the TLS listener.
161
 
func (cl *changeCertListener) updateCertificate(cert, key []byte) {
162
 
        cl.m.Lock()
163
 
        defer cl.m.Unlock()
164
 
        if tlsCert, err := tls.X509KeyPair(cert, key); err != nil {
165
 
                logger.Errorf("cannot create new TLS certificate: %v", err)
166
 
        } else {
167
 
                logger.Infof("updating api server certificate")
168
 
                x509Cert, err := x509.ParseCertificate(tlsCert.Certificate[0])
169
 
                if err == nil {
170
 
                        var addr []string
171
 
                        for _, ip := range x509Cert.IPAddresses {
172
 
                                addr = append(addr, ip.String())
173
 
                        }
174
 
                        logger.Infof("new certificate addresses: %v", strings.Join(addr, ", "))
175
 
                }
176
 
                cl.config.Certificates = []tls.Certificate{tlsCert}
177
 
        }
178
 
}
179
 
 
180
110
// NewServer serves the given state by accepting requests on the given
181
111
// listener, using the given certificate and key (in PEM format) for
182
112
// authentication.
192
122
        // server needs to run before mongo upgrades have happened and
193
123
        // any state manipulation may be be relying on features of the
194
124
        // database added by upgrades. Here be dragons.
195
 
        l, ok := lis.(*net.TCPListener)
196
 
        if !ok {
197
 
                return nil, errors.Errorf("listener is not of type *net.TCPListener: %T", lis)
198
 
        }
199
 
        srv, err := newServer(s, l, cfg)
 
125
        srv, err := newServer(s, lis, cfg)
200
126
        if err != nil {
201
127
                // There is no running server around to close the listener.
202
128
                lis.Close()
205
131
        return srv, nil
206
132
}
207
133
 
208
 
func newServer(s *state.State, lis *net.TCPListener, cfg ServerConfig) (_ *Server, err error) {
209
 
        tlsCert, err := tls.X509KeyPair(cfg.Cert, cfg.Key)
210
 
        if err != nil {
211
 
                return nil, err
212
 
        }
213
 
        // TODO(rog) check that *srvRoot is a valid type for using
214
 
        // as an RPC server.
215
 
        tlsConfig := utils.SecureTLSConfig()
216
 
        tlsConfig.Certificates = []tls.Certificate{tlsCert}
217
 
 
 
134
func newServer(s *state.State, lis net.Listener, cfg ServerConfig) (_ *Server, err error) {
218
135
        stPool := cfg.StatePool
219
136
        if stPool == nil {
220
137
                stPool = state.NewStatePool(s)
221
138
        }
222
139
 
223
140
        srv := &Server{
 
141
                clock:       cfg.Clock,
 
142
                lis:         lis,
224
143
                newObserver: cfg.NewObserver,
225
144
                state:       s,
226
145
                statePool:   stPool,
227
 
                lis:         newChangeCertListener(lis, cfg.CertChanged, tlsConfig),
228
146
                tag:         cfg.Tag,
229
147
                dataDir:     cfg.DataDir,
230
148
                logDir:      cfg.LogDir,
233
151
                adminAPIFactories: map[int]adminAPIFactory{
234
152
                        3: newAdminAPIV3,
235
153
                },
 
154
                certChanged: cfg.CertChanged,
236
155
        }
 
156
 
 
157
        srv.tlsConfig = srv.newTLSConfig()
 
158
        srv.lis = tls.NewListener(lis, srv.tlsConfig)
 
159
 
237
160
        srv.authCtxt, err = newAuthContext(s)
238
161
        if err != nil {
239
162
                return nil, errors.Trace(err)
240
163
        }
 
164
        if err := srv.updateCertificate(cfg.Cert, cfg.Key); err != nil {
 
165
                return nil, errors.Annotatef(err, "cannot set initial certificate")
 
166
        }
241
167
        go srv.run()
242
168
        return srv, nil
243
169
}
244
170
 
 
171
func (srv *Server) newTLSConfig() *tls.Config {
 
172
        m := autocert.Manager{
 
173
                Prompt: autocert.AcceptTOS,
 
174
                Cache:  srv.state.AutocertCache(),
 
175
                // TODO(rogpeppe): use whitelist policy for HostPolicy.
 
176
                // TODO(rogpeppe): allow a different URL to be specified (for example
 
177
                // to use the letencrypt staging endpoint).
 
178
        }
 
179
        tlsConfig := utils.SecureTLSConfig()
 
180
        tlsConfig.GetCertificate = func(clientHello *tls.ClientHelloInfo) (*tls.Certificate, error) {
 
181
                // Get the locally created certificate and whether it's appropriate
 
182
                // for the SNI name. If not, we'll try to get an acme cert and
 
183
                // fall back to the local certificate if that fails.
 
184
                cert, shouldUse := srv.localCertificate(clientHello.ServerName)
 
185
                if shouldUse {
 
186
                        return cert, nil
 
187
                }
 
188
                acmeCert, err := m.GetCertificate(clientHello)
 
189
                if err == nil {
 
190
                        return acmeCert, nil
 
191
                }
 
192
                logger.Errorf("cannot get autocert certificate for %q: %v", clientHello.ServerName, err)
 
193
                return cert, nil
 
194
        }
 
195
        return tlsConfig
 
196
}
 
197
 
245
198
func (srv *Server) ConnectionCount() int64 {
246
199
        return atomic.LoadInt64(&srv.connCount)
247
200
}
295
248
                srv.tomb.Kill(srv.expireLocalLoginInteractions())
296
249
        }()
297
250
 
 
251
        srv.wg.Add(1)
 
252
        go func() {
 
253
                defer srv.wg.Done()
 
254
                srv.tomb.Kill(srv.processCertChanges())
 
255
        }()
 
256
 
298
257
        // for pat based handlers, they are matched in-order of being
299
258
        // registered, first match wins. So more specific ones have to be
300
259
        // registered first.
304
263
        }
305
264
 
306
265
        go func() {
307
 
                addr := srv.lis.Addr() // not valid after addr closed
308
 
                logger.Debugf("Starting API http server on address %q", addr)
309
 
                err := http.Serve(srv.lis, mux)
310
 
                // normally logging an error at debug level would be grounds for a beating,
 
266
                logger.Debugf("Starting API http server on address %q", srv.lis.Addr())
 
267
                httpSrv := &http.Server{
 
268
                        Handler:   mux,
 
269
                        TLSConfig: srv.tlsConfig,
 
270
                }
 
271
                err := httpSrv.Serve(srv.lis)
 
272
                // Normally logging an error at debug level would be grounds for a beating,
311
273
                // however in this case the error is *expected* to be non nil, and does not
312
274
                // affect the operation of the apiserver, but for completeness log it anyway.
313
275
                logger.Debugf("API http server exited, final error was: %v", err)
441
403
                select {
442
404
                case <-srv.tomb.Dying():
443
405
                        return tomb.ErrDying
444
 
                case <-time.After(authentication.LocalLoginInteractionTimeout):
 
406
                case <-srv.clock.After(authentication.LocalLoginInteractionTimeout):
445
407
                        now := srv.authCtxt.clock.Now()
446
408
                        srv.authCtxt.localUserInteractions.Expire(now)
447
409
                }
587
549
}
588
550
 
589
551
func (srv *Server) mongoPinger() error {
590
 
        // TODO(fwereade): 2016-03-17 lp:1558657
591
 
        timer := time.NewTimer(0)
592
552
        session := srv.state.MongoSession().Copy()
593
553
        defer session.Close()
594
554
        for {
595
 
                select {
596
 
                case <-timer.C:
597
 
                case <-srv.tomb.Dying():
598
 
                        return tomb.ErrDying
599
 
                }
600
555
                if err := session.Ping(); err != nil {
601
556
                        logger.Infof("got error pinging mongo: %v", err)
602
557
                        return errors.Annotate(err, "error pinging mongo")
603
558
                }
604
 
                timer.Reset(mongoPingInterval)
605
 
        }
 
559
                select {
 
560
                case <-srv.clock.After(mongoPingInterval):
 
561
                case <-srv.tomb.Dying():
 
562
                        return tomb.ErrDying
 
563
                }
 
564
        }
 
565
}
 
566
 
 
567
// localCertificate returns the local server certificate and reports
 
568
// whether it should be used to serve a connection addressed to the
 
569
// given server name.
 
570
func (srv *Server) localCertificate(serverName string) (*tls.Certificate, bool) {
 
571
        srv.mu.Lock()
 
572
        defer srv.mu.Unlock()
 
573
        if net.ParseIP(serverName) != nil {
 
574
                // IP address connections always use the local certificate.
 
575
                return srv.cert, true
 
576
        }
 
577
        if !strings.Contains(serverName, ".") {
 
578
                // If the server name doesn't contain a period there's no
 
579
                // way we can obtain a certificate for it.
 
580
                // This applies to the common case where "juju-apiserver" is
 
581
                // used as the server name.
 
582
                return srv.cert, true
 
583
        }
 
584
        // Perhaps the server name is explicitly mentioned by the server certificate.
 
585
        for _, name := range srv.certDNSNames {
 
586
                if name == serverName {
 
587
                        return srv.cert, true
 
588
                }
 
589
        }
 
590
        return srv.cert, false
 
591
}
 
592
 
 
593
// processCertChanges receives new certificate information and
 
594
// calls a method to update the listener's certificate.
 
595
func (srv *Server) processCertChanges() error {
 
596
        for {
 
597
                select {
 
598
                case info := <-srv.certChanged:
 
599
                        if info.Cert == "" {
 
600
                                break
 
601
                        }
 
602
                        logger.Infof("received API server certificate")
 
603
                        if err := srv.updateCertificate(info.Cert, info.PrivateKey); err != nil {
 
604
                                logger.Errorf("cannot update certificate: %v", err)
 
605
                        }
 
606
                case <-srv.tomb.Dying():
 
607
                        return tomb.ErrDying
 
608
                }
 
609
        }
 
610
}
 
611
 
 
612
// updateCertificate updates the current CA certificate and key
 
613
// from the given cert and key.
 
614
func (srv *Server) updateCertificate(cert, key string) error {
 
615
        srv.mu.Lock()
 
616
        defer srv.mu.Unlock()
 
617
        tlsCert, err := tls.X509KeyPair([]byte(cert), []byte(key))
 
618
        if err != nil {
 
619
                return errors.Annotatef(err, "cannot create new TLS certificate")
 
620
        }
 
621
        x509Cert, err := x509.ParseCertificate(tlsCert.Certificate[0])
 
622
        if err != nil {
 
623
                return errors.Annotatef(err, "parsing x509 cert")
 
624
        }
 
625
        var addr []string
 
626
        for _, ip := range x509Cert.IPAddresses {
 
627
                addr = append(addr, ip.String())
 
628
        }
 
629
        logger.Infof("new certificate addresses: %v", strings.Join(addr, ", "))
 
630
        srv.cert = &tlsCert
 
631
        srv.certDNSNames = x509Cert.DNSNames
 
632
        return nil
606
633
}
607
634
 
608
635
func serverError(err error) error {