508
516
/**
 * Request group info object, initialised to a fresh instance together with
 * this object.
 * NOTE(review): presumably the group that registered processors'
 * RequestInfo objects are attached to in register() - confirm, the body of
 * register() is not fully visible here.
 */
protected RequestGroupInfo global = new RequestGroupInfo();
509
517
/**
 * Atomic counter that starts at zero.
 * NOTE(review): presumably incremented when a processor is registered so
 * each registration gets a distinct number - confirm against register().
 */
protected AtomicLong registerCount = new AtomicLong(0);
511
/**
 * Maps a socket (S) to a processor (P). process() looks up and removes the
 * entry for the socket it is handling via
 * connections.remove(socket.getSocket()).
 */
protected ConcurrentHashMap<S,P> connections =
512
new ConcurrentHashMap<S,P>();
519
/**
 * Maps a socket (S) to a Processor. process() looks up and removes the
 * entry for the socket it is handling via
 * connections.remove(socket.getSocket()).
 */
protected ConcurrentHashMap<S,Processor<S>> connections =
520
new ConcurrentHashMap<S,Processor<S>>();
514
522
/**
 * Container of recycled processors, constructed with a reference to this
 * instance.
 * NOTE(review): presumably the pool process() consults before falling back
 * to createProcessor() - that code is not visible here, confirm.
 */
