1
package com.eucalyptus.util.async;
3
import java.util.concurrent.ConcurrentLinkedQueue;
4
import java.util.concurrent.TimeUnit;
5
import java.util.concurrent.TimeoutException;
6
import org.apache.log4j.Logger;
7
import com.eucalyptus.cluster.Cluster;
8
import com.eucalyptus.cluster.Clusters;
9
import com.eucalyptus.records.EventRecord;
10
import com.eucalyptus.records.EventType;
11
import com.google.common.base.Function;
12
import com.google.common.collect.ArrayListMultimap;
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Multimap;
16
public class StatefulMessageSet<E extends Enum<E>> {
17
/** log4j logger for this class. */
private static Logger LOG = Logger.getLogger( StatefulMessageSet.class );
18
/** Requests registered per state; filled by addRequest and read by queueEvents. */
private final Multimap<E, Request> messages = ArrayListMultimap.create( );
19
/** Requests that have been dispatched and whose responses are awaited in transition. */
private final ConcurrentLinkedQueue<Request> pendingEvents = new ConcurrentLinkedQueue<Request>( );
20
/** State values; transition indexes this array with the current state's ordinal + 1. */
private final E[] states;
22
// NOTE(review): a mutable "state" field is read and written throughout this class,
// but its declaration is not visible in this extract -- confirm it exists.
/** State that counts as successful completion (states[length - 2]). */
private final E endState;
23
/** State entered on rollback (states[length - 1]). */
private final E failState;
24
/** Cluster whose configuration receives the non-broadcast requests. */
private final Cluster cluster;
25
/** Construction time in epoch milliseconds; used to log elapsed time. */
private final Long startTime;
28
* Collection of messages which need to honor a certain ordering. The state
29
* array is a list of enum values where:
31
* <li>Index 0 is the start state</li>
32
* <li>Index length-2 is the end state</li>
33
* <li>Index length-1 is the rollback (fail) state</li>
35
* A transition will increase the currentState's ordinal by 1, drain the
36
* messages to a pending queue and wait for all messages to be serviced before
42
/**
 * Creates a message set for the given cluster, positioned at the first state.
 *
 * @param cluster cluster stored on this instance
 * @param states  state values; element 0 becomes the current state, element
 *                length-2 the end state and element length-1 the fail state.
 *                An array with fewer than two elements makes the index
 *                expressions below go out of range.
 */
public StatefulMessageSet( final Cluster cluster, final E[] states ) {
43
this.cluster = cluster;
45
// NOTE(review): no assignment to the final field "states" is visible in this
// extract -- confirm it is initialised here.
this.state = states[0];
46
this.endState = states[states.length - 2];
47
this.failState = states[states.length - 1];
48
// Captured so that elapsed time can be reported later.
this.startTime = System.currentTimeMillis( );
51
/**
 * Moves this set to the fail state.
 *
 * @return the fail state that was just stored as the current state
 */
private E rollback( ) {
52
// Assignment used as an expression: sets state and yields the assigned value.
return ( this.state = this.failState );
55
/**
 * Registers a request under the given state.
 *
 * @param state        state under which the request is stored
 * @param asyncRequest request to store; its callback's class name is written
 *                     to a debug event record
 */
public void addRequest( final E state, final Request asyncRequest ) {
56
EventRecord.caller( StatefulMessageSet.class, EventType.VM_PREPARE, state.name( ), asyncRequest.getCallback( ).getClass( ).getSimpleName( ) ).debug( );
57
this.messages.put( state, asyncRequest );
60
/**
 * Dispatches every request registered for the given state and records it in
 * the pending queue.
 *
 * A request whose callback is a BroadcastCallback is expanded into one new
 * request per cluster returned by Clusters.getInstance().listValues(); any
 * other request is dispatched to this instance's own cluster.
 *
 * NOTE(review): the "try", "else" and closing lines of this method are not
 * visible in this extract, so the exact exception handling cannot be
 * described from here.
 *
 * @param state state whose registered requests are dispatched
 */
@SuppressWarnings( "unchecked" )
61
private void queueEvents( final E state ) {
62
for ( final Request event : this.messages.get( state ) ) {
64
EventRecord.caller( StatefulMessageSet.class, EventType.VM_STARTING, state.name( ), event.getCallback( ).toString( ) ).debug( );
65
if ( event.getCallback( ) instanceof BroadcastCallback ) {
66
final BroadcastCallback callback = ( BroadcastCallback ) event.getCallback( );
67
// Guava's Lists.transform returns a lazy view, so apply(...) below runs
// as addAll walks the list -- presumably once per cluster; verify.
this.pendingEvents.addAll( Lists.transform( Clusters.getInstance( ).listValues( ), new Function<Cluster, Request>( ) {
69
public Request apply( final Cluster c ) {
70
LOG.debug( "VM_STARTING: " + state.name( ) + " " + c.getName( ) + " " + event.getClass( ).getSimpleName( ) + " " + event.getCallback( ) );
71
// Each cluster gets its own request built from a fresh callback instance.
final Request request = AsyncRequests.newRequest( callback.newInstance( ) );
72
request.getRequest( ).regardingUserRequest( callback.getRequest( ) );
73
request.dispatch( c.getConfiguration( ) );
78
// Presumably the non-broadcast branch (the "else" line is not visible):
// the request goes to this instance's cluster and is then tracked as pending.
LOG.debug( "VM_STARTING: " + state.name( ) + " " + this.cluster.getName( ) + " " + event.getClass( ).getSimpleName( ) + " " + event.getCallback( ) );
79
event.dispatch( this.cluster.getConfiguration( ) );
80
this.pendingEvents.add( event );
82
} catch ( Exception ex ) {
88
/**
 * Drains the pending queue, waiting up to 240 seconds for each response, and
 * works out the state to move to.
 *
 * The candidate next state is the entry in states that follows the current
 * state's ordinal. An InterruptedException or any other Exception while
 * waiting replaces it with the result of rollback(); a TimeoutException is
 * passed to the request's callback.
 *
 * NOTE(review): the "try" line, the first line of two EventRecord calls and
 * the return statement are not visible in this extract; presumably the method
 * returns nextState -- confirm.
 *
 * @param currentState state whose pending requests are being awaited
 */
private E transition( final E currentState ) {
89
Request request = null;
90
// Indexes past the end of states if currentState is the last element.
E nextState = this.states[currentState.ordinal( ) + 1];
91
// poll() returns null once the queue is empty, which ends the loop.
while ( ( request = this.pendingEvents.poll( ) ) != null ) {
94
Object o = request.getResponse( ).get( 240, TimeUnit.SECONDS );
96
EventRecord.here( StatefulMessageSet.class, EventType.VM_STARTING, currentState.name( ), this.cluster.getName( ), o.getClass( ).getSimpleName( ) ).info( );
97
EventRecord.here( StatefulMessageSet.class, EventType.VM_STARTING, currentState.name( ), this.cluster.getName( ), o.toString( ) ).debug( );
99
} catch ( TimeoutException ex1 ) {
100
// The timeout is reported to the request's own callback.
request.getCallback( ).fireException( ex1 );
102
} catch ( final InterruptedException t ) {
103
// Restore the interrupt flag before recording the failure and rolling back.
Thread.currentThread( ).interrupt( );
105
StatefulMessageSet.class,
106
EventType.VM_STARTING,
108
currentState.name( ),
109
this.cluster.getName( ),
110
t.getClass( ).getSimpleName( ) ).info( );
112
nextState = this.rollback( );
114
} catch ( final Exception t ) {
116
StatefulMessageSet.class,
117
EventType.VM_STARTING,
119
currentState.name( ),
120
this.cluster.getName( ),
121
t.getClass( ).getSimpleName( ) ).info( );
123
nextState = this.rollback( );
127
// Records the transition from currentState to the chosen next state.
EventRecord.here( StatefulMessageSet.class, EventType.VM_STARTING, currentState.name( ), EventType.TRANSITION.name( ), nextState.name( ) ).info( );
131
/**
 * Reports whether the current state is the end state.
 *
 * @return true if state equals endState
 */
private boolean isSuccessful( ) {
132
return this.state.equals( this.endState );
135
/**
 * Reports whether the current state is terminal.
 *
 * @return true if state equals either failState or endState
 */
private boolean isFinished( ) {
136
return this.state.equals( this.failState ) || this.state.equals( this.endState );
141
LOG.info( EventRecord.here( StatefulMessageSet.class, EventType.VM_STARTING, this.state.name( ), ( System.currentTimeMillis( ) - this.startTime )
145
this.queueEvents( this.state );
146
this.state = this.transition( this.state );
147
} catch ( final Exception ex ) {
150
} while ( !this.isFinished( ) );
151
LOG.info( EventRecord.here( StatefulMessageSet.class, this.isSuccessful( )
152
? EventType.VM_START_COMPLETED
153
: EventType.VM_START_ABORTED,
154
( System.currentTimeMillis( ) - this.startTime ) / 1000.0d + "s" ) );