~mterry/ubuntu-push/wake-screen

« back to all changes in this revision

Viewing changes to client/session/session.go

  • Committer: CI Train Bot
  • Author(s): Samuele Pedroni (Canonical Services Ltd.)
  • Date: 2015-04-03 13:27:41 UTC
  • mfrom: (143.1.2 vivid)
  • Revision ID: ci-train-bot@canonical.com-20150403132741-dzovofwogdhky960
[Roberto Alsina]
* click-hook: report failure if hooks_path doesn't exist. [client]

[Bret Barker]
* add a hacky busy sleep loop to workaround go's sleep not
  accounting for suspended time, more logging (lp:1435109). [client]

[John R. Lenton]
* Refactor code maintaining session (better fix for lp:1390663) [client]

[Samuele Pedroni]
* just delegate whether there's a update-worthy image to the
  system-settings helper and system-image. [client]
* stop waking up for polling if in flight-mode and wireless not
  enabled (lp:1437135). [client]
* don't hold a lock for a long time on handleErrConn, trigger
  autoRedial on Error more actively (lp:1435109). [client]
* disallow RC4 and SSLv3. [server]
Approved by: Roberto Alsina

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
        "launchpad.net/ubuntu-push/util"
40
40
)
41
41
 
 
42
type sessCmd uint8
 
43
 
 
44
const (
 
45
        cmdDisconnect sessCmd = iota
 
46
        cmdConnect
 
47
        cmdResetCookie
 
48
)
 
49
 
42
50
var (
43
51
        wireVersionBytes = []byte{protocol.ProtocolWireVersion}
44
52
)
70
78
 
71
79
const (
72
80
        Error ClientSessionState = iota
 
81
        Pristine
73
82
        Disconnected
74
83
        Connected
75
84
        Started
76
85
        Running
 
86
        Shutdown
77
87
        Unknown
78
88
)
79
89
 
83
93
        }
84
94
        return [Unknown]string{
85
95
                "Error",
 
96
                "Pristine",
86
97
                "Disconnected",
87
98
                "Connected",
88
99
                "Started",
89
100
                "Running",
 
101
                "Shutdown",
90
102
        }[s]
91
103
}
92
104
 
118
130
        AuthGetter             func(string) string
119
131
        AuthURL                string
120
132
        AddresseeChecker       AddresseeChecking
 
133
        BroadcastCh            chan *BroadcastNotification
 
134
        NotificationsCh        chan AddressedNotification
121
135
}
122
136
 
123
137
// ClientSession holds a client<->server session and its configuration.
124
 
