~nskaggs/+junk/xenial-test

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/state/presence/presence.go

  • Committer: Nicholas Skaggs
  • Date: 2016-10-24 20:56:05 UTC
  • Revision ID: nicholas.skaggs@canonical.com-20161024205605-z8lta0uvuhtxwzwl
Initi with beta15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// Copyright 2012, 2013 Canonical Ltd.
 
2
// Licensed under the AGPLv3, see LICENCE file for details.
 
3
 
 
4
// The presence package implements an interface for observing liveness
 
5
// of arbitrary keys (agents, processes, etc) on top of MongoDB.
 
6
// The design works by periodically updating the database so that
 
7
// watchers can tell an arbitrary key is alive.
 
8
package presence
 
9
 
 
10
import (
 
11
        "fmt"
 
12
        "strconv"
 
13
        "sync"
 
14
        "time"
 
15
 
 
16
        "github.com/juju/errors"
 
17
        "github.com/juju/juju/worker"
 
18
        "github.com/juju/loggo"
 
19
        "gopkg.in/juju/names.v2"
 
20
        "gopkg.in/mgo.v2"
 
21
        "gopkg.in/mgo.v2/bson"
 
22
        "launchpad.net/tomb"
 
23
)
 
24
 
 
25
var logger = loggo.GetLogger("juju.state.presence")
 
26
 
 
27
// Agent shouldn't really live here -- it's not used in this package,
 
28
// and is implemented by a couple of state types for the convenience of
 
29
// the apiserver -- but one of the methods returns a concrete *Pinger,
 
30
// and that ties it down here quite effectively (until we want to take
 
31
// on the task of cleaning it up and promoting it to core, which might
 
32
// well never happen).
 
33
type Agent interface {
 
34
        AgentPresence() (bool, error)
 
35
        SetAgentPresence() (*Pinger, error)
 
36
        WaitAgentPresence(time.Duration) error
 
37
}
 
38
 
 
39
// docIDInt64 generates a globally unique id value
 
40
// where the model uuid is prefixed to the
 
41
// given int64 localID.
 
42
func docIDInt64(modelUUID string, localID int64) string {
 
43
        return modelUUID + ":" + strconv.FormatInt(localID, 10)
 
44
}
 
45
 
 
46
// docIDStr generates a globally unique id value
 
47
// where the model uuid is prefixed to the
 
48
// given string localID.
 
49
func docIDStr(modelUUID string, localID string) string {
 
50
        return modelUUID + ":" + localID
 
51
}
 
52
 
 
53
// The implementation works by assigning a unique sequence number to each
 
54
// pinger that is alive, and the pinger is then responsible for
 
55
// periodically updating the current time slot document with its
 
56
// sequence number so that watchers can tell it is alive.
 
57
//
 
58
// There is only one time slot document per time slot, per model. The
 
59
// internal implementation of the time slot document is as follows:
 
60
//
 
61
// {
 
62
//   "_id":   <model UUID>:<time slot>,
 
63
//   "slot": <slot>,
 
64
//   "model-uuid": <model UUID>,
 
65
//   "alive": { hex(<pinger seq> / 63) : (1 << (<pinger seq> % 63) | <others>) },
 
66
//   "dead":  { hex(<pinger seq> / 63) : (1 << (<pinger seq> % 63) | <others>) },
 
67
// }
 
68
//
 
69
// All pingers that have their sequence number under "alive" and not
 
70
// under "dead" are currently alive. This design enables implementing
 
71
// a ping with a single update operation, a kill with another operation,
 
72
// and obtaining liveness data with a single query that returns two
 
73
// documents (the last two time slots).
 
74
//
 
75
// A new pinger sequence is obtained every time a pinger starts by atomically
 
76
// incrementing a counter in a document in a helper collection. There is only
 
77
// one such document per model. That sequence number is then inserted
 
78
// into the beings collection to establish the mapping between pinger sequence
 
79
// and key.
 
80
 
 
81
// BUG(gn): The pings and beings collection currently grow without bound.
 
82
 
 
83
// A Watcher can watch any number of pinger keys for liveness changes.
 
