~ubuntu-branches/ubuntu/maverick/eucalyptus/maverick

« back to all changes in this revision

Viewing changes to clc/modules/wsstack/src/main/java/com/eucalyptus/ws/handlers/ServiceSinkHandler.java

  • Committer: Bazaar Package Importer
  • Author(s): Dave Walker (Daviey)
  • Date: 2010-07-21 17:27:10 UTC
  • mfrom: (1.1.38 upstream)
  • Revision ID: james.westby@ubuntu.com-20100721172710-7xv07dmdqgivc3t9
Tags: 2.0~bzr1211-0ubuntu1
* New major upstream version merge, 2.0 (r1211).
* debian/patches/:
  - 01-wsdl-stubs.patch, debian/wsdl.md5sums: wsdl stubs updated.
  - 02-Makefile.patch: Updated to reflect new code layout.
  - 07-local_support_euca_conf-in.patch: Updated to reflect new code layout.
  - 08-ubuntu-default-networking.patch: Refreshed.
  - 09-small-128-192MB.patch: Updated to point to new location.
  - 10-disable-iscsi.patch: Refreshed.
  - 11-state-cleanup-memleakfix.patch: Removed, fixed upstream.
  - 15-fix-default-ramdisk.patch: Updated to point to new location.
  - 16-kvm_libvirt_xml_default_use_kvm.patch: Updated to reflect changes.
  - 17-fix_walrus_OOM_errors.patch: Removed, fixed upstream.
  - 18-priv_security.patch: Updated to reflect upstream changes.
  - 20-brute-force-webui.patch: Updated to reflect upstream changes. 
  - 21-eucalyptus-1.7-with-gwt-1.6.4.patch: New patch, allows 
    eucalyptus-1.7 to be built against gwt 1.6.4. Based on patch courtesy 
    of Dmitrii Zagorodnov, upstream. (LP: #597330)
* debian/eucalyptus-java-common.links: 
  - Changed symlink for groovy, point to groovy.all.jar, making compatiable 
    with groovy versions >1.7. (LP: #595421)
  - Added ant.jar & jetty-rewrite-handler.jar as they are now required.
* debian/control
  - & debian/build-jars: Added libjavassist-java and libjetty-extra-java as 
    build dependencies.
  - Added libjetty-extra-java as a dependency of eucalyptus-java-common
* The binary resulting jar's have been renamed from eucalyptus-*-1.6.2.jar
  to eucalyptus-*-main.jar:    
  - debian/eucalyptus-cc.upstart
  - debian/eucalyptus-cloud.install
  - debian/eucalyptus-common.eucalyptus.upstart
  - debian/eucalyptus-java-common.install
  - debian/eucalyptus-network.upstart
  - debian/eucalyptus-sc.install
  - debian/eucalyptus-walrus.install
* debian/eucalyptus-java-common.install: New upstream jars that have been
  installed:
  - eucalyptus-db-hsqldb-ext-main.jar
  - eucalyptus-component-main.jar
* debian/control:
  - Updated Standards Version to 3.8.4 (no change)
  - Updated the upstream Homepage to: http://open.eucalyptus.com/
  - Changed Vcs-Bzr to reflect new location of Ubuntu hosted development branch.
  - Made the Build Dependency of groovy and the binary eucalyptus-java-common
    package depend on version >=1.7.

Show diffs side-by-side

added added

removed removed

Lines of Context:
65
65
 
66
66
import java.util.UUID;
67
67
import java.util.concurrent.atomic.AtomicLong;
68
 
 
69
68
import org.apache.log4j.Logger;
70
69
import org.jboss.netty.channel.ChannelEvent;
71
70
import org.jboss.netty.channel.ChannelFutureListener;
72
71
import org.jboss.netty.channel.ChannelHandlerContext;
73
 
import org.jboss.netty.channel.ChannelLocal;
74
72
import org.jboss.netty.channel.ChannelPipelineCoverage;
75
73
import org.jboss.netty.channel.ChannelStateEvent;
76
74
import org.jboss.netty.channel.Channels;
81
79
import org.jboss.netty.handler.codec.http.HttpHeaders;
82
80
import org.jboss.netty.handler.codec.http.HttpResponse;
83
81
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
84
 
import org.jboss.netty.handler.codec.http.HttpVersion;
85
82
import org.jboss.netty.handler.timeout.IdleStateEvent;
 
83
import org.mule.DefaultMuleEvent;
86
84
import org.mule.DefaultMuleMessage;
 
85
import org.mule.DefaultMuleSession;
 
86
import org.mule.api.MuleEvent;
 
87
import org.mule.api.MuleException;
87
88
import org.mule.api.MuleMessage;
 
89
import org.mule.api.MuleSession;
 
90
import org.mule.api.endpoint.OutboundEndpoint;
 
91
import org.mule.api.transport.DispatchException;
 
92
import org.mule.transport.AbstractConnector;
88
93
import org.mule.transport.NullPayload;
89
 
 
90
 
import com.eucalyptus.auth.User;
 
94
import org.mule.transport.vm.VMMessageDispatcherFactory;
 
95
import com.eucalyptus.auth.principal.User;
91
96
import com.eucalyptus.bootstrap.Component;
 
97
import com.eucalyptus.context.Context;
 
98
import com.eucalyptus.context.Contexts;
 
99
import com.eucalyptus.context.NoSuchContextException;
 
100
import com.eucalyptus.context.ServiceContext;
 
101
import com.eucalyptus.http.MappingHttpMessage;
 
102
import com.eucalyptus.http.MappingHttpRequest;
 
103
import com.eucalyptus.http.MappingHttpResponse;
 
104
import com.eucalyptus.records.EventClass;
 
105
import com.eucalyptus.records.EventRecord;
 
106
import com.eucalyptus.records.EventType;
92
107
import com.eucalyptus.util.LogUtil;
93
 
import com.eucalyptus.ws.MappingHttpMessage;
94
 
import com.eucalyptus.ws.MappingHttpResponse;
95
108
import com.eucalyptus.ws.client.NioMessageReceiver;
96
 
import com.eucalyptus.ws.util.Messaging;
97
109
import com.eucalyptus.ws.util.ReplyQueue;
98
 
 
99
 
import edu.ucsb.eucalyptus.constants.EventType;
100
110
import edu.ucsb.eucalyptus.constants.IsData;
 
111
import edu.ucsb.eucalyptus.msgs.BaseMessage;
101
112
import edu.ucsb.eucalyptus.msgs.EucalyptusErrorMessageType;
102
 
import edu.ucsb.eucalyptus.msgs.EucalyptusMessage;
103
 
import edu.ucsb.eucalyptus.msgs.EventRecord;
104
113
import edu.ucsb.eucalyptus.msgs.GetObjectResponseType;
105
114
import edu.ucsb.eucalyptus.msgs.WalrusDataGetResponseType;
106
115
 
107
116
@ChannelPipelineCoverage( "one" )
108
117
public class ServiceSinkHandler extends SimpleChannelHandler {
109
 
  private static Logger                          LOG          = Logger.getLogger( ServiceSinkHandler.class );
110
 
  private AtomicLong  startTime = new AtomicLong(0l);
111
 
  private final ChannelLocal<MappingHttpMessage> requestLocal = new ChannelLocal<MappingHttpMessage>( );
 
118
  private static VMMessageDispatcherFactory dispatcherFactory = new VMMessageDispatcherFactory( );
 
119
  private static Logger                     LOG               = Logger.getLogger( ServiceSinkHandler.class );
 
120
  private AtomicLong                        startTime         = new AtomicLong( 0l );
112
121
  
113
 
  private NioMessageReceiver                     msgReceiver;
 
122
  private NioMessageReceiver                msgReceiver;
114
123
  
115
124
  public ServiceSinkHandler( ) {}
116
125
  
136
145
        ctx.sendDownstream( e );
137
146
      } else if ( msge.getMessage( ) instanceof IsData ) {// Pass through for chunked messaging
138
147
        ctx.sendDownstream( e );
139
 
      } else if ( msge.getMessage( ) instanceof EucalyptusMessage ) {// Handle single request-response MEP
140
 
        EucalyptusMessage reply = ( EucalyptusMessage ) ( ( MessageEvent ) e ).getMessage( );
141
 
        if ( reply instanceof WalrusDataGetResponseType 
142
 
                && !( reply instanceof GetObjectResponseType && ((GetObjectResponseType)reply).getBase64Data( ) != null ) ) {
 
148
      } else if ( msge.getMessage( ) instanceof BaseMessage ) {// Handle single request-response MEP
 
149
        BaseMessage reply = ( BaseMessage ) ( ( MessageEvent ) e ).getMessage( );
 
150
        if ( reply instanceof WalrusDataGetResponseType
 
151
             && !( reply instanceof GetObjectResponseType && ( ( GetObjectResponseType ) reply ).getBase64Data( ) != null ) ) {
143
152
          e.getFuture( ).cancel( );
144
153
          return;
145
154
        } else {
149
158
        e.getFuture( ).cancel( );
150
159
        LOG.warn( "Non-specific type being written to the channel. Not dropping this message causes breakage:" + msge.getMessage( ).getClass( ) );
151
160
      }
152
 
      if( e.getFuture( ).isCancelled( ) ) {
 
161
      if ( e.getFuture( ).isCancelled( ) ) {
153
162
        LOG.trace( "Cancelling send on : " + LogUtil.dumpObject( e ) );
154
 
      } 
 
163
      }
155
164
    } else {
156
165
      ctx.sendDownstream( e );
157
166
    }
158
167
  }
159
 
 
160
 
  private void sendDownstreamNewEvent( ChannelHandlerContext ctx, ChannelEvent e, EucalyptusMessage reply ) {
161
 
    final MappingHttpMessage request = this.requestLocal.get( ctx.getChannel( ) );
162
 
    if(request != null) {
163
 
    if ( reply == null ) {
164
 
      LOG.warn( "Received a null response for request: " + request.getMessageString( ) );
165
 
      reply = new EucalyptusErrorMessageType( this.getClass( ).getSimpleName( ), ( EucalyptusMessage ) request.getMessage( ), "Received a NULL reply" );
 
168
  
 
169
  private void sendDownstreamNewEvent( ChannelHandlerContext ctx, ChannelEvent e, BaseMessage reply ) {
 
170
    MappingHttpRequest request = null;
 
171
    Context reqCtx = null; 
 
172
    try {
 
173
      reqCtx = Contexts.lookup( reply.getCorrelationId( ) );
 
174
      request = reqCtx.getHttpRequest( );
 
175
    } catch ( NoSuchContextException e1 ) {
 
176
      LOG.debug( e1, e1 );
166
177
    }
167
 
    LOG.info( EventRecord.here( Component.eucalyptus, EventType.MSG_SERVICED, reply.getClass( ).getSimpleName( ), Long.toString( System.currentTimeMillis( ) - this.startTime.get( ) ) ) );
168
 
    final MappingHttpResponse response = new MappingHttpResponse( request.getProtocolVersion( ) );
169
 
    final DownstreamMessageEvent newEvent = new DownstreamMessageEvent( ctx.getChannel( ), e.getFuture( ), response, null );
170
 
    response.setMessage( reply );
171
 
    ctx.sendDownstream( newEvent );
 
178
    if ( request != null ) {
 
179
      if ( reply == null ) {
 
180
        LOG.warn( "Received a null response for request: " + request.getMessageString( ) );
 
181
        reply = new EucalyptusErrorMessageType( this.getClass( ).getSimpleName( ), ( BaseMessage ) request.getMessage( ), "Received a NULL reply" );
 
182
      }
 
183
      EventRecord.here( reply.getClass( ), EventClass.MESSAGE, EventType.MSG_SERVICED, Long.toString( System.currentTimeMillis( ) - this.startTime.get( ) ) ).trace();
 
184
      final MappingHttpResponse response = new MappingHttpResponse( request.getProtocolVersion( ) );
 
185
      final DownstreamMessageEvent newEvent = new DownstreamMessageEvent( ctx.getChannel( ), e.getFuture( ), response, null );
 
186
      response.setMessage( reply );
 
187
      ctx.sendDownstream( newEvent );
 
188
//      Contexts.clear( reqCtx );
172
189
    }
173
190
  }
174
191
  
182
199
      final MessageEvent event = ( MessageEvent ) e;
183
200
      if ( event.getMessage( ) instanceof MappingHttpMessage ) {
184
201
        final MappingHttpMessage request = ( MappingHttpMessage ) event.getMessage( );
185
 
        final User user = request.getUser( );
186
 
        this.requestLocal.set( ctx.getChannel( ), request );
187
 
        final EucalyptusMessage msg = ( EucalyptusMessage ) request.getMessage( );
 
202
        final User user = Contexts.lookup( request.getCorrelationId( ) ).getUser( );
 
203
        final BaseMessage msg = ( BaseMessage ) request.getMessage( );
188
204
        final String userAgent = request.getHeader( HttpHeaders.Names.USER_AGENT );
189
 
        if( msg.getCorrelationId( ) == null ) {
190
 
          msg.setCorrelationId( UUID.randomUUID().toString( ) );
 
205
        if ( msg.getCorrelationId( ) == null ) {
 
206
          String corrId = null;
 
207
          try {
 
208
            corrId = Contexts.lookup( ctx.getChannel( ) ).getCorrelationId( );
 
209
          } catch ( Exception e1 ) {
 
210
            corrId = UUID.randomUUID( ).toString( );
 
211
          }
 
212
          msg.setCorrelationId( corrId );
191
213
        }
192
214
        if ( ( userAgent != null ) && userAgent.matches( ".*EucalyptusAdminAccess" ) && msg.getClass( ).getSimpleName( ).startsWith( "Describe" ) ) {
193
215
          msg.setEffectiveUserId( msg.getUserId( ) );
194
216
        } else if ( ( user != null ) && ( this.msgReceiver == null ) ) {
195
 
          msg.setUserId( user.getUserName( ) );
196
 
          msg.setEffectiveUserId( user.getIsAdministrator( ) ? Component.eucalyptus.name( ) : user.getUserName( ) );
 
217
          msg.setUserId( user.getName( ) );
 
218
          msg.setEffectiveUserId( user.isAdministrator( ) ? Component.eucalyptus.name( ) : user.getName( ) );
197
219
        }
198
 
        LOG.trace( EventRecord.here( Component.eucalyptus, EventType.MSG_RECEIVED, msg.getClass( ).getSimpleName( ) ) );
199
 
        ReplyQueue.addReplyListener( msg.getCorrelationId( ), ctx );
 
220
        EventRecord.here( ServiceSinkHandler.class, EventType.MSG_RECEIVED, msg.getClass( ).getSimpleName( ) ).trace( );
200
221
        if ( this.msgReceiver == null ) {
201
 
          Messaging.dispatch( "vm://RequestQueue", msg );
202
 
        } else if ( ( user == null ) || ( ( user != null ) && user.getIsAdministrator( ) ) ) {
203
 
          final MuleMessage reply = this.msgReceiver.routeMessage( new DefaultMuleMessage( msg ) );
204
 
          ReplyQueue.removeReplyListener(msg.getCorrelationId());
205
 
          if(reply != null)
206
 
              ctx.getChannel( ).write( reply.getPayload( ) );
 
222
          ServiceSinkHandler.dispatchRequest( msg );
 
223
        } else if ( ( user == null ) || ( ( user != null ) && user.isAdministrator( ) ) ) {
 
224
          this.dispatchRequest( ctx, request, msg );
207
225
        } else {
208
 
          ReplyQueue.removeReplyListener(msg.getCorrelationId());       
 
226
          Contexts.clear( Contexts.lookup( msg.getCorrelationId( ) ) );
209
227
          ctx.getChannel( ).write( new MappingHttpResponse( request.getProtocolVersion( ), HttpResponseStatus.FORBIDDEN ) );
210
228
        }
211
 
      } else if( e instanceof IdleStateEvent ) {
 
229
      } else if ( e instanceof IdleStateEvent ) {
212
230
        LOG.warn( "Closing idle connection: " + e );
213
231
        e.getFuture( ).addListener( ChannelFutureListener.CLOSE );
214
232
        ctx.sendUpstream( e );
215
233
      }
216
 
 
217
 
    }
 
234
      
 
235
    }
 
236
  }
 
237
  
 
238
  private void dispatchRequest( final ChannelHandlerContext ctx, final MappingHttpMessage request, final BaseMessage msg ) throws NoSuchContextException {
 
239
    try {
 
240
      final MuleMessage reply = this.msgReceiver.routeMessage( new DefaultMuleMessage( msg ), true );
 
241
      if ( reply != null ) {
 
242
        ReplyQueue.handle( this.msgReceiver.getService( ).getName( ), reply, msg );
 
243
      } else {
 
244
        EventRecord.here( ServiceSinkHandler.class, EventType.MSG_SENT_ASYNC, msg.getClass( ).getSimpleName( ), this.msgReceiver.getEndpointURI( ).toString( ) );
 
245
      }
 
246
    } catch ( Exception e1 ) {
 
247
      LOG.error( e1, e1 );
 
248
      EucalyptusErrorMessageType errMsg = new EucalyptusErrorMessageType( this.msgReceiver.getService( ).getName( ), msg,
 
249
                                                                          ( e1.getCause( ) != null ? e1.getCause( ) : e1 ).getMessage( ) );
 
250
      errMsg.setCorrelationId( msg.getCorrelationId( ) );
 
251
      errMsg.setException( e1.getCause( ) != null ? e1.getCause( ) : e1 );
 
252
      Contexts.clear( Contexts.lookup( errMsg.getCorrelationId( ) ) );
 
253
      Channels.write( ctx.getChannel( ), errMsg );
 
254
    }
 
255
  }
 
256
  
 
257
  private static void dispatchRequest( final BaseMessage msg ) throws MuleException, DispatchException {
 
258
    OutboundEndpoint endpoint = ServiceContext.getContext( ).getRegistry( ).lookupEndpointFactory( ).getOutboundEndpoint( "vm://RequestQueue" );
 
259
    if ( !endpoint.getConnector( ).isStarted( ) ) {
 
260
      endpoint.getConnector( ).start( );
 
261
    }
 
262
    MuleMessage muleMsg = new DefaultMuleMessage( msg );
 
263
    MuleSession muleSession = new DefaultMuleSession( muleMsg, ( ( AbstractConnector ) endpoint.getConnector( ) ).getSessionHandler( ),
 
264
                                                      ServiceContext.getContext( ) );
 
265
    MuleEvent muleEvent = new DefaultMuleEvent( muleMsg, endpoint, muleSession, false );
 
266
    dispatcherFactory.create( endpoint ).dispatch( muleEvent );
218
267
  }
219
268
  
220
269
  @Override
221
270
  public void channelClosed( ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception {
222
271
    try {
223
 
      MappingHttpMessage httpRequest = this.requestLocal.get( ctx.getChannel( ) );
224
 
      if ( httpRequest != null && httpRequest.getMessage( ) != null && httpRequest.getMessage( ) instanceof EucalyptusMessage ) {
225
 
        EucalyptusMessage origRequest = ( EucalyptusMessage ) httpRequest.getMessage( );
226
 
        ReplyQueue.removeReplyListener( origRequest.getCorrelationId( ) );
227
 
      }
 
272
      Contexts.clear( Contexts.lookup( ctx.getChannel( ) ) );
228
273
    } catch ( Throwable e1 ) {
229
274
      LOG.warn( "Failed to remove the channel context on connection close.", e1 );
230
275
    }
231
276
    super.channelClosed( ctx, e );
232
277
  }
233
 
 
 
278
  
234
279
  @Override
235
280
  public void messageReceived( ChannelHandlerContext ctx, MessageEvent e ) throws Exception {
236
281
    super.messageReceived( ctx, e );