2640
// WatchForModelMigration returns a notify watcher which reports when
2641
// a migration is in progress for the model associated with the
2642
// State. Only in-progress and newly created migrations are reported.
2643
func (st *State) WatchForModelMigration() (NotifyWatcher, error) {
2644
return newMigrationActiveWatcher(st), nil
2647
type migrationActiveWatcher struct {
2653
func newMigrationActiveWatcher(st *State) NotifyWatcher {
2654
w := &migrationActiveWatcher{
2655
commonWatcher: commonWatcher{st: st},
2656
collName: modelMigrationsActiveC,
2657
sink: make(chan struct{}),
2662
w.tomb.Kill(w.loop())
2667
// Changes returns the event channel for this watcher.
2668
func (w *migrationActiveWatcher) Changes() <-chan struct{} {
2672
func (w *migrationActiveWatcher) loop() error {
2673
in := make(chan watcher.Change)
2674
filter := func(id interface{}) bool {
2675
// Only report migrations for the requested model.
2676
if id, ok := id.(string); ok {
2677
return id == w.st.ModelUUID()
2681
w.st.watcher.WatchCollectionWithFilter(w.collName, in, filter)
2682
defer w.st.watcher.UnwatchCollection(w.collName, in)
2684
var out chan<- struct{}
2686
// Check if a migration is already in progress and if so, report it immediately.
2687
if active, err := w.st.IsModelMigrationActive(); err != nil {
2688
return errors.Trace(err)
2695
case <-w.tomb.Dying():
2696
return tomb.ErrDying
2697
case <-w.st.watcher.Dead():
2698
return stateWatcherDeadError(w.st.watcher.Err())
2699
case change := <-in:
2700
// Ignore removals from the collection.
2701
if change.Revno == -1 {
2705
if _, ok := collect(change, in, w.tomb.Dying()); !ok {
2706
return tomb.ErrDying
2709
case out <- struct{}{}:
2715
// WatchMigrationStatus returns a NotifyWatcher which triggers
2716
// whenever the status of latest migration for the State's model
2717
// changes. One instance can be used across migrations. The watcher
2718
// will report changes when one migration finishes and another one
2721
// Note that this watcher does not produce an initial event if there's
2722
// never been a migration attempt for the model.
2723
func (st *State) WatchMigrationStatus() (NotifyWatcher, error) {
2724
return newMigrationStatusWatcher(st), nil
2727
type migrationStatusWatcher struct {
2733
func newMigrationStatusWatcher(st *State) NotifyWatcher {
2734
w := &migrationStatusWatcher{
2735
commonWatcher: commonWatcher{st: st},
2736
collName: modelMigrationStatusC,
2737
sink: make(chan struct{}),
2742
w.tomb.Kill(w.loop())
2747
// Changes returns the event channel for this watcher.
2748
func (w *migrationStatusWatcher) Changes() <-chan struct{} {
2752
func (w *migrationStatusWatcher) loop() error {
2753
in := make(chan watcher.Change)
2755
// Watch the entire modelMigrationStatusC collection for migration
2756
// status updates related to the State's model. This is more
2757
// efficient and simpler than tracking the current active
2758
// migration (and changing watchers when one migration finishes
2759
// and another starts.
2761
// This approach is safe because there are strong guarantees that
2762
// there will only be one active migration per model. The watcher
2763
// will only see changes for one migration status document at a
2764
// time for the model.
2765
filter := func(id interface{}) bool {
2766
_, err := w.st.strictLocalID(id.(string))
2769
w.st.watcher.WatchCollectionWithFilter(w.collName, in, filter)
2770
defer w.st.watcher.UnwatchCollection(w.collName, in)
2772
var out chan<- struct{}
2774
// If there is a migration record for the model - active or not -
2775
// send an initial event.
2776
if _, err := w.st.GetModelMigration(); errors.IsNotFound(err) {
2777
// Nothing to report.
2778
} else if err != nil {
2779
return errors.Trace(err)
2786
case <-w.tomb.Dying():
2787
return tomb.ErrDying
2788
case <-w.st.watcher.Dead():
2789
return stateWatcherDeadError(w.st.watcher.Err())
2790
case change := <-in:
2791
if change.Revno == -1 {
2792
return errors.New("model migration status disappeared (shouldn't happen)")
2794
if _, ok := collect(change, in, w.tomb.Dying()); !ok {
2795
return tomb.ErrDying
2798
case out <- struct{}{}: