~juju-qa/ubuntu/xenial/juju/xenial-2.0-beta3

« back to all changes in this revision

Viewing changes to src/github.com/juju/juju/state/watcher.go

  • Committer: Martin Packman
  • Date: 2016-03-30 19:31:08 UTC
  • mfrom: (1.1.41)
  • Revision ID: martin.packman@canonical.com-20160330193108-h9iz3ak334uk0z5r
Merge new upstream source 2.0~beta3

Show diffs side-by-side

added added

removed removed

Lines of Context:
191
191
// WatchIPAddresses returns a StringsWatcher that notifies of changes to the
192
192
// lifecycles of IP addresses.
193
193
func (st *State) WatchIPAddresses() StringsWatcher {
194
 
        return newLifecycleWatcher(st, ipaddressesC, nil, nil, nil)
 
194
        return newLifecycleWatcher(st, legacyipaddressesC, nil, nil, nil)
195
195
}
196
196
 
197
197
// WatchModelVolumes returns a StringsWatcher that notifies of changes to
1502
1502
                        u.st.docID(u.globalMeterStatusKey()),
1503
1503
                }, {
1504
1504
                        metricsManagerC,
1505
 
                        u.st.docID(metricsManagerKey),
 
1505
                        metricsManagerKey,
1506
1506
                },
1507
1507
        })
1508
1508
}
2297
2297
        }
2298
2298
}
2299
2299
 
2300
 
// watchEnqueuedActions starts and returns a StringsWatcher that
2301
 
// notifies on new Actions being enqueued.
2302
 
func (st *State) watchEnqueuedActions() StringsWatcher {
2303
 
        return newcollectionWatcher(st, colWCfg{
2304
 
                col:    actionNotificationsC,
2305
 
                filter: makeIdFilter(st, actionMarker),
2306
 
                idconv: actionNotificationIdToActionId,
2307
 
        })
2308
 
}
2309
 
 
2310
2300
// watchEnqueuedActionsFilteredBy starts and returns a StringsWatcher
2311
2301
// that notifies on new Actions being enqueued on the ActionRecevers
2312
2302
// being watched.
2646
2636
                }
2647
2637
        }
2648
2638
}
 
2639
 
 
2640
// WatchForModelMigration returns a notify watcher which reports when
 
2641
// a migration is in progress for the model associated with the
 
2642
// State. Only in-progress and newly created migrations are reported.
 
2643
func (st *State) WatchForModelMigration() (NotifyWatcher, error) {
 
2644
        return newMigrationActiveWatcher(st), nil
 
2645
}
 
2646
 
 
2647
type migrationActiveWatcher struct {
 
2648
        commonWatcher
 
2649
        collName string
 
2650
        sink     chan struct{}
 
2651
}
 
2652
 
 
2653
func newMigrationActiveWatcher(st *State) NotifyWatcher {
 
2654
        w := &migrationActiveWatcher{
 
2655
                commonWatcher: commonWatcher{st: st},
 
2656
                collName:      modelMigrationsActiveC,
 
2657
                sink:          make(chan struct{}),
 
2658
        }
 
2659
        go func() {
 
2660
                defer w.tomb.Done()
 
2661
                defer close(w.sink)
 
2662
                w.tomb.Kill(w.loop())
 
2663
        }()
 
2664
        return w
 
2665
}
 
2666
 
 
2667
// Changes returns the event channel for this watcher.
 
2668
func (w *migrationActiveWatcher) Changes() <-chan struct{} {
 
2669
        return w.sink
 
2670
}
 
2671
 
 
2672
func (w *migrationActiveWatcher) loop() error {
 
2673
        in := make(chan watcher.Change)
 
2674
        filter := func(id interface{}) bool {
 
2675
                // Only report migrations for the requested model.
 
2676
                if id, ok := id.(string); ok {
 
2677
                        return id == w.st.ModelUUID()
 
2678
                }
 
2679
                return false
 
2680
        }
 
2681
        w.st.watcher.WatchCollectionWithFilter(w.collName, in, filter)
 
2682
        defer w.st.watcher.UnwatchCollection(w.collName, in)
 
2683
 
 
2684
        var out chan<- struct{}
 
2685
 
 
2686
        // Check if a migration is already in progress and if so, report it immediately.
 
2687
        if active, err := w.st.IsModelMigrationActive(); err != nil {
 
2688
                return errors.Trace(err)
 
2689
        } else if active {
 
2690
                out = w.sink
 
2691
        }
 
2692
 
 
2693
        for {
 
2694
                select {
 
2695
                case <-w.tomb.Dying():
 
2696
                        return tomb.ErrDying
 
2697
                case <-w.st.watcher.Dead():
 
2698
                        return stateWatcherDeadError(w.st.watcher.Err())
 
2699
                case change := <-in:
 
2700
                        // Ignore removals from the collection.
 
2701
                        if change.Revno == -1 {
 
2702
                                continue
 
2703
                        }
 
2704
 
 
2705
                        if _, ok := collect(change, in, w.tomb.Dying()); !ok {
 
2706
                                return tomb.ErrDying
 
2707
                        }
 
2708
                        out = w.sink
 
2709
                case out <- struct{}{}:
 
2710
                        out = nil
 
2711
                }
 
2712
        }
 
2713
}
 