84
type Watcher struct {
 
85
        modelUUID string
 
86
        tomb      tomb.Tomb
 
87
        base      *mgo.Collection
 
88
        pings     *mgo.Collection
 
89
        beings    *mgo.Collection
 
90
 
 
91
        // delta is an approximate clock skew between the local system
 
92
        // clock and the database clock.
 
93
        delta time.Duration
 
94
 
 
95
        // beingKey and beingSeq are the pinger seq <=> key mappings.
 
96
        // Entries in these maps are considered alive.
 
97
        beingKey map[int64]string
 
98
        beingSeq map[string]int64
 
99
 
 
100
        // watches has the per-key observer channels from Watch/Unwatch.
 
101
        watches map[string][]chan<- Change
 
102
 
 
103
        // pending contains all the events to be dispatched to the watcher
 
104
        // channels. They're queued during processing and flushed at the
 
105
        // end to simplify the algorithm.
 
106
        pending []event
 
107
 
 
108
        // request is used to deliver requests from the public API into
 
109
        // the the gorotuine loop.
 
110
        request chan interface{}
 
111
 
 
112
        // syncDone contains pending done channels from sync requests.
 
113
        syncDone []chan bool
 
114
 
 
115
        // next will dispatch when it's time to sync the database
 
116
        // knowledge. It's maintained here so that ForceRefresh
 
117
        // can manipulate it to force a sync sooner.
 
118
        next <-chan time.Time
 
119
}
 
120
 
 
121
type event struct {
 
122
        ch    chan<- Change
 
123
        key   string
 
124
        alive bool
 
125
}
 
126
 
 
127
// Change holds a liveness change notification.
 
128
type Change struct {
 
129
        Key   string
 
130
        Alive bool
 
131
}
 
132
 
 
133
// NewWatcher returns a new Watcher.
 
134
func NewWatcher(base *mgo.Collection, modelTag names.ModelTag) *Watcher {
 
135
        w := &Watcher{
 
136
                modelUUID: modelTag.Id(),
 
137
                base:      base,
 
138
                pings:     pingsC(base),
 
139
                beings:    beingsC(base),
 
140
                beingKey:  make(map[int64]string),
 
141
                beingSeq:  make(map[string]int64),
 
142
                watches:   make(map[string][]chan<- Change),
 
143
                request:   make(chan interface{}),
 
144
        }
 
145
        go func() {
 
146
                err := w.loop()
 
147
                cause := errors.Cause(err)
 
148
                // tomb expects ErrDying or ErrStillAlive as
 
149
                // exact values, so we need to log and unwrap
 
150
                // the error first.
 
151
                if err != nil && cause != tomb.ErrDying {
 
152
                        logger.Infof("watcher loop failed: %v", err)
 
153
                }
 
154
                w.tomb.Kill(cause)
 
155
                w.tomb.Done()
 
156
        }()
 
157
        return w
 
158
}
 
159
 
 
160
// Kill is part of the worker.Worker interface.
 
161
func (w *Watcher) Kill() {
 
162
        w.tomb.Kill(nil)
 
163
}
 
164
 
 
165
// Wait is part of the worker.Worker interface.
 
166
func (w *Watcher) Wait() error {
 
167
        return w.tomb.Wait()
 
168
}
 
169
 
 
170
// Stop stops all the watcher activities.
 
171
func (w *Watcher) Stop() error {
 
172
        return worker.Stop(w)
 
173
}
 
174
 
 
175
// Dead returns a channel that is closed when the watcher has stopped.
 
176
func (w *Watcher) Dead() <-chan struct{} {
 
177
        return w.tomb.Dead()
 
178
}
 
179
 
 
180
// Err returns the error with which the watcher stopped.
 
181
// It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive
 
182
// if the watcher is still running properly, or the respective error
 
183
// if the watcher is terminating or has terminated with an error.
 
184
func (w *Watcher) Err() error {
 
185
        return w.tomb.Err()
 
186
}
 
187
 
 
188
type reqWatch struct {
 
189
        key string
 
190
        ch  chan<- Change
 
191
}
 
192
 
 
193
type reqUnwatch struct {
 
194
        key string
 
195
        ch  chan<- Change
 
196
}
 
197
 
 
198
type reqSync struct {
 
199
        done chan bool
 
200
}
 
201
 
 
202
type reqAlive struct {
 
203
        key    string
 
204
        result chan bool
 
205
}
 
206
 
 
207
func (w *Watcher) sendReq(req interface{}) {
 
208
        select {
 
209
        case w.request <- req:
 
210
        case <-w.tomb.Dying():
 
211
        }
 
212
}
 
213
 
 
214
// Watch starts watching the liveness of key. An event will
 
215
// be sent onto ch to report the initial status for the key, and
 
216
// from then on a new event will be sent whenever a change is
 
217
// detected. Change values sent to the channel must be consumed,
 
218
// or the whole watcher will blocked.
 
219
func (w *Watcher) Watch(key string, ch chan<- Change) {
 
220
        w.sendReq(reqWatch{key, ch})
 
221
}
 
222
 
 
223
// Unwatch stops watching the liveness of key via ch.
 
