1
// Copyright 2012, 2013 Canonical Ltd.
2
// Licensed under the AGPLv3, see LICENCE file for details.
4
// Package presence implements an interface for observing liveness
5
// of arbitrary keys (agents, processes, etc) on top of MongoDB.
6
// The design works by periodically updating the database so that
7
// watchers can tell an arbitrary key is alive.
16
"github.com/juju/errors"
17
"github.com/juju/juju/worker"
18
"github.com/juju/loggo"
19
"gopkg.in/juju/names.v2"
21
"gopkg.in/mgo.v2/bson"
25
var (
	// logger is this package's logger, registered under the name
	// "juju.state.presence".
	logger = loggo.GetLogger("juju.state.presence")
)
27
// Agent shouldn't really live here -- it's not used in this package,
28
// and is implemented by a couple of state types for the convenience of
29
// the apiserver -- but one of the methods returns a concrete *Pinger,
30
// and that ties it down here quite effectively (until we want to take
31
// on the task of cleaning it up and promoting it to core, which might
32
// well never happen).
33
type Agent interface {
34
AgentPresence() (bool, error)
35
SetAgentPresence() (*Pinger, error)
36
WaitAgentPresence(time.Duration) error
39
// docIDInt64 returns a globally unique document id of the form
// "<modelUUID>:<localID>", with localID rendered in base 10.
func docIDInt64(modelUUID string, localID int64) string {
	// 20 bytes is enough for any int64 in base 10, sign included.
	id := make([]byte, 0, len(modelUUID)+1+20)
	id = append(id, modelUUID...)
	id = append(id, ':')
	return string(strconv.AppendInt(id, localID, 10))
}
46
// docIDStr returns a globally unique document id of the form
// "<modelUUID>:<localID>" for a string local id.
func docIDStr(modelUUID, localID string) string {
	const separator = ":"
	return modelUUID + separator + localID
}
53
// The implementation works by assigning a unique sequence number to each
54
// pinger that is alive, and the pinger is then responsible for
55
// periodically updating the current time slot document with its
56
// sequence number so that watchers can tell it is alive.
58
// There is only one time slot document per time slot, per model. The
59
// internal implementation of the time slot document is as follows:
62
// "_id": <model UUID>:<time slot>,
64
// "model-uuid": <model UUID>,
65
// "alive": { hex(<pinger seq> / 63) : (1 << (<pinger seq> % 63) | <others>) },
66
// "dead": { hex(<pinger seq> / 63) : (1 << (<pinger seq> % 63) | <others>) },
69
// All pingers that have their sequence number under "alive" and not
70
// under "dead" are currently alive. This design enables implementing
71
// a ping with a single update operation, a kill with another operation,
72
// and obtaining liveness data with a single query that returns two
73
// documents (the last two time slots).
75
// A new pinger sequence is obtained every time a pinger starts by atomically
76
// incrementing a counter in a document in a helper collection. There is only
77
// one such document per model. That sequence number is then inserted
78
// into the beings collection to establish the mapping between pinger sequence
81
// BUG(gn): The pings and beings collections currently grow without bound.
83
// A Watcher can watch any number of pinger keys for liveness changes.
89
beings *mgo.Collection
91
// delta is an approximate clock skew between the local system
92
// clock and the database clock.
95
// beingKey and beingSeq are the pinger seq <=> key mappings.
96
// Entries in these maps are considered alive.
97
beingKey map[int64]string
98
beingSeq map[string]int64
100
// watches has the per-key observer channels from Watch/Unwatch.
101
watches map[string][]chan<- Change
103
// pending contains all the events to be dispatched to the watcher
104
// channels. They're queued during processing and flushed at the
105
// end to simplify the algorithm.
108
// request is used to deliver requests from the public API into
109
// the goroutine loop.
110
request chan interface{}
112
// syncDone contains pending done channels from sync requests.
115
// next will dispatch when it's time to sync the database
116
// knowledge. It's maintained here so that ForceRefresh
117
// can manipulate it to force a sync sooner.
118
next <-chan time.Time
127
// Change holds a liveness change notification.
133
// NewWatcher returns a new Watcher.
134
func NewWatcher(base *mgo.Collection, modelTag names.ModelTag) *Watcher {
136
modelUUID: modelTag.Id(),
139
beings: beingsC(base),
140
beingKey: make(map[int64]string),
141
beingSeq: make(map[string]int64),
142
watches: make(map[string][]chan<- Change),
143
request: make(chan interface{}),
147
cause := errors.Cause(err)
148
// tomb expects ErrDying or ErrStillAlive as
149
// exact values, so we need to log and unwrap
151
if err != nil && cause != tomb.ErrDying {
152
logger.Infof("watcher loop failed: %v", err)
160
// Kill is part of the worker.Worker interface.
161
func (w *Watcher) Kill() {
165
// Wait is part of the worker.Worker interface.
166
func (w *Watcher) Wait() error {
170
// Stop stops all the watcher activities.
171
func (w *Watcher) Stop() error {
172
return worker.Stop(w)
175
// Dead returns a channel that is closed when the watcher has stopped.
176
func (w *Watcher) Dead() <-chan struct{} {
180
// Err returns the error with which the watcher stopped.
181
// It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive
182
// if the watcher is still running properly, or the respective error
183
// if the watcher is terminating or has terminated with an error.
184
func (w *Watcher) Err() error {
188
type reqWatch struct {
193
type reqUnwatch struct {
198
type reqSync struct {
202
type reqAlive struct {
207
func (w *Watcher) sendReq(req interface{}) {
209
case w.request <- req:
210
case <-w.tomb.Dying():
214
// Watch starts watching the liveness of key. An event will
215
// be sent onto ch to report the initial status for the key, and
216
// from then on a new event will be sent whenever a change is
217
// detected. Change values sent to the channel must be consumed,
218
// or the whole watcher will blocked.
219
func (w *Watcher) Watch(key string, ch chan<- Change) {
220
w.sendReq(reqWatch{key, ch})
223
// Unwatch stops watching the liveness of key via ch.
224
func (w *Watcher) Unwatch(key string, ch chan<- Change) {
225
w.sendReq(reqUnwatch{key, ch})
228
// StartSync forces the watcher to load new events from the database.
229
func (w *Watcher) StartSync() {
230
w.sendReq(reqSync{nil})
233
// Sync forces the watcher to load new events from the database and blocks
234
// until all events have been dispatched.
235
func (w *Watcher) Sync() {
236
done := make(chan bool)
237
w.sendReq(reqSync{done})
240
case <-w.tomb.Dying():
244
// Alive returns whether the key is currently considered alive by w,
245
// or an error in case the watcher is dying.
246
func (w *Watcher) Alive(key string) (bool, error) {
247
result := make(chan bool, 1)
248
w.sendReq(reqAlive{key, result})
251
case alive = <-result:
252
case <-w.tomb.Dying():
253
return false, errors.Errorf("cannot check liveness: watcher is dying")
258
// period is the length of a single time slot, in seconds.
// It is an int64 rather than a time.Duration for two reasons. Slot
// identifiers are int64 second counts, so sub-second periods could not
// be represented anyway. Plain integer arithmetic also keeps the slot
// code simpler.
var period = int64(30)
264
// loop implements the main watcher loop.
265
func (w *Watcher) loop() error {
267
if w.delta, err = clockDelta(w.base); err != nil {
268
return errors.Trace(err)
270
w.next = time.After(0)
273
case <-w.tomb.Dying():
274
return errors.Trace(tomb.ErrDying)
276
w.next = time.After(time.Duration(period) * time.Second)
277
syncDone := w.syncDone
279
if err := w.sync(); err != nil {
280
return errors.Trace(err)
283
for _, done := range syncDone {
286
case req := <-w.request:
293
// flush sends all pending events to their respective channels.
294
func (w *Watcher) flush() {
295
// w.pending may get new requests as we handle other requests.
296
for i := 0; i < len(w.pending); i++ {
300
case <-w.tomb.Dying():
302
case req := <-w.request:
305
case e.ch <- Change{e.key, e.alive}:
310
w.pending = w.pending[:0]
313
// handle deals with requests delivered by the public API
314
// onto the background watcher goroutine.
315
func (w *Watcher) handle(req interface{}) {
316
logger.Tracef("got request: %#v", req)
317
switch r := req.(type) {
319
w.next = time.After(0)
321
w.syncDone = append(w.syncDone, r.done)
324
for _, ch := range w.watches[r.key] {
326
panic("adding channel twice for same key")
329
w.watches[r.key] = append(w.watches[r.key], r.ch)
330
_, alive := w.beingSeq[r.key]
331
w.pending = append(w.pending, event{r.ch, r.key, alive})
333
watches := w.watches[r.key]
334
for i, ch := range watches {
336
watches[i] = watches[len(watches)-1]
337
w.watches[r.key] = watches[:len(watches)-1]
341
for i := range w.pending {
343
if e.key == r.key && e.ch == r.ch {
348
_, alive := w.beingSeq[r.key]
351
panic(fmt.Errorf("unknown request: %T", req))
355
// beingInfo is a document in the beings collection. It records which
// key a pinger sequence number belongs to, within a single model.
// Field order is kept as-is: bson encodes struct fields in declaration
// order.
type beingInfo struct {
	// DocID is docIDInt64(<model UUID>, <pinger seq>).
	DocID     string `bson:"_id"`
	Seq       int64  `bson:"seq,omitempty"`
	ModelUUID string `bson:"model-uuid,omitempty"`
	Key       string `bson:"key,omitempty"`
}
362
// pingInfo is a time slot document in the pings collection. There is
// one such document per model per slot. Alive and Dead both map
// hex(seq / 63) to a bitmask in which bit (seq % 63) is set for each
// pinger sequence recorded in that slot.
type pingInfo struct {
	// DocID is docIDInt64(<model UUID>, <slot>).
	DocID string           `bson:"_id"`
	Slot  int64            `bson:"slot,omitempty"`
	Alive map[string]int64 `bson:",omitempty"`
	Dead  map[string]int64 `bson:",omitempty"`
}
369
func (w *Watcher) findAllBeings() (map[int64]beingInfo, error) {
370
beings := make([]beingInfo, 0)
371
session := w.beings.Database.Session.Copy()
372
defer session.Close()
373
beingsC := w.beings.With(session)
375
err := beingsC.Find(bson.D{{"model-uuid", w.modelUUID}}).All(&beings)
379
beingInfos := make(map[int64]beingInfo, len(beings))
380
for _, being := range beings {
381
beingInfos[being.Seq] = being
383
return beingInfos, nil
386
// sync updates the watcher knowledge from the database, and
387
// queues events to observing channels. It fetches the last two time
388
// slots and compares the union of both to the in-memory state.
389
func (w *Watcher) sync() error {
390
var allBeings map[int64]beingInfo
391
if len(w.beingKey) == 0 {
392
// The very first time we sync, we grab all ever-known beings,
393
// so we don't have to look them up one-by-one
395
if allBeings, err = w.findAllBeings(); err != nil {
396
return errors.Trace(err)
399
// TODO(perrito666) 2016-05-02 lp:1558657
400
s := timeSlot(time.Now(), w.delta)
401
slot := docIDInt64(w.modelUUID, s)
402
previousSlot := docIDInt64(w.modelUUID, s-period)
403
session := w.pings.Database.Session.Copy()
404
defer session.Close()
405
pings := w.pings.With(session)
407
q := bson.D{{"$or", []pingInfo{{DocID: slot}, {DocID: previousSlot}}}}
408
err := pings.Find(q).All(&ping)
409
if err != nil && err != mgo.ErrNotFound {
410
return errors.Trace(err)
413
// Learn about all enforced deaths.
414
// TODO(ericsnow) Remove this once KillForTesting() goes away.
415
dead := make(map[int64]bool)
416
for i := range ping {
417
for key, value := range ping[i].Dead {
418
k, err := strconv.ParseInt(key, 16, 64)
420
err = errors.Annotatef(err, "presence cannot parse dead key: %q", key)
424
for i := int64(0); i < 63 && value > 0; i++ {
432
logger.Tracef("found seq=%d dead", seq)
437
// Learn about all the pingers that reported and queue
438
// events for those that weren't known to be alive and
439
// are not reportedly dead either.
440
beingsC := w.beings.With(session)
441
alive := make(map[int64]bool)
443
for i := range ping {
444
for key, value := range ping[i].Alive {
445
k, err := strconv.ParseInt(key, 16, 64)
447
err = errors.Annotatef(err, "presence cannot parse alive key: %q", key)
451
for i := int64(0); i < 63 && value > 0; i++ {
459
if _, ok := w.beingKey[seq]; ok {
462
// Check if the being exists in the 'all' map,
463
// otherwise do a single lookup in mongo
465
if being, ok = allBeings[seq]; !ok {
466
err := beingsC.Find(bson.D{{"_id", docIDInt64(w.modelUUID, seq)}}).One(&being)
467
if err == mgo.ErrNotFound {
468
logger.Tracef("found seq=%d unowned", seq)
472
return errors.Trace(err)
475
cur := w.beingSeq[being.Key]
477
delete(w.beingKey, cur)
479
// Current sequence is more recent.
482
w.beingKey[seq] = being.Key
483
w.beingSeq[being.Key] = seq
484
if cur > 0 || dead[seq] {
487
logger.Tracef("found seq=%d alive with key %q", seq, being.Key)
488
for _, ch := range w.watches[being.Key] {
489
w.pending = append(w.pending, event{ch, being.Key, true})
495
// Pingers that were known to be alive and haven't reported
496
// in the last two slots are now considered dead. Dispatch
497
// the respective events and forget their sequences.
498
for seq, key := range w.beingKey {
499
if dead[seq] || !alive[seq] {
500
delete(w.beingKey, seq)
501
delete(w.beingSeq, key)
502
for _, ch := range w.watches[key] {
503
w.pending = append(w.pending, event{ch, key, false})
510
// Pinger periodically reports that a specific key is alive, so that
511
// watchers interested on that fact can react appropriately.
517
pings *mgo.Collection
521
fieldKey string // hex(beingKey / 63)
522
fieldBit uint64 // 1 << (beingKey%63)
527
// NewPinger returns a new Pinger to report that key is alive.
528
// It starts reporting after Start is called.
529
func NewPinger(base *mgo.Collection, modelTag names.ModelTag, key string) *Pinger {
534
modelUUID: modelTag.Id(),
538
// Start starts periodically reporting that p's key is alive.
539
func (p *Pinger) Start() error {
543
return errors.Errorf("pinger already started")
546
if err := p.prepare(); err != nil {
547
return errors.Trace(err)
549
logger.Tracef("starting pinger for %q with seq=%d", p.beingKey, p.beingSeq)
550
if err := p.ping(); err != nil {
551
return errors.Trace(err)
556
cause := errors.Cause(err)
557
// tomb expects ErrDying or ErrStillAlive as
558
// exact values, so we need to log and unwrap
560
if err != nil && cause != tomb.ErrDying {
561
logger.Infof("pinger loop failed: %v", err)
569
// Kill is part of the worker.Worker interface.
570
func (p *Pinger) Kill() {
574
// Wait returns when the Pinger has stopped, and returns the first error
576
func (p *Pinger) Wait() error {
580
// Stop stops p's periodical ping.
581
// Watchers will not notice p has stopped pinging until the
582
// previous ping times out.
583
func (p *Pinger) Stop() error {
587
logger.Tracef("stopping pinger for %q with seq=%d", p.beingKey, p.beingSeq)
591
// TODO ping one more time to guarantee a late timeout.
593
return errors.Trace(err)
597
// KillForTesting stops p's periodical ping and immediately reports that it is dead.
598
// TODO(ericsnow) We should be able to drop this and the two kill* methods.
599
func (p *Pinger) KillForTesting() error {
603
logger.Tracef("killing pinger for %q (was started)", p.beingKey)
604
return p.killStarted()
606
logger.Tracef("killing pinger for %q (was stopped)", p.beingKey)
607
return p.killStopped()
610
// killStarted kills the pinger while it is running, by first
611
// stopping it and then recording in the last pinged slot that
612
// the pinger was killed.
613
func (p *Pinger) killStarted() error {
615
killErr := p.tomb.Wait()
620
{"$set", bson.D{{"slot", slot}}},
621
{"$inc", bson.D{{"dead." + p.fieldKey, p.fieldBit}}}}
622
session := p.pings.Database.Session.Copy()
623
defer session.Close()
624
pings := p.pings.With(session)
625
if _, err := pings.UpsertId(docIDInt64(p.modelUUID, slot), udoc); err != nil {
626
return errors.Trace(err)
628
return errors.Trace(killErr)
631
// killStopped kills the pinger while it is not running, by
632
// first allocating a new sequence, and then atomically recording
633
// the new sequence both as alive and dead at once.
634
func (p *Pinger) killStopped() error {
635
if err := p.prepare(); err != nil {
638
// TODO(perrito666) 2016-05-02 lp:1558657
639
slot := timeSlot(time.Now(), p.delta)
641
{"$set", bson.D{{"slot", slot}}},
643
{"dead." + p.fieldKey, p.fieldBit},
644
{"alive." + p.fieldKey, p.fieldBit},
646
session := p.pings.Database.Session.Copy()
647
defer session.Close()
648
pings := p.pings.With(session)
649
_, err := pings.UpsertId(docIDInt64(p.modelUUID, slot), udoc)
650
return errors.Trace(err)
653
// loop is the main pinger loop that runs while it is
655
func (p *Pinger) loop() error {
658
case <-p.tomb.Dying():
659
return errors.Trace(tomb.ErrDying)
660
case <-time.After(time.Duration(float64(period+1)*0.75) * time.Second):
661
if err := p.ping(); err != nil {
662
return errors.Trace(err)
668
// prepare allocates a new unique sequence for the
669
// pinger key and prepares the pinger to use it.
670
func (p *Pinger) prepare() error {
671
change := mgo.Change{
672
Update: bson.D{{"$inc", bson.D{{"seq", int64(1)}}}},
676
session := p.base.Database.Session.Copy()
677
defer session.Close()
678
base := p.base.With(session)
680
var seq struct{ Seq int64 }
681
seqID := docIDStr(p.modelUUID, "beings")
682
if _, err := seqs.FindId(seqID).Apply(change, &seq); err != nil {
683
return errors.Trace(err)
686
p.fieldKey = fmt.Sprintf("%x", p.beingSeq/63)
687
p.fieldBit = 1 << uint64(p.beingSeq%63)
689
beings := beingsC(base)
690
return errors.Trace(beings.Insert(
692
DocID: docIDInt64(p.modelUUID, p.beingSeq),
694
ModelUUID: p.modelUUID,
700
// ping updates the current time slot with the
701
// sequence in use by the pinger.
702
func (p *Pinger) ping() (err error) {
703
logger.Tracef("pinging %q with seq=%d", p.beingKey, p.beingSeq)
705
// If the session is killed from underneath us, it panics when we
706
// try to copy it, so deal with that here.
707
if v := recover(); v != nil {
708
err = fmt.Errorf("%v", v)
711
session := p.pings.Database.Session.Copy()
712
defer session.Close()
714
base := p.base.With(session)
715
delta, err := clockDelta(base)
717
return errors.Trace(err)
721
// TODO(perrito666) 2016-05-02 lp:1558657
722
slot := timeSlot(time.Now(), p.delta)
723
if slot == p.lastSlot {
724
// Never, ever, ping the same slot twice.
725
// The increment below would corrupt the slot.
729
pings := p.pings.With(session)
730
_, err = pings.UpsertId(
731
docIDInt64(p.modelUUID, slot),
733
{"$set", bson.D{{"slot", slot}}},
734
{"$inc", bson.D{{"alive." + p.fieldKey, p.fieldBit}}},
736
return errors.Trace(err)
739
// clockDelta returns the approximate skew between
740
// the local clock and the database clock.
741
func clockDelta(c *mgo.Collection) (time.Duration, error) {
743
time.Time `bson:"retval"`
745
var isMaster struct {
746
LocalTime time.Time `bson:"localTime"`
750
var serverDelay time.Duration
751
supportsMasterLocalTime := true
752
session := c.Database.Session.Copy()
753
defer session.Close()
754
db := c.Database.With(session)
755
for i := 0; i < 10; i++ {
756
if supportsMasterLocalTime {
757
// Try isMaster.localTime, which is present since MongoDB 2.2
758
// and does not require admin privileges.
759
// TODO(perrito666) 2016-05-02 lp:1558657
761
err := db.Run("isMaster", &isMaster)
762
// TODO(perrito666) 2016-05-02 lp:1558657
765
return 0, errors.Trace(err)
767
if isMaster.LocalTime.IsZero() {
768
supportsMasterLocalTime = false
771
serverDelay = isMaster.LocalTime.Sub(before)
774
// If MongoDB doesn't have localTime as part of
775
// isMaster result, it means that the server is likely
776
// a MongoDB older than 2.2.
778
// Fallback to 'eval' works fine on versions older than
779
// 2.4 where it does not require admin privileges.
781
// NOTE: 'eval' takes a global write lock unless you
782
// specify 'nolock' (which we are not doing below, for
783
// no apparent reason), so it is quite likely that the
784
// eval could take a relatively long time to acquire
785
// the lock and thus cause a retry on the callDelay
786
// check below on a busy server.
787
// TODO(perrito666) 2016-05-02 lp:1558657
789
err := db.Run(bson.D{{"$eval", "function() { return new Date(); }"}}, &server)
790
// TODO(perrito666) 2016-05-02 lp:1558657
793
return 0, errors.Trace(err)
795
serverDelay = server.Sub(before)
797
// If the call to the server takes longer than a few seconds we
798
// retry it a couple more times before giving up. It is unclear
799
// why the retry would help at all here.
801
// If the server takes longer than the specified amount of time
802
// on every single try, then we simply give up.
803
callDelay := after.Sub(before)
804
if callDelay > 5*time.Second {
807
return serverDelay, nil
809
return 0, errors.Errorf("cannot synchronize clock with database server")
812
// timeSlot returns the current time slot, in seconds since the
813
// epoch, for the provided now time. The delta skew is applied
814
// to the now time to improve the synchronization with a
815
// centrally agreed time.
817
// The result of this method may be manipulated for test purposes
818
// by fakeTimeSlot and realTimeSlot.
819
func timeSlot(now time.Time, delta time.Duration) int64 {
821
fake := !fakeNow.IsZero()
825
slot := now.Add(delta).Unix()
826
slot -= slot % period
828
slot += int64(fakeOffset) * period
835
fakeMutex sync.Mutex // protects fakeOffset, fakeNow
840
// fakeTimeSlot hardcodes the slot time returned by the timeSlot
841
// function for testing purposes. The offset parameter is the slot
842
// position to return: offsets +1 and -1 are +period and -period
843
// seconds from slot 0, respectively.
844
func fakeTimeSlot(offset int) {
846
if fakeNow.IsZero() {
851
logger.Infof("faking presence to time slot %d", offset)
854
// realTimeSlot disables the hardcoding introduced by fakeTimeSlot.
855
func realTimeSlot() {
857
fakeNow = time.Time{}
860
logger.Infof("not faking presence time. Real time slot in use.")
863
func seqsC(base *mgo.Collection) *mgo.Collection {
864
return base.Database.C(base.Name + ".seqs")
867
func beingsC(base *mgo.Collection) *mgo.Collection {
868
return base.Database.C(base.Name + ".beings")
871
func pingsC(base *mgo.Collection) *mgo.Collection {
872
return base.Database.C(base.Name + ".pings")