2714
 
 
2715
// WatchMigrationStatus returns a NotifyWatcher which triggers
 
2716
// whenever the status of latest migration for the State's model
 
2717
// changes. One instance can be used across migrations. The watcher
 
2718
// will report changes when one migration finishes and another one
 
2719
// begins.
 
2720
//
 
2721
// Note that this watcher does not produce an initial event if there's
 
2722
// never been a migration attempt for the model.
 
2723
func (st *State) WatchMigrationStatus() (NotifyWatcher, error) {
 
2724
        return newMigrationStatusWatcher(st), nil
 
2725
}
 
2726
 
 
2727
type migrationStatusWatcher struct {
 
2728
        commonWatcher
 
2729
        collName string
 
2730
        sink     chan struct{}
 
2731
}
 
2732
 
 
2733
func newMigrationStatusWatcher(st *State) NotifyWatcher {
 
2734
        w := &migrationStatusWatcher{
 
2735
                commonWatcher: commonWatcher{st: st},
 
2736
                collName:      modelMigrationStatusC,
 
2737
                sink:          make(chan struct{}),
 
2738
        }
 
2739
        go func() {
 
2740
                defer w.tomb.Done()
 
2741
                defer close(w.sink)
 
2742
                w.tomb.Kill(w.loop())
 
2743
        }()
 
2744
        return w
 
2745
}
 
2746
 
 
2747
// Changes returns the event channel for this watcher.
 
2748
func (w *migrationStatusWatcher) Changes() <-chan struct{} {
 
2749
        return w.sink
 
2750
}
 
2751
 
 
2752
func (w *migrationStatusWatcher) loop() error {
 
2753
        in := make(chan watcher.Change)
 
2754
 
 
2755
        // Watch the entire modelMigrationStatusC collection for migration
 
2756
        // status updates related to the State's model. This is more
 
2757
        // efficient and simpler than tracking the current active
 
2758
        // migration (and changing watchers when one migration finishes
 
2759
        // and another starts.
 
2760
        //
 
2761
        // This approach is safe because there are strong guarantees that
 
2762
        // there will only be one active migration per model. The watcher
 
2763
        // will only see changes for one migration status document at a
 
2764
        // time for the model.
 
2765
        filter := func(id interface{}) bool {
 
2766
                _, err := w.st.strictLocalID(id.(string))
 
2767
                return err == nil
 
2768
        }
 
2769
        w.st.watcher.WatchCollectionWithFilter(w.collName, in, filter)
 
2770
        defer w.st.watcher.UnwatchCollection(w.collName, in)
 
2771
 
 
2772
        var out chan<- struct{}
 
2773
 
 
2774
        // If there is a migration record for the model - active or not -
 
2775
        // send an initial event.
 
2776
        if _, err := w.st.GetModelMigration(); errors.IsNotFound(err) {
 
2777
                // Nothing to report.
 
2778
        } else if err != nil {
 
2779
                return errors.Trace(err)
 
2780
        } else {
 
2781
                out = w.sink
 
2782
        }
 
2783
 
 
2784
        for {
 
2785
                select {
 
2786
                case <-w.tomb.Dying():
 
2787
                        return tomb.ErrDying
 
2788
                case <-w.st.watcher.Dead():
 
2789
                        return stateWatcherDeadError(w.st.watcher.Err())
 
2790
                case change := <-in:
 
2791
                        if change.Revno == -1 {
 
2792
                                return errors.New("model migration status disappeared (shouldn't happen)")
 
2793
                        }
 
2794
                        if _, ok := collect(change, in, w.tomb.Dying()); !ok {
 
2795
                                return tomb.ErrDying
 
2796
                        }
 
2797
                        out = w.sink
 
2798
                case out <- struct{}{}:
 
2799
                        out = nil
 
2800
                }
 
2801
        }
 
2802
}