~ubuntu-branches/ubuntu/raring/eucalyptus/raring

« back to all changes in this revision

Viewing changes to clc/modules/cluster-manager/src/main/java/com/eucalyptus/util/async/StatefulMessageSet.java

  • Committer: Package Import Robot
  • Author(s): Brian Thomason
  • Date: 2011-11-29 13:17:52 UTC
  • mfrom: (1.2.1 upstream)
  • mto: This revision was merged to the branch mainline in revision 185.
  • Revision ID: package-import@ubuntu.com-20111129131752-rq31al3ntutv2vvl
Tags: upstream-3.0.999beta1
ImportĀ upstreamĀ versionĀ 3.0.999beta1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
package com.eucalyptus.util.async;
 
2
 
 
3
import java.util.concurrent.ConcurrentLinkedQueue;
 
4
import java.util.concurrent.TimeUnit;
 
5
import java.util.concurrent.TimeoutException;
 
6
import org.apache.log4j.Logger;
 
7
import com.eucalyptus.cluster.Cluster;
 
8
import com.eucalyptus.cluster.Clusters;
 
9
import com.eucalyptus.records.EventRecord;
 
10
import com.eucalyptus.records.EventType;
 
11
import com.google.common.base.Function;
 
12
import com.google.common.collect.ArrayListMultimap;
 
13
import com.google.common.collect.Lists;
 
14
import com.google.common.collect.Multimap;
 