protected RecycledProcessors<P,S> recycledProcessors =
515
523
new RecycledProcessors<P,S>(this);
532
540
public SocketState process(SocketWrapper<S> socket,
533
541
SocketStatus status) {
534
P processor = connections.remove(socket.getSocket());
536
if (getLog().isDebugEnabled()) {
537
getLog().debug("process() entry " +
538
"Socket: [" + logHashcode(socket.getSocket()) + "], " +
539
"Processor [" + logHashcode(processor) + "]");
542
Processor<S> processor = connections.remove(socket.getSocket());
542
544
socket.setAsync(false);
549
551
processor = createProcessor();
552
if (getLog().isDebugEnabled()) {
553
getLog().debug("process() gotProcessor " +
554
"Socket: [" + logHashcode(socket.getSocket()) + "], " +
555
"Processor [" + logHashcode(processor) + "]");
558
554
initSsl(socket, processor);
560
556
SocketState state = SocketState.CLOSED;
562
558
if (processor.isAsync() || state == SocketState.ASYNC_END) {
563
559
state = processor.asyncDispatch(status);
564
if (getLog().isDebugEnabled()) {
565
getLog().debug("process() asyncDispatch " +
566
"Socket: [" + logHashcode(socket.getSocket()) + "], " +
567
"Processor: [" + logHashcode(processor) + "], " +
568
"State: [" + state.toString() + "]");
570
560
} else if (processor.isComet()) {
571
561
state = processor.event(status);
572
if (getLog().isDebugEnabled()) {
573
getLog().debug("process() event " +
574
"Socket: [" + logHashcode(socket.getSocket()) + "], " +
575
"Processor: [" + logHashcode(processor) + "], " +
576
"State: [" + state.toString() + "]");
562
} else if (processor.isUpgrade()) {
563
state = processor.upgradeDispatch();
579
565
state = processor.process(socket);
580
if (getLog().isDebugEnabled()) {
581
getLog().debug("process() process " +
582
"Socket: [" + logHashcode(socket.getSocket()) + "], " +
583
"Processor: [" + logHashcode(processor) + "], " +
584
"State: [" + state.toString() + "]");
588
568
if (state != SocketState.CLOSED && processor.isAsync()) {
589
569
state = processor.asyncPostProcess();
590
if (getLog().isDebugEnabled()) {
591
getLog().debug("process() asyncPostProcess " +
592
"Socket: [" + logHashcode(socket.getSocket()) + "], " +
593
"Processor: [" + logHashcode(processor) + "], " +
594
"State: [" + state.toString() + "]");
572
if (state == SocketState.UPGRADING) {
573
// Get the UpgradeInbound handler
574
UpgradeInbound inbound = processor.getUpgradeInbound();
575
// Release the Http11 processor to be re-used
576
release(socket, processor, false, false);
577
// Create the light-weight upgrade processor
578
processor = createUpgradeProcessor(socket, inbound);
579
inbound.onUpgradeComplete();
598
} while (state == SocketState.ASYNC_END);
581
} while (state == SocketState.ASYNC_END ||
582
state == SocketState.UPGRADING);
600
584
if (state == SocketState.LONG) {
601
585
// In the middle of processing a request/response. Keep the
611
595
// closed. If it works, the socket will be re-added to the
613
597
release(socket, processor, false, false);
598
} else if (state == SocketState.UPGRADED) {
599
// Need to keep the connection associated with the processor
600
longPoll(socket, processor);
615
602
// Connection closed. OK to recycle the processor.
616
release(socket, processor, true, false);
603
if (!(processor instanceof UpgradeProcessor)) {
604
release(socket, processor, true, false);
619
608
} catch(java.net.SocketException e) {
635
624
// less-than-verbose logs.
636
625
getLog().error(sm.getString("ajpprotocol.proto.error"), e);
638
release(socket, processor, true, false);
627
// Don't try to add upgrade processors back into the pool
628
if (!(processor instanceof UpgradeProcessor)) {
629
release(socket, processor, true, false);
639
631
return SocketState.CLOSED;
642
private String logHashcode (Object o) {
646
return Integer.toString(o.hashCode());
650
634
/**
 * Supplies the processor that process() assigns to its local variable and
 * then uses to handle the socket.
 * NOTE(review): presumably only called when no processor is already
 * associated with the socket or available for re-use - the guarding
 * condition is not visible here, confirm.
 *
 * @return the processor to use
 */
protected abstract P createProcessor();
651
/**
 * Hook invoked by process() with the socket and its processor before the
 * processor is dispatched.
 * NOTE(review): presumably sets up SSL support on the processor for this
 * socket - confirm against the implementations.
 *
 * @param socket    the socket wrapper being processed
 * @param processor the processor that will handle the socket
 */
protected abstract void initSsl(SocketWrapper<S> socket, P processor);
652
/**
 * Hook taking a socket and its processor.
 * NOTE(review): presumably used by process() to keep the connection
 * associated with the processor while waiting for further events - the
 * call site for this variant is not fully visible here, confirm.
 *
 * @param socket    the socket wrapper being processed
 * @param processor the processor handling the socket
 */
protected abstract void longPoll(SocketWrapper<S> socket, P processor);
653
/**
 * Hook invoked by process() when it is finished with a processor for the
 * given socket.
 *
 * @param socket        the socket wrapper that was being processed
 * @param processor     the processor being released
 * @param socketClosing process() passes true on its "connection closed" and
 *                      error paths and false on the other visible call
 *                      sites
 * @param addToPoller   false at every visible call site; exact semantics
 *                      are not shown here - TODO confirm
 */
protected abstract void release(SocketWrapper<S> socket, P processor,
654
boolean socketClosing, boolean addToPoller);
635
/**
 * Hook invoked by process() with the socket and its processor before the
 * processor is dispatched.
 * NOTE(review): presumably sets up SSL support on the processor for this
 * socket - confirm against the implementations.
 *
 * @param socket    the socket wrapper being processed
 * @param processor the processor that will handle the socket
 */
protected abstract void initSsl(SocketWrapper<S> socket,
636
Processor<S> processor);
637
/**
 * Hook invoked by process() when the connection has to stay associated
 * with its processor; the visible call site handles
 * SocketState.UPGRADED.
 *
 * @param socket    the socket wrapper being processed
 * @param processor the processor that remains tied to the socket
 */
protected abstract void longPoll(SocketWrapper<S> socket,
638
Processor<S> processor);
639
/**
 * Hook invoked by process() when it is finished with a processor for the
 * given socket. process() skips this call for UpgradeProcessor instances
 * on its "connection closed" and error paths so they are not put back
 * into the pool.
 *
 * @param socket        the socket wrapper that was being processed
 * @param processor     the processor being released
 * @param socketClosing process() passes true on its "connection closed" and
 *                      error paths and false on the other visible call
 *                      sites
 * @param addToPoller   false at every visible call site; exact semantics
 *                      are not shown here - TODO confirm
 */
protected abstract void release(SocketWrapper<S> socket,
640
Processor<S> processor, boolean socketClosing,
641
boolean addToPoller);
642
/**
 * Creates the light-weight processor used once a connection is being
 * upgraded. process() calls this when the state is
 * SocketState.UPGRADING, after it has fetched the UpgradeInbound handler
 * from the current processor and released that processor, and it then
 * calls inbound.onUpgradeComplete().
 *
 * @param socket  the socket wrapper whose connection is being upgraded
 * @param inbound the upgrade handler obtained from the previous processor
 * @return the processor that replaces the previous one for this socket
 * @throws IOException declared by this hook; the failure conditions depend
 *                     on the implementation and are not visible here
 */
protected abstract Processor<S> createUpgradeProcessor(
643
SocketWrapper<S> socket,
644
UpgradeInbound inbound) throws IOException;
657
646
protected void register(AbstractProcessor<S> processor) {
658
647
if (getProtocol().getDomain() != null) {
684
protected void unregister(AbstractProcessor<S> processor) {
673
protected void unregister(Processor<S> processor) {
685
674
if (getProtocol().getDomain() != null) {
686
675
synchronized (this) {
689
processor.getRequest().getRequestProcessor();
677
Request r = processor.getRequest();
679
// Probably an UpgradeProcessor
682
RequestInfo rp = r.getRequestProcessor();
690
683
rp.setGlobalProcessor(null);
691
684
ObjectName rpName = rp.getRpName();
692
685
if (getLog().isDebugEnabled()) {