224
func (w *Watcher) Unwatch(key string, ch chan<- Change) {
 
225
        w.sendReq(reqUnwatch{key, ch})
 
226
}
 
227
 
 
228
// StartSync forces the watcher to load new events from the database.
 
229
func (w *Watcher) StartSync() {
 
230
        w.sendReq(reqSync{nil})
 
231
}
 
232
 
 
233
// Sync forces the watcher to load new events from the database and blocks
 
234
// until all events have been dispatched.
 
235
func (w *Watcher) Sync() {
 
236
        done := make(chan bool)
 
237
        w.sendReq(reqSync{done})
 
238
        select {
 
239
        case <-done:
 
240
        case <-w.tomb.Dying():
 
241
        }
 
242
}
 
243
 
 
244
// Alive returns whether the key is currently considered alive by w,
 
245
// or an error in case the watcher is dying.
 
246
func (w *Watcher) Alive(key string) (bool, error) {
 
247
        result := make(chan bool, 1)
 
248
        w.sendReq(reqAlive{key, result})
 
249
        var alive bool
 
250
        select {
 
251
        case alive = <-result:
 
252
        case <-w.tomb.Dying():
 
253
                return false, errors.Errorf("cannot check liveness: watcher is dying")
 
254
        }
 
255
        return alive, nil
 
256
}
 
257
 
 
258
// period is the length of each time slot in seconds.
 
259
// It's not a time.Duration because the code is more convenient like
 
260
// this and also because sub-second timings don't work as the slot
 
261
// identifier is an int64 in seconds.
 
262
var period int64 = 30
 
263
 
 
264
// loop implements the main watcher loop.
 
265
func (w *Watcher) loop() error {
 
266
        var err error
 
267
        if w.delta, err = clockDelta(w.base); err != nil {
 
268
                return errors.Trace(err)
 
269
        }
 
270
        w.next = time.After(0)
 
271
        for {
 
272
                select {
 
273
                case <-w.tomb.Dying():
 
274
                        return errors.Trace(tomb.ErrDying)
 
275
                case <-w.next:
 
276
                        w.next = time.After(time.Duration(period) * time.Second)
 
277
                        syncDone := w.syncDone
 
278
                        w.syncDone = nil
 
279
                        if err := w.sync(); err != nil {
 
280
                                return errors.Trace(err)
 
281
                        }
 
282
                        w.flush()
 
283
                        for _, done := range syncDone {
 
284
                                close(done)
 
285
                        }
 
286
                case req := <-w.request:
 
287
                        w.handle(req)
 
288
                        w.flush()
 
289
                }
 
290
        }
 
291
}
 
292
 
 
293
// flush sends all pending events to their respective channels.
 
294
func (w *Watcher) flush() {
 
295
        // w.pending may get new requests as we handle other requests.
 
296
        for i := 0; i < len(w.pending); i++ {
 
297
                e := &w.pending[i]
 
298
                for e.ch != nil {
 
299
                        select {
 
300
                        case <-w.tomb.Dying():
 
301
                                return
 
302
                        case req := <-w.request:
 
303
                                w.handle(req)
 
304
                                continue
 
305
                        case e.ch <- Change{e.key, e.alive}:
 
306
                        }
 
307
                        break
 
308
                }
 
309
        }
 
310
        w.pending = w.pending[:0]
 
311
}
 
312
 
 
313
// handle deals with requests delivered by the public API
 
314
// onto the background watcher goroutine.
 
315
func (w *Watcher) handle(req interface{}) {
 
316
        logger.Tracef("got request: %#v", req)
 
317
        switch r := req.(type) {
 
318
        case reqSync:
 
319
                w.next = time.After(0)
 
320
                if r.done != nil {
 
321
                        w.syncDone = append(w.syncDone, r.done)
 
322
                }
 
323
        case reqWatch:
 
324
                for _, ch := range w.watches[r.key] {
 
325
                        if ch == r.ch {
 
326
                                panic("adding channel twice for same key")
 
327
                        }
 
328
                }
 
329
                w.watches[r.key] = append(w.watches[r.key], r.ch)
 
330
                _, alive := w.beingSeq[r.key]
 
331
                w.pending = append(w.pending, event{r.ch, r.key, alive})
 
332
        case reqUnwatch:
 
333
                watches := w.watches[r.key]
 
334
                for i, ch := range watches {
 
335
                        if ch == r.ch {
 
336
                                watches[i] = watches[len(watches)-1]
 
337
                                w.watches[r.key] = watches[:len(watches)-1]
 
338
                                break
 
339
                        }
 
340
                }
 
341
                for i := range w.pending {
 
342
                        e := &w.pending[i]
 
343
                        if e.key == r.key && e.ch == r.ch {
 
344
                                e.ch = nil
 
345
                        }
 
346
                }
 
347
        case reqAlive:
 
348
                _, alive := w.beingSeq[r.key]
 
349
                r.result <- alive
 
350
        default:
 
351
                panic(fmt.Errorf("unknown request: %T", req))
 
352
        }
 
353
}
 
354
 
 
355
type beingInfo struct {
 
356
        DocID     string `bson:"_id"`
 
357
        Seq       int64  `bson:"seq,omitempty"`
 
358
        ModelUUID string `bson:"model-uuid,omitempty"`
 
359
        Key       string `bson:"key,omitempty"`
 
360
}
 
361
 
 
362
type pingInfo struct {
 
363
        DocID string           `bson:"_id"`
 
364
        Slot  int64            `bson:"slot,omitempty"`
 
365
        Alive map[string]int64 `bson:",omitempty"`
 
366
        Dead  map[string]int64 `bson:",omitempty"`
 
367
}
 
368
 
 
369
func (w *Watcher) findAllBeings() (map[int64]beingInfo, error) {
 
370
        beings := make([]beingInfo, 0)
 
371
        session := w.beings.Database.Session.Copy()
 
372
        defer session.Close()
 
373
        beingsC := w.beings.With(session)
 
374
 
 
375
        err := beingsC.Find(bson.D{{"model-uuid", w.modelUUID}}).All(&beings)
 
376
        if err != nil {
 
377
                return nil, err
 
378
        }
 
379
        beingInfos := make(map[int64]beingInfo, len(beings))
 
380
        for _, being := range beings {
 
381
                beingInfos[being.Seq] = being
 
382
        }
 
383
        return beingInfos, nil
 
384
}
 
385
 
 
386
// sync updates the watcher knowledge from the database, and
 
387
// queues events to observing channels. It fetches the last two time
 
388
// slots and compares the union of both to the in-memory state.
 
389
func (w *Watcher) sync() error {
 
390
        var allBeings map[int64]beingInfo
 
391
        if len(w.beingKey) == 0 {
 
392
                // The very first time we sync, we grab all ever-known beings,
 
393
                // so we don't have to look them up one-by-one
 
394
                var err error
 
395
                if allBeings, err = w.findAllBeings(); err != nil {
 
396
                        return errors.Trace(err)
 
397
                }
 
398
        }
 
399
        // TODO(perrito666) 2016-05-02 lp:1558657
 
400
        s := timeSlot(time.Now(), w.delta)
 
401
        slot := docIDInt64(w.modelUUID, s)
 
402
        previousSlot := docIDInt64(w.modelUUID, s-period)
 
403
        session := w.pings.Database.Session.Copy()
 
404
        defer session.Close()
 
405
        pings := w.pings.With(session)
 
406
        var ping []pingInfo
 
407
        q := bson.D{{"$or", []pingInfo{{DocID: slot}, {DocID: previousSlot}}}}
 
408
        err := pings.Find(q).All(&ping)
 
409
        if err != nil && err != mgo.ErrNotFound {
 
410
                return errors.Trace(err)
 
411
        }
 
412
 
 
413
        // Learn about all enforced deaths.
 
414
        // TODO(ericsnow) Remove this once KillForTesting() goes away.
 
415
        dead := make(map[int64]bool)
 
416
        for i := range ping {
 
417
                for key, value := range ping[i].Dead {
 
418
                        k, err := strconv.ParseInt(key, 16, 64)
 
419
                        if err != nil {
 
420
                                err = errors.Annotatef(err, "presence cannot parse dead key: %q", key)
 
421
                                panic(err)
 
422
                        }
 
423
                        k *= 63
 
424
                        for i := int64(0); i < 63 && value > 0; i++ {
 
425
                                on := value&1 == 1
 
426
                                value >>= 1
 
427
                                if !on {
 
428
                                        continue
 
429
                                }
 
430
                                seq := k + i
 
431
                                dead[seq] = true
 
432
                                logger.Tracef("found seq=%d dead", seq)
 
433
                        }
 
434
                }
 
435
        }
 
436
 
 
437
        // Learn about all the pingers that reported and queue
 
438
        // events for those that weren't known to be alive and
 
439
        // are not reportedly dead either.
 
440
        beingsC := w.beings.With(session)
 
441
        alive := make(map[int64]bool)
 
442
        being := beingInfo{}
 
443
        for i := range ping {
 
444
                for key, value := range ping[i].Alive {
 
445
                        k, err := strconv.ParseInt(key, 16, 64)
 
446
                        if err != nil {
 
447
                                err = errors.Annotatef(err, "presence cannot parse alive key: %q", key)
 
448
                                panic(err)
 
449
                        }
 
450
                        k *= 63
 
451
                        for i := int64(0); i < 63 && value > 0; i++ {
 
452
                                on := value&1 == 1
 
453
                                value >>= 1
 
454
                                if !on {
 
455
                                        continue
 
456
                                }
 
457
                                seq := k + i
 
458
                                alive[seq] = true
 
459
                                if _, ok := w.beingKey[seq]; ok {
 
460
                                        continue
 
461
                                }
 
462
                                // Check if the being exists in the 'all' map,
 
463
                                // otherwise do a single lookup in mongo
 
464
                                var ok bool
 
465
                                if being, ok = allBeings[seq]; !ok {
 
466
                                        err := beingsC.Find(bson.D{{"_id", docIDInt64(w.modelUUID, seq)}}).One(&being)
 
467
                                        if err == mgo.ErrNotFound {
 
468
                                                logger.Tracef("found seq=%d unowned", seq)
 
469
                                                continue
 
470
                                        }
 
471
                                        if err != nil {
 
472
                                                return errors.Trace(err)
 
473
                                        }
 
474
                                }
 
475
                                cur := w.beingSeq[being.Key]
 
476
                                if cur < seq {
 
477
                                        delete(w.beingKey, cur)
 
478
                                } else {
 
479
                                        // Current sequence is more recent.
 
480
                                        continue
 
481
                                }
 
482
                                w.beingKey[seq] = being.Key
 
483
                                w.beingSeq[being.Key] = seq
 
484
                                if cur > 0 || dead[seq] {
 
485
                                        continue
 
486
                                }
 
487
                                logger.Tracef("found seq=%d alive with key %q", seq, being.Key)
 
488
                                for _, ch := range w.watches[being.Key] {
 
489
                                        w.pending = append(w.pending, event{ch, being.Key, true})
 
490
                                }
 
491
                        }
 
492
                }
 
493
        }
 
494
 
 
495
        // Pingers that were known to be alive and haven't reported
 
496
        // in the last two slots are now considered dead. Dispatch
 
497
        // the respective events and forget their sequences.
 
498
        for seq, key := range w.beingKey {
 
499
                if dead[seq] || !alive[seq] {
 
500
                        delete(w.beingKey, seq)
 
501
                        delete(w.beingSeq, key)
 
502
                        for _, ch := range w.watches[key] {
 
503
                                w.pending = append(w.pending, event{ch, key, false})
 
504
                        }
 
505
                }
 
506
        }
 
507
        return nil
 
508
}
 