15
 
 
16
public class StatefulMessageSet<E extends Enum<E>> {
 
17
  private static Logger                        LOG           = Logger.getLogger( StatefulMessageSet.class );
 
18
  private final Multimap<E, Request>           messages      = ArrayListMultimap.create( );
 
19
  private final ConcurrentLinkedQueue<Request> pendingEvents = new ConcurrentLinkedQueue<Request>( );
 
20
  private final E[]                            states;
 
21
  private E                                    state;
 
22
  private final E                              endState;
 
23
  private final E                              failState;
 
24
  private final Cluster                        cluster;
 
25
  private final Long                           startTime;
 
26
  
 
27
  /**
 
28
   * Collection of messages which need to honor a certain ordering. The state
 
29
   * array is a list of enum values where:
 
30
   * <ul>
 
31
   * <li>Index 0: is the start state</li>
 
32
   * <li>Index length-2: is the end state</li>
 
33
   * <li>Index length-1: is the rollback state</li>
 
34
   * </ul>
 
35
   * A transition will increase the currentState's ordinal by 1, drain the
 
36
   * messages to a pending queue and wait for all messages to be serviced before
 
37
   * proceeding.
 
38
   * 
 
39
   * @param cluster
 
40
   * @param states
 
41
   */
 
42
  public StatefulMessageSet( final Cluster cluster, final E[] states ) {
 
43
    this.cluster = cluster;
 
44
    this.states = states;
 
45
    this.state = states[0];
 
46
    this.endState = states[states.length - 2];
 
47
    this.failState = states[states.length - 1];
 
48
    this.startTime = System.currentTimeMillis( );
 
49
  }
 
50
  
 
51
  private E rollback( ) {
 
52
    return ( this.state = this.failState );
 
53
  }
 
54
  
 
55
  public void addRequest( final E state, final Request asyncRequest ) {
 
56
    EventRecord.caller( StatefulMessageSet.class, EventType.VM_PREPARE, state.name( ), asyncRequest.getCallback( ).getClass( ).getSimpleName( ) ).debug( );
 
57
    this.messages.put( state, asyncRequest );
 
58
  }
 
59
  
 
60
  @SuppressWarnings( "unchecked" )
 
61
  private void queueEvents( final E state ) {
 
62
    for ( final Request event : this.messages.get( state ) ) {
 
63
      try {
 
64
        EventRecord.caller( StatefulMessageSet.class, EventType.VM_STARTING, state.name( ), event.getCallback( ).toString( ) ).debug( );
 
65
        if ( event.getCallback( ) instanceof BroadcastCallback ) {
 
66
          final BroadcastCallback callback = ( BroadcastCallback ) event.getCallback( );
 
67
          this.pendingEvents.addAll( Lists.transform( Clusters.getInstance( ).listValues( ), new Function<Cluster, Request>( ) {
 
68
            @Override
 
69
            public Request apply( final Cluster c ) {
 
70
              LOG.debug( "VM_STARTING: " + state.name( ) + " " + c.getName( ) + " " + event.getClass( ).getSimpleName( ) + " " + event.getCallback( ) );
 
71
              final Request request = AsyncRequests.newRequest( callback.newInstance( ) );
 
72
              request.getRequest( ).regardingUserRequest( callback.getRequest( ) );
 
73
              request.dispatch( c.getConfiguration( ) );
 
74
              return request;
 
75
            }
 
76
          } ) );
 
77
        } else {
 
78
          LOG.debug( "VM_STARTING: " + state.name( ) + " " + this.cluster.getName( ) + " " + event.getClass( ).getSimpleName( ) + " " + event.getCallback( ) );
 
79
          event.dispatch( this.cluster.getConfiguration( ) );
 
80
          this.pendingEvents.add( event );
 
81
        }
 
82
      } catch ( Exception ex ) {
 
83
        LOG.error( ex, ex );
 
84
      }
 
85
    }
 
86
  }
 
87
  
 
88
  private E transition( final E currentState ) {
 
89
    Request request = null;
 
90
    E nextState = this.states[currentState.ordinal( ) + 1];
 
91
    while ( ( request = this.pendingEvents.poll( ) ) != null ) {
 
92
      try {
 
93
        try {
 
94
          Object o = request.getResponse( ).get( 240, TimeUnit.SECONDS );
 
95
          if ( o != null ) {
 
96
            EventRecord.here( StatefulMessageSet.class, EventType.VM_STARTING, currentState.name( ), this.cluster.getName( ), o.getClass( ).getSimpleName( ) ).info( );
 
97
            EventRecord.here( StatefulMessageSet.class, EventType.VM_STARTING, currentState.name( ), this.cluster.getName( ), o.toString( ) ).debug( );
 
98
          }
 
99
        } catch ( TimeoutException ex1 ) {
 
100
          request.getCallback( ).fireException( ex1 );
 
101
        }
 
102
      } catch ( final InterruptedException t ) {
 
103
        Thread.currentThread( ).interrupt( );
 
104
        EventRecord.here(
 
105
          StatefulMessageSet.class,
 
106
          EventType.VM_STARTING,
 
107
          "FAILED",
 
108
          currentState.name( ),
 
109
          this.cluster.getName( ),
 
110
          t.getClass( ).getSimpleName( ) ).info( );
 
111
        LOG.error( t, t );
 
112
        nextState = this.rollback( );
 
113
        break;
 
114
      } catch ( final Exception t ) {
 
115
        EventRecord.here(
 
116
          StatefulMessageSet.class,
 
117
          EventType.VM_STARTING,
 
118
          "FAILED",
 
119
          currentState.name( ),
 
120
          this.cluster.getName( ),
 
121
          t.getClass( ).getSimpleName( ) ).info( );
 
122
        LOG.error( t, t );
 
123
        nextState = this.rollback( );
 
124
        break;
 
125
      }
 
126
    }
 
127
    EventRecord.here( StatefulMessageSet.class, EventType.VM_STARTING, currentState.name( ), EventType.TRANSITION.name( ), nextState.name( ) ).info( );
 
128
    return nextState;
 
129
  }
 
130
  
 
131
  private boolean isSuccessful( ) {
 
132
    return this.state.equals( this.endState );
 
133
  }
 
134
  
 
135
  private boolean isFinished( ) {
 
136
    return this.state.equals( this.failState ) || this.state.equals( this.endState );
 
137
  }
 
138
  
 
139
  public void run( ) {
 
140
    do {
 
141
      LOG.info( EventRecord.here( StatefulMessageSet.class, EventType.VM_STARTING, this.state.name( ), ( System.currentTimeMillis( ) - this.startTime )
 
142
                                                                                                       / 1000.0d
 
143
                                                                                                       + "s" ) );
 
144
      try {
 
145
        this.queueEvents( this.state );
 
146
        this.state = this.transition( this.state );
 
147
      } catch ( final Exception ex ) {
 
148
        LOG.error( ex, ex );
 
149
      }
 
150
    } while ( !this.isFinished( ) );
 
151
    LOG.info( EventRecord.here( StatefulMessageSet.class, this.isSuccessful( )
 
152
                                                                              ? EventType.VM_START_COMPLETED
 
153
                                                                              : EventType.VM_START_ABORTED,
 
154
                                ( System.currentTimeMillis( ) - this.startTime ) / 1000.0d + "s" ) );
 
155
  }
 
156
}