118
130
AuthGetter func(string) string
120
132
AddresseeChecker AddresseeChecking
133
BroadcastCh chan *BroadcastNotification
134
NotificationsCh chan AddressedNotification
123
137
// ClientSession holds a client<->server session and its configuration.
124
type ClientSession struct {
138
type ClientSession interface {
140
State() ClientSessionState
141
HasConnectivity(bool)
142
KeepConnection() error
146
type clientSession struct {
127
149
ClientSessionConfig
145
167
proto protocol.Protocol
146
168
pingInterval time.Duration
147
169
retrier util.AutoRedialer
148
retrierLock sync.Mutex
153
BroadcastCh chan *BroadcastNotification
154
NotificationsCh chan AddressedNotification
172
stateLock sync.RWMutex
173
state ClientSessionState
157
176
// autoredial knobs
158
177
shouldDelayP *uint32
159
178
lastAutoRedial time.Time
160
redialDelay func(*ClientSession) time.Duration
179
redialDelay func(*clientSession) time.Duration
161
180
redialJitter func(time.Duration) time.Duration
162
181
redialDelays []time.Duration
163
182
redialDelaysIdx int
183
// connection events, and cookie reset requests, come in over here
185
// last seen connection event is here
187
// connection events are handled by this
188
connHandler func(bool)
189
// autoredial goes over here (xxx spurious goroutine involved)
191
// main loop errors out through here (possibly another spurious goroutine)
193
// main loop errors are handled by this
194
errHandler func(error)
166
func redialDelay(sess *ClientSession) time.Duration {
199
func redialDelay(sess *clientSession) time.Duration {
167
200
if sess.ShouldDelay() {
168
201
t := sess.redialDelays[sess.redialDelaysIdx]
169
202
if len(sess.redialDelays) > sess.redialDelaysIdx+1 {
179
212
func NewSession(serverAddrSpec string, conf ClientSessionConfig,
180
213
deviceId string, seenStateFactory func() (seenstate.SeenState, error),
181
log logger.Logger) (*ClientSession, error) {
182
state := uint32(Disconnected)
214
log logger.Logger) (*clientSession, error) {
183
215
seenState, err := seenStateFactory()
200
232
Protocolator: protocol.NewProtocol0,
201
233
SeenState: seenState,
202
234
TLS: &tls.Config{},
204
236
timeSince: time.Since,
205
237
shouldDelayP: &shouldDelay,
206
redialDelay: redialDelay,
238
redialDelay: redialDelay, // NOTE there are tests that use calling sess.redialDelay as an indication of calling autoRedial!
207
239
redialDelays: util.Timeouts(),
209
241
sess.redialJitter = sess.Jitter
216
248
sess.TLS.RootCAs = cp
250
sess.doneCh = make(chan uint32, 1)
251
sess.stopCh = make(chan struct{})
252
sess.cmdCh = make(chan sessCmd)
253
sess.errCh = make(chan error, 1)
255
// to be overridden by tests
256
sess.connHandler = sess.handleConn
257
sess.errHandler = sess.handleErr
221
func (sess *ClientSession) ShouldDelay() bool {
262
func (sess *clientSession) ShouldDelay() bool {
222
263
return atomic.LoadUint32(sess.shouldDelayP) != 0
225
func (sess *ClientSession) setShouldDelay() {
266
func (sess *clientSession) setShouldDelay() {
226
267
atomic.StoreUint32(sess.shouldDelayP, uint32(1))
229
func (sess *ClientSession) clearShouldDelay() {
270
func (sess *clientSession) clearShouldDelay() {
230
271
atomic.StoreUint32(sess.shouldDelayP, uint32(0))
233
func (sess *ClientSession) State() ClientSessionState {
234
return ClientSessionState(atomic.LoadUint32(sess.stateP))
237
func (sess *ClientSession) setState(state ClientSessionState) {
238
sess.Log.Debugf("session.setState: %s -> %s", ClientSessionState(atomic.LoadUint32(sess.stateP)), state)
239
atomic.StoreUint32(sess.stateP, uint32(state))
242
func (sess *ClientSession) setConnection(conn net.Conn) {
274
func (sess *clientSession) State() ClientSessionState {
275
sess.stateLock.RLock()
276
defer sess.stateLock.RUnlock()
280
func (sess *clientSession) setState(state ClientSessionState) {
281
sess.stateLock.Lock()
282
defer sess.stateLock.Unlock()
283
sess.Log.Debugf("session.setState: %s -> %s", sess.state, state)
287
func (sess *clientSession) setConnection(conn net.Conn) {
243
288
sess.connLock.Lock()
244
289
defer sess.connLock.Unlock()
245
290
sess.Connection = conn
248
func (sess *ClientSession) getConnection() net.Conn {
293
func (sess *clientSession) getConnection() net.Conn {
249
294
sess.connLock.RLock()
250
295
defer sess.connLock.RUnlock()
251
296
return sess.Connection
254
func (sess *ClientSession) setCookie(cookie string) {
299
func (sess *clientSession) setCookie(cookie string) {
255
300
sess.connLock.Lock()
256
301
defer sess.connLock.Unlock()
257
302
sess.cookie = cookie
260
func (sess *ClientSession) getCookie() string {
305
func (sess *clientSession) getCookie() string {
261
306
sess.connLock.RLock()
262
307
defer sess.connLock.RUnlock()
263
308
return sess.cookie
266
func (sess *ClientSession) ClearCookie() {
268
defer sess.connLock.Unlock()
311
func (sess *clientSession) ResetCookie() {
312
sess.cmdCh <- cmdResetCookie
315
func (sess *clientSession) resetCookie() {
272
320
// getHosts sets deliveryHosts, possibly querying a remote endpoint
273
func (sess *ClientSession) getHosts() error {
321
func (sess *clientSession) getHosts() error {
274
322
if sess.getHost != nil {
275
323
if sess.deliveryHosts != nil && sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime {
295
343
// addAuthorization gets the authorization blob to send to the server
296
344
// and adds it to the session.
297
func (sess *ClientSession) addAuthorization() error {
345
func (sess *clientSession) addAuthorization() error {
298
346
if sess.AuthGetter != nil {
299
347
sess.Log.Debugf("adding authorization")
300
348
sess.auth = sess.AuthGetter(sess.AuthURL)
366
func (sess *ClientSession) stopRedial() {
367
sess.retrierLock.Lock()
368
defer sess.retrierLock.Unlock()
414
func (sess *clientSession) stopRedial() {
369
415
if sess.retrier != nil {
370
416
sess.retrier.Stop()
371
417
sess.retrier = nil
375
func (sess *ClientSession) AutoRedial(doneCh chan uint32) {
421
func (sess *clientSession) autoRedial() {
376
422
sess.stopRedial()
377
423
if time.Since(sess.lastAutoRedial) < 2*time.Second {
378
424
sess.setShouldDelay()
380
time.Sleep(sess.redialDelay(sess))
381
sess.retrierLock.Lock()
382
defer sess.retrierLock.Unlock()
426
// xxx should we really wait on the caller goroutine?
427
delay := sess.redialDelay(sess)
428
sess.Log.Debugf("session redial delay: %v, wait", delay)
430
sess.Log.Debugf("session redial delay: %v, cont", delay)
383
431
if sess.retrier != nil {
384
432
panic("session AutoRedial: unexpected non-nil retrier.")
386
434
sess.retrier = util.NewAutoRedialer(sess)
387
435
sess.lastAutoRedial = time.Now()
389
sess.retrierLock.Lock()
390
retrier := sess.retrier
391
sess.retrierLock.Unlock()
393
sess.Log.Debugf("session autoredialer skipping retry: retrier has been set to nil.")
436
go func(retrier util.AutoRedialer) {
396
437
sess.Log.Debugf("session autoredialier launching Redial goroutine")
397
doneCh <- retrier.Redial()
401
func (sess *ClientSession) Close() {
406
func (sess *ClientSession) doClose() {
438
// if the redialer has been stopped before calling Redial(), it'll return 0.
439
sess.doneCh <- retrier.Redial()
443
func (sess *clientSession) doClose(resetCookie bool) {
407
444
sess.connLock.Lock()
408
445
defer sess.connLock.Unlock()
449
sess.closeConnection()
450
sess.setState(Disconnected)
453
func (sess *clientSession) closeConnection() {
454
// *must be called with connLock held*
409
455
if sess.Connection != nil {
410
456
sess.Connection.Close()
411
457
// we ignore Close errors, on purpose (the thinking being that
635
680
// run calls connect, and if it works it calls start, and if it works
636
681
// it runs loop in a goroutine, and ships its return value over ErrCh.
637
func (sess *ClientSession) run(closer func(), authChecker, hostGetter, connecter, starter, looper func() error) error {
682
func (sess *clientSession) run(closer func(bool), authChecker, hostGetter, connecter, starter, looper func() error) error {
639
684
if err := authChecker(); err != nil {
649
sess.ErrCh = make(chan error, 1)
650
sess.BroadcastCh = make(chan *BroadcastNotification)
651
sess.NotificationsCh = make(chan AddressedNotification)
652
go func() { sess.ErrCh <- looper() }()
694
go func() { sess.errCh <- looper() }()
658
700
// Jitter returns a random time.Duration somewhere in [-spread, spread].
659
func (sess *ClientSession) Jitter(spread time.Duration) time.Duration {
701
func (sess *clientSession) Jitter(spread time.Duration) time.Duration {
661
703
panic("spread must be non-negative")
667
709
// Dial takes the session from newly created (or newly disconnected)
668
710
// to running the main loop.
669
func (sess *ClientSession) Dial() error {
711
func (sess *clientSession) Dial() error {
670
712
if sess.Protocolator == nil {
671
713
// a missing protocolator means you've willfully overridden
672
714
// it; returning an error here would prompt AutoRedial to just
676
718
return sess.run(sess.doClose, sess.addAuthorization, sess.getHosts, sess.connect, sess.start, sess.loop)
721
func (sess *clientSession) shutdown() {
722
sess.Log.Infof("session shutting down.")
724
defer sess.connLock.Unlock()
726
sess.closeConnection()
729
func (sess *clientSession) doKeepConnection() {
732
case cmd := <-sess.cmdCh:
735
sess.connHandler(true)
737
sess.connHandler(false)
744
case n := <-sess.doneCh:
745
// if n == 0, the redialer aborted. If you do
746
// anything other than log it, keep that in mind.
747
sess.Log.Debugf("connected after %d attempts.", n)
748
case err := <-sess.errCh:
754
func (sess *clientSession) handleConn(hasConn bool) {
755
sess.lastConn = hasConn
757
// Note this does not depend on the current state! That's because Dial
758
// starts with doClose, which gets you to Disconnected even if you're
759
// connected, and you can call Close when Disconnected without it
769
func (sess *clientSession) handleErr(err error) {
770
sess.Log.Errorf("session error'ed out with %v", err)
771
// State() == Error mostly guards against interrupting an ongoing
772
// autoRedial if we already went quickly through hasConn =
773
// false => hasConn = true
774
if sess.State() == Error && sess.lastConn {
779
func (sess *clientSession) KeepConnection() error {
780
sess.stateLock.Lock()
781
defer sess.stateLock.Unlock()
782
if sess.state != Pristine {
783
return errors.New("don't call KeepConnection() on a non-pristine session.")
785
sess.state = Disconnected
787
go sess.doKeepConnection()
792
func (sess *clientSession) StopKeepConnection() {
793
sess.setState(Shutdown)
797
func (sess *clientSession) HasConnectivity(hasConn bool) {
799
sess.cmdCh <- cmdConnect
801
sess.cmdCh <- cmdDisconnect
680
806
rand.Seed(time.Now().Unix()) // good enough for us (we're not using it for crypto)