509
 
 
510
// Pinger periodically reports that a specific key is alive, so that
 
511
// watchers interested on that fact can react appropriately.
 
512
type Pinger struct {
 
513
        modelUUID string
 
514
        mu        sync.Mutex
 
515
        tomb      tomb.Tomb
 
516
        base      *mgo.Collection
 
517
        pings     *mgo.Collection
 
518
        started   bool
 
519
        beingKey  string
 
520
        beingSeq  int64
 
521
        fieldKey  string // hex(beingKey / 63)
 
522
        fieldBit  uint64 // 1 << (beingKey%63)
 
523
        lastSlot  int64
 
524
        delta     time.Duration
 
525
}
 
526
 
 
527
// NewPinger returns a new Pinger to report that key is alive.
 
528
// It starts reporting after Start is called.
 
529
func NewPinger(base *mgo.Collection, modelTag names.ModelTag, key string) *Pinger {
 
530
        return &Pinger{
 
531
                base:      base,
 
532
                pings:     pingsC(base),
 
533
                beingKey:  key,
 
534
                modelUUID: modelTag.Id(),
 
535
        }
 
536
}
 
537
 
 
538
// Start starts periodically reporting that p's key is alive.
 
539
func (p *Pinger) Start() error {
 
540
        p.mu.Lock()
 
541
        defer p.mu.Unlock()
 
542
        if p.started {
 
543
                return errors.Errorf("pinger already started")
 
544
        }
 
545
        p.tomb = tomb.Tomb{}
 
546
        if err := p.prepare(); err != nil {
 
547
                return errors.Trace(err)
 
548
        }
 
549
        logger.Tracef("starting pinger for %q with seq=%d", p.beingKey, p.beingSeq)
 
550
        if err := p.ping(); err != nil {
 
551
                return errors.Trace(err)
 
552
        }
 
553
        p.started = true
 
554
        go func() {
 
555
                err := p.loop()
 
556
                cause := errors.Cause(err)
 
557
                // tomb expects ErrDying or ErrStillAlive as
 
558
                // exact values, so we need to log and unwrap
 
559
                // the error first.
 
560
                if err != nil && cause != tomb.ErrDying {
 
561
                        logger.Infof("pinger loop failed: %v", err)
 
562
                }
 
563
                p.tomb.Kill(cause)
 
564
                p.tomb.Done()
 
565
        }()
 
566
        return nil
 
567
}
 
