81
79
import org.jboss.netty.handler.codec.http.HttpHeaders;
82
80
import org.jboss.netty.handler.codec.http.HttpResponse;
83
81
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
84
import org.jboss.netty.handler.codec.http.HttpVersion;
85
82
import org.jboss.netty.handler.timeout.IdleStateEvent;
83
import org.mule.DefaultMuleEvent;
86
84
import org.mule.DefaultMuleMessage;
85
import org.mule.DefaultMuleSession;
86
import org.mule.api.MuleEvent;
87
import org.mule.api.MuleException;
87
88
import org.mule.api.MuleMessage;
89
import org.mule.api.MuleSession;
90
import org.mule.api.endpoint.OutboundEndpoint;
91
import org.mule.api.transport.DispatchException;
92
import org.mule.transport.AbstractConnector;
88
93
import org.mule.transport.NullPayload;
90
import com.eucalyptus.auth.User;
94
import org.mule.transport.vm.VMMessageDispatcherFactory;
95
import com.eucalyptus.auth.principal.User;
91
96
import com.eucalyptus.bootstrap.Component;
97
import com.eucalyptus.context.Context;
98
import com.eucalyptus.context.Contexts;
99
import com.eucalyptus.context.NoSuchContextException;
100
import com.eucalyptus.context.ServiceContext;
101
import com.eucalyptus.http.MappingHttpMessage;
102
import com.eucalyptus.http.MappingHttpRequest;
103
import com.eucalyptus.http.MappingHttpResponse;
104
import com.eucalyptus.records.EventClass;
105
import com.eucalyptus.records.EventRecord;
106
import com.eucalyptus.records.EventType;
92
107
import com.eucalyptus.util.LogUtil;
93
import com.eucalyptus.ws.MappingHttpMessage;
94
import com.eucalyptus.ws.MappingHttpResponse;
95
108
import com.eucalyptus.ws.client.NioMessageReceiver;
96
import com.eucalyptus.ws.util.Messaging;
97
109
import com.eucalyptus.ws.util.ReplyQueue;
99
import edu.ucsb.eucalyptus.constants.EventType;
100
110
import edu.ucsb.eucalyptus.constants.IsData;
111
import edu.ucsb.eucalyptus.msgs.BaseMessage;
101
112
import edu.ucsb.eucalyptus.msgs.EucalyptusErrorMessageType;
102
import edu.ucsb.eucalyptus.msgs.EucalyptusMessage;
103
import edu.ucsb.eucalyptus.msgs.EventRecord;
104
113
import edu.ucsb.eucalyptus.msgs.GetObjectResponseType;
105
114
import edu.ucsb.eucalyptus.msgs.WalrusDataGetResponseType;
107
116
@ChannelPipelineCoverage( "one" )
108
117
public class ServiceSinkHandler extends SimpleChannelHandler {
109
private static Logger LOG = Logger.getLogger( ServiceSinkHandler.class );
110
private AtomicLong startTime = new AtomicLong(0l);
111
private final ChannelLocal<MappingHttpMessage> requestLocal = new ChannelLocal<MappingHttpMessage>( );
118
private static VMMessageDispatcherFactory dispatcherFactory = new VMMessageDispatcherFactory( );
119
private static Logger LOG = Logger.getLogger( ServiceSinkHandler.class );
120
private AtomicLong startTime = new AtomicLong( 0l );
113
private NioMessageReceiver msgReceiver;
122
private NioMessageReceiver msgReceiver;
115
124
public ServiceSinkHandler( ) {}
149
158
e.getFuture( ).cancel( );
150
159
LOG.warn( "Non-specific type being written to the channel. Not dropping this message causes breakage:" + msge.getMessage( ).getClass( ) );
152
if( e.getFuture( ).isCancelled( ) ) {
161
if ( e.getFuture( ).isCancelled( ) ) {
153
162
LOG.trace( "Cancelling send on : " + LogUtil.dumpObject( e ) );
156
165
ctx.sendDownstream( e );
160
private void sendDownstreamNewEvent( ChannelHandlerContext ctx, ChannelEvent e, EucalyptusMessage reply ) {
161
final MappingHttpMessage request = this.requestLocal.get( ctx.getChannel( ) );
162
if(request != null) {
163
if ( reply == null ) {
164
LOG.warn( "Received a null response for request: " + request.getMessageString( ) );
165
reply = new EucalyptusErrorMessageType( this.getClass( ).getSimpleName( ), ( EucalyptusMessage ) request.getMessage( ), "Received a NULL reply" );
169
private void sendDownstreamNewEvent( ChannelHandlerContext ctx, ChannelEvent e, BaseMessage reply ) {
170
MappingHttpRequest request = null;
171
Context reqCtx = null;
173
reqCtx = Contexts.lookup( reply.getCorrelationId( ) );
174
request = reqCtx.getHttpRequest( );
175
} catch ( NoSuchContextException e1 ) {
167
LOG.info( EventRecord.here( Component.eucalyptus, EventType.MSG_SERVICED, reply.getClass( ).getSimpleName( ), Long.toString( System.currentTimeMillis( ) - this.startTime.get( ) ) ) );
168
final MappingHttpResponse response = new MappingHttpResponse( request.getProtocolVersion( ) );
169
final DownstreamMessageEvent newEvent = new DownstreamMessageEvent( ctx.getChannel( ), e.getFuture( ), response, null );
170
response.setMessage( reply );
171
ctx.sendDownstream( newEvent );
178
if ( request != null ) {
179
if ( reply == null ) {
180
LOG.warn( "Received a null response for request: " + request.getMessageString( ) );
181
reply = new EucalyptusErrorMessageType( this.getClass( ).getSimpleName( ), ( BaseMessage ) request.getMessage( ), "Received a NULL reply" );
183
EventRecord.here( reply.getClass( ), EventClass.MESSAGE, EventType.MSG_SERVICED, Long.toString( System.currentTimeMillis( ) - this.startTime.get( ) ) ).trace();
184
final MappingHttpResponse response = new MappingHttpResponse( request.getProtocolVersion( ) );
185
final DownstreamMessageEvent newEvent = new DownstreamMessageEvent( ctx.getChannel( ), e.getFuture( ), response, null );
186
response.setMessage( reply );
187
ctx.sendDownstream( newEvent );
188
// Contexts.clear( reqCtx );
182
199
final MessageEvent event = ( MessageEvent ) e;
183
200
if ( event.getMessage( ) instanceof MappingHttpMessage ) {
184
201
final MappingHttpMessage request = ( MappingHttpMessage ) event.getMessage( );
185
final User user = request.getUser( );
186
this.requestLocal.set( ctx.getChannel( ), request );
187
final EucalyptusMessage msg = ( EucalyptusMessage ) request.getMessage( );
202
final User user = Contexts.lookup( request.getCorrelationId( ) ).getUser( );
203
final BaseMessage msg = ( BaseMessage ) request.getMessage( );
188
204
final String userAgent = request.getHeader( HttpHeaders.Names.USER_AGENT );
189
if( msg.getCorrelationId( ) == null ) {
190
msg.setCorrelationId( UUID.randomUUID().toString( ) );
205
if ( msg.getCorrelationId( ) == null ) {
206
String corrId = null;
208
corrId = Contexts.lookup( ctx.getChannel( ) ).getCorrelationId( );
209
} catch ( Exception e1 ) {
210
corrId = UUID.randomUUID( ).toString( );
212
msg.setCorrelationId( corrId );
192
214
if ( ( userAgent != null ) && userAgent.matches( ".*EucalyptusAdminAccess" ) && msg.getClass( ).getSimpleName( ).startsWith( "Describe" ) ) {
193
215
msg.setEffectiveUserId( msg.getUserId( ) );
194
216
} else if ( ( user != null ) && ( this.msgReceiver == null ) ) {
195
msg.setUserId( user.getUserName( ) );
196
msg.setEffectiveUserId( user.getIsAdministrator( ) ? Component.eucalyptus.name( ) : user.getUserName( ) );
217
msg.setUserId( user.getName( ) );
218
msg.setEffectiveUserId( user.isAdministrator( ) ? Component.eucalyptus.name( ) : user.getName( ) );
198
LOG.trace( EventRecord.here( Component.eucalyptus, EventType.MSG_RECEIVED, msg.getClass( ).getSimpleName( ) ) );
199
ReplyQueue.addReplyListener( msg.getCorrelationId( ), ctx );
220
EventRecord.here( ServiceSinkHandler.class, EventType.MSG_RECEIVED, msg.getClass( ).getSimpleName( ) ).trace( );
200
221
if ( this.msgReceiver == null ) {
201
Messaging.dispatch( "vm://RequestQueue", msg );
202
} else if ( ( user == null ) || ( ( user != null ) && user.getIsAdministrator( ) ) ) {
203
final MuleMessage reply = this.msgReceiver.routeMessage( new DefaultMuleMessage( msg ) );
204
ReplyQueue.removeReplyListener(msg.getCorrelationId());
206
ctx.getChannel( ).write( reply.getPayload( ) );
222
ServiceSinkHandler.dispatchRequest( msg );
223
} else if ( ( user == null ) || ( ( user != null ) && user.isAdministrator( ) ) ) {
224
this.dispatchRequest( ctx, request, msg );
208
ReplyQueue.removeReplyListener(msg.getCorrelationId());
226
Contexts.clear( Contexts.lookup( msg.getCorrelationId( ) ) );
209
227
ctx.getChannel( ).write( new MappingHttpResponse( request.getProtocolVersion( ), HttpResponseStatus.FORBIDDEN ) );
211
} else if( e instanceof IdleStateEvent ) {
229
} else if ( e instanceof IdleStateEvent ) {
212
230
LOG.warn( "Closing idle connection: " + e );
213
231
e.getFuture( ).addListener( ChannelFutureListener.CLOSE );
214
232
ctx.sendUpstream( e );
238
private void dispatchRequest( final ChannelHandlerContext ctx, final MappingHttpMessage request, final BaseMessage msg ) throws NoSuchContextException {
240
final MuleMessage reply = this.msgReceiver.routeMessage( new DefaultMuleMessage( msg ), true );
241
if ( reply != null ) {
242
ReplyQueue.handle( this.msgReceiver.getService( ).getName( ), reply, msg );
244
EventRecord.here( ServiceSinkHandler.class, EventType.MSG_SENT_ASYNC, msg.getClass( ).getSimpleName( ), this.msgReceiver.getEndpointURI( ).toString( ) );
246
} catch ( Exception e1 ) {
248
EucalyptusErrorMessageType errMsg = new EucalyptusErrorMessageType( this.msgReceiver.getService( ).getName( ), msg,
249
( e1.getCause( ) != null ? e1.getCause( ) : e1 ).getMessage( ) );
250
errMsg.setCorrelationId( msg.getCorrelationId( ) );
251
errMsg.setException( e1.getCause( ) != null ? e1.getCause( ) : e1 );
252
Contexts.clear( Contexts.lookup( errMsg.getCorrelationId( ) ) );
253
Channels.write( ctx.getChannel( ), errMsg );
257
private static void dispatchRequest( final BaseMessage msg ) throws MuleException, DispatchException {
258
OutboundEndpoint endpoint = ServiceContext.getContext( ).getRegistry( ).lookupEndpointFactory( ).getOutboundEndpoint( "vm://RequestQueue" );
259
if ( !endpoint.getConnector( ).isStarted( ) ) {
260
endpoint.getConnector( ).start( );
262
MuleMessage muleMsg = new DefaultMuleMessage( msg );
263
MuleSession muleSession = new DefaultMuleSession( muleMsg, ( ( AbstractConnector ) endpoint.getConnector( ) ).getSessionHandler( ),
264
ServiceContext.getContext( ) );
265
MuleEvent muleEvent = new DefaultMuleEvent( muleMsg, endpoint, muleSession, false );
266
dispatcherFactory.create( endpoint ).dispatch( muleEvent );
221
270
public void channelClosed( ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
223
MappingHttpMessage httpRequest = this.requestLocal.get( ctx.getChannel( ) );
224
if ( httpRequest != null && httpRequest.getMessage( ) != null && httpRequest.getMessage( ) instanceof EucalyptusMessage ) {
225
EucalyptusMessage origRequest = ( EucalyptusMessage ) httpRequest.getMessage( );
226
ReplyQueue.removeReplyListener( origRequest.getCorrelationId( ) );
272
Contexts.clear( Contexts.lookup( ctx.getChannel( ) ) );
228
273
} catch ( Throwable e1 ) {
229
274
LOG.warn( "Failed to remove the channel context on connection close.", e1 );
231
276
super.channelClosed( ctx, e );
235
280
public void messageReceived( ChannelHandlerContext ctx, MessageEvent e ) throws Exception {
236
281
super.messageReceived( ctx, e );