type ClientSession struct {
 
138
type ClientSession interface {
 
139
        ResetCookie()
 
140
        State() ClientSessionState
 
141
        HasConnectivity(bool)
 
142
        KeepConnection() error
 
143
        StopKeepConnection()
 
144
}
 
145
 
 
146
type clientSession struct {
125
147
        // configuration
126
148
        DeviceId string
127
149
        ClientSessionConfig
145
167
        proto        protocol.Protocol
146
168
        pingInterval time.Duration
147
169
        retrier      util.AutoRedialer
148
 
        retrierLock  sync.Mutex
149
170
        cookie       string
150
171
        // status
151
 
        stateP          *uint32
152
 
        ErrCh           chan error
153
 
        BroadcastCh     chan *BroadcastNotification
154
 
        NotificationsCh chan AddressedNotification
 
172
        stateLock sync.RWMutex
 
173
        state     ClientSessionState
155
174
        // authorization
156
175
        auth string
157
176
        // autoredial knobs
158
177
        shouldDelayP    *uint32
159
178
        lastAutoRedial  time.Time
160
 
        redialDelay     func(*ClientSession) time.Duration
 
179
        redialDelay     func(*clientSession) time.Duration
161
180
        redialJitter    func(time.Duration) time.Duration
162
181
        redialDelays    []time.Duration
163
182
        redialDelaysIdx int
 
183
        // connection events, and cookie reset requests, come in over here
 
184
        cmdCh chan sessCmd
 
185
        // last seen connection event is here
 
186
        lastConn bool
 
187
        // connection events are handled by this
 
188
        connHandler func(bool)
 
189
        // autoredial goes over here (xxx spurious goroutine involved)
 
190
        doneCh chan uint32
 
191
        // main loop errors out through here (possibly another spurious goroutine)
 
192
        errCh chan error
 
193
        // main loop errors are handled by this
 
194
        errHandler func(error)
 
195
        // look, a stopper!
 
196
        stopCh chan struct{}
164
197
}
165
198
 
166
 
func redialDelay(sess *ClientSession) time.Duration {
 
199
func redialDelay(sess *clientSession) time.Duration {
167
200
        if sess.ShouldDelay() {
168
201
                t := sess.redialDelays[sess.redialDelaysIdx]
169
202
                if len(sess.redialDelays) > sess.redialDelaysIdx+1 {
178
211
 
179
212
func NewSession(serverAddrSpec string, conf ClientSessionConfig,
180
213
        deviceId string, seenStateFactory func() (seenstate.SeenState, error),
181
 
        log logger.Logger) (*ClientSession, error) {
182
 
        state := uint32(Disconnected)
 
214
        log logger.Logger) (*clientSession, error) {
183
215
        seenState, err := seenStateFactory()
184
216
        if err != nil {
185
217
                return nil, err
191
223
                getHost = gethosts.New(deviceId, hostsEndpoint, conf.ExchangeTimeout)
192
224
        }
193
225
        var shouldDelay uint32 = 0
194
 
        sess := &ClientSession{
 
226
        sess := &clientSession{
195
227
                ClientSessionConfig: conf,
196
228
                getHost:             getHost,
197
229
                fallbackHosts:       fallbackHosts,
200
232
                Protocolator:        protocol.NewProtocol0,
201
233
                SeenState:           seenState,
202
234
                TLS:                 &tls.Config{},
203
 
                stateP:              &state,
 
235
                state:               Pristine,
204
236
                timeSince:           time.Since,
205
237
                shouldDelayP:        &shouldDelay,
206
 
                redialDelay:         redialDelay,
 
238
                redialDelay:         redialDelay, // NOTE there are tests that use calling sess.redialDelay as an indication of calling autoRedial!
207
239
                redialDelays:        util.Timeouts(),
208
240
        }
209
241
        sess.redialJitter = sess.Jitter
215
247
                }
216
248
                sess.TLS.RootCAs = cp
217
249
        }
 
250
        sess.doneCh = make(chan uint32, 1)
 
251
        sess.stopCh = make(chan struct{})
 
252
        sess.cmdCh = make(chan sessCmd)
 
253
        sess.errCh = make(chan error, 1)
 
254
 
 
255
        // to be overridden by tests
 
256
        sess.connHandler = sess.handleConn
 
257
        sess.errHandler = sess.handleErr
 
258
 
218
259
        return sess, nil
219
260
}
220
261
 
221
 
func (sess *ClientSession) ShouldDelay() bool {
 
262
func (sess *clientSession) ShouldDelay() bool {
222
263
        return atomic.LoadUint32(sess.shouldDelayP) != 0
223
264
}
224
265
 
225
 
func (sess *ClientSession) setShouldDelay() {
 
266
func (sess *clientSession) setShouldDelay() {
226
267
        atomic.StoreUint32(sess.shouldDelayP, uint32(1))
227
268
}
228
269
 
229
 
func (sess *ClientSession) clearShouldDelay() {
 
270
func (sess *clientSession) clearShouldDelay() {
230
271
        atomic.StoreUint32(sess.shouldDelayP, uint32(0))
231
272
}
232
273
 
233
 
func (sess *ClientSession) State() ClientSessionState {
234
 
        return ClientSessionState(atomic.LoadUint32(sess.stateP))
235
 
}
236
 
 
237
 
func (sess *ClientSession) setState(state ClientSessionState) {
238
 
        sess.Log.Debugf("session.setState: %s -> %s", ClientSessionState(atomic.LoadUint32(sess.stateP)), state)
239
 
        atomic.StoreUint32(sess.stateP, uint32(state))
240
 
}
241
 
 
242
 
func (sess *ClientSession) setConnection(conn net.Conn) {
 
274
func (sess *clientSession) State() ClientSessionState {
 
275
        sess.stateLock.RLock()
 
276
        defer sess.stateLock.RUnlock()
 
277
        return sess.state
 
278
}
 
279
 
 
280
func (sess *clientSession) setState(state ClientSessionState) {
 
281
        sess.stateLock.Lock()
 
282
        defer sess.stateLock.Unlock()
 
283
        sess.Log.Debugf("session.setState: %s -> %s", sess.state, state)
 
284
        sess.state = state
 
285
}
 
286
 
 
287
func (sess *clientSession) setConnection(conn net.Conn) {
243
288
        sess.connLock.Lock()
244
289
        defer sess.connLock.Unlock()
245
290
        sess.Connection = conn
246
291
}
247
292
 
248
 
func (sess *ClientSession) getConnection() net.Conn {
 
293
func (sess *clientSession) getConnection() net.Conn {
249
294
        sess.connLock.RLock()
250
295
        defer sess.connLock.RUnlock()
251
296
        return sess.Connection
252
297
}
253
298
 
254
 
func (sess *ClientSession) setCookie(cookie string) {
 
299
func (sess *clientSession) setCookie(cookie string) {
255
300
        sess.connLock.Lock()
256
301
        defer sess.connLock.Unlock()
257
302
        sess.cookie = cookie
258
303
}
259
304
 
260
 
func (sess *ClientSession) getCookie() string {
 
305
func (sess *clientSession) getCookie() string {
261
306
        sess.connLock.RLock()
262
307
        defer sess.connLock.RUnlock()
263
308
        return sess.cookie
264
309
}
265
310
 
266
 
func (sess *ClientSession) ClearCookie() {
267
 
        sess.connLock.Lock()
268
 
        defer sess.connLock.Unlock()
269
 
        sess.cookie = ""
 
311
func (sess *clientSession) ResetCookie() {
 
312
        sess.cmdCh <- cmdResetCookie
 
313
}
 
314
 
 
315
func (sess *clientSession) resetCookie() {
 
316
        sess.stopRedial()
 
317
        sess.doClose(true)
270
318
}
271
319
 
272
320
// getHosts sets deliveryHosts possibly querying a remote endpoint
273
 
func (sess *ClientSession) getHosts() error {
 
321
func (sess *clientSession) getHosts() error {
274
322
        if sess.getHost != nil {
275
323
                if sess.deliveryHosts != nil && sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime {
276
324
                        return nil
294
342
 
295
343
// addAuthorization gets the authorization blob to send to the server
296
344
// and adds it to the session.
297
 
func (sess *ClientSession) addAuthorization() error {
 
345
func (sess *clientSession) addAuthorization() error {
298
346
        if sess.AuthGetter != nil {
299
347
                sess.Log.Debugf("adding authorization")
300
348
                sess.auth = sess.AuthGetter(sess.AuthURL)
302
350
        return nil
303
351
}
304
352
 
305
 
func (sess *ClientSession) resetHosts() {
 
353
func (sess *clientSession) resetHosts() {
306
354
        sess.deliveryHosts = nil
307
355
}
308
356
 
309
357
// startConnectionAttempt/nextHostToTry help connect iterating over candidate hosts
310
358
 
311
 
func (sess *ClientSession) startConnectionAttempt() {
 
359
func (sess *clientSession) startConnectionAttempt() {
312
360
        if sess.timeSince(sess.lastAttemptTimestamp) > sess.ExpectAllRepairedTime {
313
361
                sess.tryHost = 0
314
362
        }
319
367
        sess.lastAttemptTimestamp = time.Now()
320
368
}
321
369
 
322
 
func (sess *ClientSession) nextHostToTry() string {
 
370
func (sess *clientSession) nextHostToTry() string {
323
371
        if sess.leftToTry == 0 {
324
372
                return ""
325
373
        }
331
379
 
332
380
// we reached the Started state, we can retry with the same host if we
333
381
// have to retry again
334
 
func (sess *ClientSession) started() {
 
382
func (sess *clientSession) started() {
335
383
        sess.tryHost--
336
384
        if sess.tryHost == -1 {
337
385
                sess.tryHost = len(sess.deliveryHosts) - 1
341
389
 
342
390
// connect to a server using the configuration in the ClientSession
343
391
// and set up the connection.
344
 
func (sess *ClientSession) connect() error {
 
392
func (sess *clientSession) connect() error {
345
393
        sess.setShouldDelay()
346
394
        sess.startConnectionAttempt()
347
395
        var err error
363
411
        return nil
364
412
}
365
413
 
366
 
func (sess *ClientSession) stopRedial() {
367
 
        sess.retrierLock.Lock()
368
 
        defer sess.retrierLock.Unlock()
 
414
func (sess *clientSession) stopRedial() {
369
415
        if sess.retrier != nil {
370
416
                sess.retrier.Stop()
371
417
                sess.retrier = nil
372
418
        }
373
419
}
374
420
 
375
 
func (sess *ClientSession) AutoRedial(doneCh chan uint32) {
 
421
func (sess *clientSession) autoRedial() {
376
422
        sess.stopRedial()
377
423
        if time.Since(sess.lastAutoRedial) < 2*time.Second {
378
424
                sess.setShouldDelay()
379
425
        }
380
 
        time.Sleep(sess.redialDelay(sess))
381
 
        sess.retrierLock.Lock()
382
 
        defer sess.retrierLock.Unlock()
 
426
        // xxx should we really wait on the caller goroutine?
 
427
        delay := sess.redialDelay(sess)
 
428
        sess.Log.Debugf("session redial delay: %v, wait", delay)
 
429
        time.Sleep(delay)
 
430
        sess.Log.Debugf("session redial delay: %v, cont", delay)
383
431
        if sess.retrier != nil {
384
432
                panic("session AutoRedial: unexpected non-nil retrier.")
385
433
        }
386
434
        sess.retrier = util.NewAutoRedialer(sess)
387
435
        sess.lastAutoRedial = time.Now()
388
 
        go func() {
389
 
                sess.retrierLock.Lock()
390
 
                retrier := sess.retrier
391
 
                sess.retrierLock.Unlock()
392
 
                if retrier == nil {
393
 
                        sess.Log.Debugf("session autoredialer skipping retry: retrier has been set to nil.")
394
 
                        return
395
 
                }
 
436
        go func(retrier util.AutoRedialer) {
396
437
                sess.Log.Debugf("session autoredialier launching Redial goroutine")
397
 
                doneCh <- retrier.Redial()
398
 
        }()
399
 
}
400
 
 
401
 
func (sess *ClientSession) Close() {
402
 
        sess.stopRedial()
403
 
        sess.doClose()
404
 
}
405
 
 
406
 
func (sess *ClientSession) doClose() {
 
438
                // if the redialer has been stopped before calling Redial(), it'll return 0.
 
439
                sess.doneCh <- retrier.Redial()
 
440
        }(sess.retrier)
 
441
}
 
442
 
 
443
func (sess *clientSession) doClose(resetCookie bool) {
407
444
        sess.connLock.Lock()
408
445
        defer sess.connLock.Unlock()
 
446
        if resetCookie {
 
447
                sess.cookie = ""
 
448
        }
 
449
        sess.closeConnection()
 
450
        sess.setState(Disconnected)
 
451
}
 
452
 
 
453
func (sess *clientSession) closeConnection() {
 
454
        // *must be called with connLock held*
409
455
        if sess.Connection != nil {
410
456
                sess.Connection.Close()
411
457
                // we ignore Close errors, on purpose (the thinking being that
413
459
                // you could do to recover at this stage).
414
460
                sess.Connection = nil
415
461
        }
416
 
        sess.setState(Disconnected)
417
462
}
418
463
 
419
464
// handle "ping" messages
420
 
func (sess *ClientSession) handlePing() error {
 
465
func (sess *clientSession) handlePing() error {
421
466
        err := sess.proto.WriteMessage(protocol.PingPongMsg{Type: "pong"})
422
467
        if err == nil {
423
468
                sess.Log.Debugf("ping.")
429
474
        return err
430
475
}
431
476
 
432
 
func (sess *ClientSession) decodeBroadcast(bcast *serverMsg) *BroadcastNotification {
 
477
func (sess *clientSession) decodeBroadcast(bcast *serverMsg) *BroadcastNotification {
433
478
        decoded := make([]map[string]interface{}, 0)
434
479
        for _, p := range bcast.Payloads {
435
480
                var v map[string]interface{}
447
492
}
448
493
 
449
494
// handle "broadcast" messages
450
 
func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error {
 
495
func (sess *clientSession) handleBroadcast(bcast *serverMsg) error {
451
496
        err := sess.SeenState.SetLevel(bcast.ChanId, bcast.TopLevel)
452
497
        if err != nil {
453
498
                sess.setState(Error)
478
523
}
479
524
 
480
525
// handle "notifications" messages
481
 
func (sess *ClientSession) handleNotifications(ucast *serverMsg) error {
 
526
func (sess *clientSession) handleNotifications(ucast *serverMsg) error {
482
527
        notifs, err := sess.SeenState.FilterBySeen(ucast.Notifications)
483
528
        if err != nil {
484
529
                sess.setState(Error)
512
557
}
513
558
 
514
559
// handle "connbroken" messages
515
 
func (sess *ClientSession) handleConnBroken(connBroken *serverMsg) error {
 
560
func (sess *clientSession) handleConnBroken(connBroken *serverMsg) error {
516
561
        sess.setState(Error)
517
562
        reason := connBroken.Reason
518
563
        err := fmt.Errorf("server broke connection: %s", reason)
525
570
}
526
571
 
527
572
// handle "setparams" messages
528
 
func (sess *ClientSession) handleSetParams(setParams *serverMsg) error {
 
573
func (sess *clientSession) handleSetParams(setParams *serverMsg) error {
529
574
        if setParams.SetCookie != "" {
530
575
                sess.setCookie(setParams.SetCookie)
531
576
        }
533
578
}
534
579
 
535
580
// loop runs the session with the server, emits a stream of events.
536
 
func (sess *ClientSession) loop() error {
 
581
func (sess *clientSession) loop() error {
537
582
        var err error
538
583
        var recv serverMsg
539
584
        sess.setState(Running)
571
616
}
572
617
 
573
618
// Call this when you've connected and want to start looping.
574
 
func (sess *ClientSession) start() error {
 
619
func (sess *clientSession) start() error {
575
620
        conn := sess.getConnection()
576
621
        err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
577
622
        if err != nil {
634
679
 
635
680
// run calls connect, and if it works it calls start, and if it works
636
681
// it runs loop in a goroutine, and ships its return value over ErrCh.
637
 
func (sess *ClientSession) run(closer func(), authChecker, hostGetter, connecter, starter, looper func() error) error {
638
 
        closer()
 
682
func (sess *clientSession) run(closer func(bool), authChecker, hostGetter, connecter, starter, looper func() error) error {
 
683
        closer(false)
639
684
        if err := authChecker(); err != nil {
640
685
                return err
641
686
        }
646
691
        if err == nil {
647
692
                err = starter()
648
693
                if err == nil {
649
 
                        sess.ErrCh = make(chan error, 1)
650
 
                        sess.BroadcastCh = make(chan *BroadcastNotification)
651
 
                        sess.NotificationsCh = make(chan AddressedNotification)
652
 
                        go func() { sess.ErrCh <- looper() }()
 
694
                        go func() { sess.errCh <- looper() }()
653
695
                }
654
696
        }
655
697
        return err
656
698
}
657
699
 
658
700
// This Jitter returns a random time.Duration somewhere in [-spread, spread].
659
 
func (sess *ClientSession) Jitter(spread time.Duration) time.Duration {
 
701
func (sess *clientSession) Jitter(spread time.Duration) time.Duration {
660
702
        if spread < 0 {
661
703
                panic("spread must be non-negative")
662
704
        }
666
708
 
667
709
// Dial takes the session from newly created (or newly disconnected)
668
710
// to running the main loop.
669
 
func (sess *ClientSession) Dial() error {
 
711
func (sess *clientSession) Dial() error {
670
712
        if sess.Protocolator == nil {
671
713
                // a missing protocolator means you've willfully overridden
672
714
                // it; returning an error here would prompt AutoRedial to just
676
718
        return sess.run(sess.doClose, sess.addAuthorization, sess.getHosts, sess.connect, sess.start, sess.loop)
677
719
}
678
720
 
 
721
func (sess *clientSession) shutdown() {
 
722
        sess.Log.Infof("session shutting down.")
 
723
        sess.connLock.Lock()
 
724
        defer sess.connLock.Unlock()
 
725
        sess.stopRedial()
 
726
        sess.closeConnection()
 
727
}
 
728
 
 
729
func (sess *clientSession) doKeepConnection() {
 
730
        for {
 
731
                select {
 
732
                case cmd := <-sess.cmdCh:
 
733
                        switch cmd {
 
734
                        case cmdConnect:
 
735
                                sess.connHandler(true)
 
736
                        case cmdDisconnect:
 
737
                                sess.connHandler(false)
 
738
                        case cmdResetCookie:
 
739
                                sess.resetCookie()
 
740
                        }
 
741
                case <-sess.stopCh:
 
742
                        sess.shutdown()
 
743
                        return
 
744
                case n := <-sess.doneCh:
 
745
                        // if n == 0, the redialer aborted. If you do
 
746
                        // anything other than log it, keep that in mind.
 
747
                        sess.Log.Debugf("connected after %d attempts.", n)
 
748
                case err := <-sess.errCh:
 
749
                        sess.errHandler(err)
 
750
                }
 
751
        }
 
752
}
 
753
 
 
754
func (sess *clientSession) handleConn(hasConn bool) {
 
755
        sess.lastConn = hasConn
 
756
 
 
757
        // Note this does not depend on the current state!  That's because Dial
 
758
        // starts with doClose, which gets you to Disconnected even if you're
 
759
        // connected, and you can call Close when Disconnected without it
 
760
        // losing its stuff.
 
761
        if hasConn {
 
762
                sess.autoRedial()
 
763
        } else {
 
764
                sess.stopRedial()
 
765
                sess.doClose(false)
 
766
        }
 
767
}
 
768
 
 
769
func (sess *clientSession) handleErr(err error) {
 
770
        sess.Log.Errorf("session error'ed out with %v", err)
 
771
        // State() == Error mostly defends interrupting an ongoing
 
772
        // autoRedial if we went quickly already through hasConn =
 
773
        // false => hasConn = true
 
774
        if sess.State() == Error && sess.lastConn {
 
775
                sess.autoRedial()
 
776
        }
 
777
}
 
778
 
 
779
func (sess *clientSession) KeepConnection() error {
 
780
        sess.stateLock.Lock()
 
781
        defer sess.stateLock.Unlock()
 
782
        if sess.state != Pristine {
 
783
                return errors.New("don't call KeepConnection() on a non-pristine session.")
 
784
        }
 
785
        sess.state = Disconnected
 
786
 
 
787
        go sess.doKeepConnection()
 
788
 
 
789
        return nil
 
790
}
 
791
 
 
792
func (sess *clientSession) StopKeepConnection() {
 
793
        sess.setState(Shutdown)
 
794
        close(sess.stopCh)
 
795
}
 
796
 
 
797
func (sess *clientSession) HasConnectivity(hasConn bool) {
 
798
        if hasConn {
 
799
                sess.cmdCh <- cmdConnect
 
800
        } else {
 
801
                sess.cmdCh <- cmdDisconnect
 
802
        }
 
803
}
 
804
 
679
805
func init() {
680
806
        rand.Seed(time.Now().Unix()) // good enough for us (we're not using it for crypto)
681
807
}