568
 
 
569
// Kill is part of the worker.Worker interface.
 
570
func (p *Pinger) Kill() {
 
571
        p.tomb.Kill(nil)
 
572
}
 
573
 
 
574
// Wait returns when the Pinger has stopped, and returns the first error
 
575
// it encountered.
 
576
func (p *Pinger) Wait() error {
 
577
        return p.tomb.Wait()
 
578
}
 
579
 
 
580
// Stop stops p's periodical ping.
 
581
// Watchers will not notice p has stopped pinging until the
 
582
// previous ping times out.
 
583
func (p *Pinger) Stop() error {
 
584
        p.mu.Lock()
 
585
        defer p.mu.Unlock()
 
586
        if p.started {
 
587
                logger.Tracef("stopping pinger for %q with seq=%d", p.beingKey, p.beingSeq)
 
588
        }
 
589
        p.tomb.Kill(nil)
 
590
        err := p.tomb.Wait()
 
591
        // TODO ping one more time to guarantee a late timeout.
 
592
        p.started = false
 
593
        return errors.Trace(err)
 
594
 
 
595
}
 
596
 
 
597
// KillForTesting stops p's periodical ping and immediately reports that it is dead.
 
598
// TODO(ericsnow) We should be able to drop this and the two kill* methods.
 
599
func (p *Pinger) KillForTesting() error {
 
600
        p.mu.Lock()
 
601
        defer p.mu.Unlock()
 
602
        if p.started {
 
603
                logger.Tracef("killing pinger for %q (was started)", p.beingKey)
 
604
                return p.killStarted()
 
605
        }
 
606
        logger.Tracef("killing pinger for %q (was stopped)", p.beingKey)
 
607
        return p.killStopped()
 
608
}
 
609
 
 
610
// killStarted kills the pinger while it is running, by first
 
611
// stopping it and then recording in the last pinged slot that
 
612
// the pinger was killed.
 
613
func (p *Pinger) killStarted() error {
 
614
        p.tomb.Kill(nil)
 
615
        killErr := p.tomb.Wait()
 
616
        p.started = false
 
617
 
 
618
        slot := p.lastSlot
 
619
        udoc := bson.D{
 
620
                {"$set", bson.D{{"slot", slot}}},
 
621
                {"$inc", bson.D{{"dead." + p.fieldKey, p.fieldBit}}}}
 
622
        session := p.pings.Database.Session.Copy()
 
623
        defer session.Close()
 
624
        pings := p.pings.With(session)
 
625
        if _, err := pings.UpsertId(docIDInt64(p.modelUUID, slot), udoc); err != nil {
 
626
                return errors.Trace(err)
 
627
        }
 
628
        return errors.Trace(killErr)
 
629
}
 
630
 
 
631
// killStopped kills the pinger while it is not running, by
 
632
// first allocating a new sequence, and then atomically recording
 
633
// the new sequence both as alive and dead at once.
 
634
func (p *Pinger) killStopped() error {
 
635
        if err := p.prepare(); err != nil {
 
636
                return err
 
637
        }
 
638
        // TODO(perrito666) 2016-05-02 lp:1558657
 
639
        slot := timeSlot(time.Now(), p.delta)
 
640
        udoc := bson.D{
 
641
                {"$set", bson.D{{"slot", slot}}},
 
642
                {"$inc", bson.D{
 
643
                        {"dead." + p.fieldKey, p.fieldBit},
 
644
                        {"alive." + p.fieldKey, p.fieldBit},
 
645
                }}}
 
646
        session := p.pings.Database.Session.Copy()
 
647
        defer session.Close()
 
648
        pings := p.pings.With(session)
 
649
        _, err := pings.UpsertId(docIDInt64(p.modelUUID, slot), udoc)
 
650
        return errors.Trace(err)
 
651
}
 
652
 
 
653
// loop is the main pinger loop that runs while it is
 
654
// in started state.
 
655
func (p *Pinger) loop() error {
 
656
        for {
 
657
                select {
 
658
                case <-p.tomb.Dying():
 
659
                        return errors.Trace(tomb.ErrDying)
 
660
                case <-time.After(time.Duration(float64(period+1)*0.75) * time.Second):
 
661
                        if err := p.ping(); err != nil {
 
662
                                return errors.Trace(err)
 
663
                        }
 
664
                }
 
665
        }
 
666
}
 
667
 
 
668
// prepare allocates a new unique sequence for the
 
669
// pinger key and prepares the pinger to use it.
 
670
func (p *Pinger) prepare() error {
 
671
        change := mgo.Change{
 
672
                Update:    bson.D{{"$inc", bson.D{{"seq", int64(1)}}}},
 
673
                Upsert:    true,
 
674
                ReturnNew: true,
 
675
        }
 
676
        session := p.base.Database.Session.Copy()
 
677
        defer session.Close()
 
678
        base := p.base.With(session)
 
679
        seqs := seqsC(base)
 
680
        var seq struct{ Seq int64 }
 
681
        seqID := docIDStr(p.modelUUID, "beings")
 
682
        if _, err := seqs.FindId(seqID).Apply(change, &seq); err != nil {
 
683
                return errors.Trace(err)
 
684
        }
 
685
        p.beingSeq = seq.Seq
 
686
        p.fieldKey = fmt.Sprintf("%x", p.beingSeq/63)
 
687
        p.fieldBit = 1 << uint64(p.beingSeq%63)
 
688
        p.lastSlot = 0
 
689
        beings := beingsC(base)
 
690
        return errors.Trace(beings.Insert(
 
691
                beingInfo{
 
692
                        DocID:     docIDInt64(p.modelUUID, p.beingSeq),
 
693
                        Seq:       p.beingSeq,
 
694
                        ModelUUID: p.modelUUID,
 
695
                        Key:       p.beingKey,
 
696
                },
 
697
        ))
 
698
}
 
699
 
 
700
// ping records updates the current time slot with the
 
701
// sequence in use by the pinger.
 
702
func (p *Pinger) ping() (err error) {
 
703
        logger.Tracef("pinging %q with seq=%d", p.beingKey, p.beingSeq)
 
704
        defer func() {
 
705
                // If the session is killed from underneath us, it panics when we
 
706
                // try to copy it, so deal with that here.
 
707
                if v := recover(); v != nil {
 
708
                        err = fmt.Errorf("%v", v)
 
709
                }
 
710
        }()
 
711
        session := p.pings.Database.Session.Copy()
 
712
        defer session.Close()
 
713
        if p.delta == 0 {
 
714
                base := p.base.With(session)
 
715
                delta, err := clockDelta(base)
 
716
                if err != nil {
 
717
                        return errors.Trace(err)
 
718
                }
 
719
                p.delta = delta
 
720
        }
 
721
        // TODO(perrito666) 2016-05-02 lp:1558657
 
722
        slot := timeSlot(time.Now(), p.delta)
 
723
        if slot == p.lastSlot {
 
724
                // Never, ever, ping the same slot twice.
 
725
                // The increment below would corrupt the slot.
 
726
                return nil
 
727
        }
 
728
        p.lastSlot = slot
 
729
        pings := p.pings.With(session)
 
730
        _, err = pings.UpsertId(
 
731
                docIDInt64(p.modelUUID, slot),
 
732
                bson.D{
 
733
                        {"$set", bson.D{{"slot", slot}}},
 
734
                        {"$inc", bson.D{{"alive." + p.fieldKey, p.fieldBit}}},
 
735
                })
 
736
        return errors.Trace(err)
 
737
}
 
738
 
 
739
// clockDelta returns the approximate skew between
 
740
// the local clock and the database clock.
 
741
func clockDelta(c *mgo.Collection) (time.Duration, error) {
 
742
        var server struct {
 
743
                time.Time `bson:"retval"`
 
744
        }
 
745
        var isMaster struct {
 
746
                LocalTime time.Time `bson:"localTime"`
 
747
        }
 
748
        var after time.Time
 
749
        var before time.Time
 
750
        var serverDelay time.Duration
 
751
        supportsMasterLocalTime := true
 
752
        session := c.Database.Session.Copy()
 
753
        defer session.Close()
 
754
        db := c.Database.With(session)
 
755
        for i := 0; i < 10; i++ {
 
756
                if supportsMasterLocalTime {
 
757
                        // Try isMaster.localTime, which is present since MongoDB 2.2
 
758
                        // and does not require admin privileges.
 
759
                        // TODO(perrito666) 2016-05-02 lp:1558657
 
760
                        before = time.Now()
 
761
                        err := db.Run("isMaster", &isMaster)
 
762
                        // TODO(perrito666) 2016-05-02 lp:1558657
 
763
                        after = time.Now()
 
764
                        if err != nil {
 
765
                                return 0, errors.Trace(err)
 
766
                        }
 
767
                        if isMaster.LocalTime.IsZero() {
 
768
                                supportsMasterLocalTime = false
 
769
                                continue
 
770
                        } else {
 
771
                                serverDelay = isMaster.LocalTime.Sub(before)
 
772
                        }
 
773
                } else {
 
774
                        // If MongoDB doesn't have localTime as part of
 
775
                        // isMaster result, it means that the server is likely
 
776
                        // a MongoDB older than 2.2.
 
777
                        //
 
778
                        // Fallback to 'eval' works fine on versions older than
 
779
                        // 2.4 where it does not require admin privileges.
 
780
                        //
 
781
                        // NOTE: 'eval' takes a global write lock unless you
 
782
                        // specify 'nolock' (which we are not doing below, for
 
783
                        // no apparent reason), so it is quite likely that the
 
784
                        // eval could take a relatively long time to acquire
 
785
                        // the lock and thus cause a retry on the callDelay
 
786
                        // check below on a busy server.
 
787
                        // TODO(perrito666) 2016-05-02 lp:1558657
 
788
                        before = time.Now()
 
789
                        err := db.Run(bson.D{{"$eval", "function() { return new Date(); }"}}, &server)
 
790
                        // TODO(perrito666) 2016-05-02 lp:1558657
 
791
                        after = time.Now()
 
792
                        if err != nil {
 
793
                                return 0, errors.Trace(err)
 
794
                        }
 
795
                        serverDelay = server.Sub(before)
 
796
                }
 
797
                // If the call to the server takes longer than a few seconds we
 
798
                // retry it a couple more times before giving up. It is unclear
 
799
                // why the retry would help at all here.
 
800
                //
 
801
                // If the server takes longer than the specified amount of time
 
802
                // on every single try, then we simply give up.
 
803
                callDelay := after.Sub(before)
 
804
                if callDelay > 5*time.Second {
 
805
                        continue
 
806
                }
 
807
                return serverDelay, nil
 
808
        }
 
809
        return 0, errors.Errorf("cannot synchronize clock with database server")
 
810
}
 
811
 
 
812
// timeSlot returns the current time slot, in seconds since the
 
813
// epoch, for the provided now time. The delta skew is applied
 
814
// to the now time to improve the synchronization with a
 
815
// centrally agreed time.
 
816
//
 
817
// The result of this method may be manipulated for test purposes
 
818
// by fakeTimeSlot and realTimeSlot.
 
819
func timeSlot(now time.Time, delta time.Duration) int64 {
 
820
        fakeMutex.Lock()
 
821
        fake := !fakeNow.IsZero()
 
822
        if fake {
 
823
                now = fakeNow
 
824
        }
 
825
        slot := now.Add(delta).Unix()
 
826
        slot -= slot % period
 
827
        if fake {
 
828
                slot += int64(fakeOffset) * period
 
829
        }
 
830
        fakeMutex.Unlock()
 
831
        return slot
 
832
}
 
833
 
 
834
var (
 
835
        fakeMutex  sync.Mutex // protects fakeOffset, fakeNow
 
836
        fakeNow    time.Time
 
837
        fakeOffset int
 
838
)
 
839
 
 
840
// fakeTimeSlot hardcodes the slot time returned by the timeSlot
 
841
// function for testing purposes. The offset parameter is the slot
 
842
// position to return: offsets +1 and -1 are +period and -period
 
843
// seconds from slot 0, respectively.
 
844
func fakeTimeSlot(offset int) {
 
845
        fakeMutex.Lock()
 
846
        if fakeNow.IsZero() {
 
847
                fakeNow = time.Now()
 
848
        }
 
849
        fakeOffset = offset
 
850
        fakeMutex.Unlock()
 
851
        logger.Infof("faking presence to time slot %d", offset)
 
852
}
 
853
 
 
854
// realTimeSlot disables the hardcoding introduced by fakeTimeSlot.
 
855
func realTimeSlot() {
 
856
        fakeMutex.Lock()
 
857
        fakeNow = time.Time{}
 
858
        fakeOffset = 0
 
859
        fakeMutex.Unlock()
 
860
        logger.Infof("not faking presence time. Real time slot in use.")
 
861
}
 
862
 
 
863
func seqsC(base *mgo.Collection) *mgo.Collection {
 
864
        return base.Database.C(base.Name + ".seqs")
 
865
}
 
866
 
 
867
func beingsC(base *mgo.Collection) *mgo.Collection {
 
868
        return base.Database.C(base.Name + ".beings")
 
869
}
 
870
 
 
871
func pingsC(base *mgo.Collection) *mgo.Collection {
 
872
        return base.Database.C(base.Name + ".pings")
 
873
}