2
* Licensed to the Apache Software Foundation (ASF) under one or more
3
* contributor license agreements. See the NOTICE file distributed with
4
* this work for additional information regarding copyright ownership.
5
* The ASF licenses this file to You under the Apache License, Version 2.0
6
* (the "License"); you may not use this file except in compliance with
7
* the License. You may obtain a copy of the License at
9
* http://www.apache.org/licenses/LICENSE-2.0
11
* Unless required by applicable law or agreed to in writing, software
12
* distributed under the License is distributed on an "AS IS" BASIS,
13
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
* See the License for the specific language governing permissions and
15
* limitations under the License.
18
package org.apache.tomcat.util.net;
20
import java.io.OutputStreamWriter;
21
import java.net.InetAddress;
22
import java.net.InetSocketAddress;
23
import java.util.ArrayList;
24
import java.util.HashMap;
25
import java.util.concurrent.Executor;
27
import org.apache.juli.logging.Log;
28
import org.apache.juli.logging.LogFactory;
29
import org.apache.tomcat.jni.Address;
30
import org.apache.tomcat.jni.Error;
31
import org.apache.tomcat.jni.File;
32
import org.apache.tomcat.jni.Library;
33
import org.apache.tomcat.jni.OS;
34
import org.apache.tomcat.jni.Poll;
35
import org.apache.tomcat.jni.Pool;
36
import org.apache.tomcat.jni.SSL;
37
import org.apache.tomcat.jni.SSLContext;
38
import org.apache.tomcat.jni.SSLSocket;
39
import org.apache.tomcat.jni.Socket;
40
import org.apache.tomcat.jni.Status;
41
import org.apache.tomcat.util.res.StringManager;
44
* APR tailored thread pool, providing the following services:
46
* <li>Socket acceptor thread</li>
47
* <li>Socket poller thread</li>
48
* <li>Sendfile thread</li>
49
* <li>Worker threads pool</li>
52
* When switching to Java 5, there's an opportunity to use the virtual
53
* machine's thread pool.
56
* @author Remy Maucherat
58
public class AprEndpoint {
61
// -------------------------------------------------------------- Constants
64
protected static Log log = LogFactory.getLog(AprEndpoint.class);
66
protected static StringManager sm =
67
StringManager.getManager("org.apache.tomcat.util.net.res");
71
* The Request attribute key for the cipher suite.
73
public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
76
* The Request attribute key for the key size.
78
public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
81
* The Request attribute key for the client certificate chain.
83
public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
86
* The Request attribute key for the session id.
87
* This one is a Tomcat extension to the Servlet spec.
89
public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
92
// ----------------------------------------------------------------- Fields
98
protected WorkerStack workers = null;
102
* Running state of the endpoint.
104
protected volatile boolean running = false;
108
* Will be set to true whenever the endpoint is paused.
110
protected volatile boolean paused = false;
114
* Track the initialization state of the endpoint.
116
protected boolean initialized = false;
120
* Current worker threads busy count.
122
protected int curThreadsBusy = 0;
126
* Current worker threads count.
128
protected int curThreads = 0;
132
* Sequence number used to generate thread names.
134
protected int sequence = 0;
138
* Root APR memory pool.
140
protected long rootPool = 0;
144
* Server socket "pointer".
146
protected long serverSock = 0;
150
* APR memory pool for the server socket.
152
protected long serverSockPool = 0;
158
protected long sslContext = 0;
161
// ------------------------------------------------------------- Properties
167
protected boolean deferAccept = true;
168
/**
 * Stores a new value for the {@code deferAccept} flag.
 *
 * @param deferAccept the flag value to store
 */
public void setDeferAccept(boolean deferAccept) {
    this.deferAccept = deferAccept;
}
169
/**
 * @return the current value of the {@code deferAccept} flag
 */
public boolean getDeferAccept() {
    return this.deferAccept;
}
173
* External Executor based thread pool.
175
protected Executor executor = null;
176
/**
 * Sets the external {@link Executor} based thread pool.
 *
 * @param executor the executor to store (may be {@code null})
 */
public void setExecutor(Executor executor) {
    this.executor = executor;
}
177
/**
 * @return the external {@link Executor} based thread pool, or
 *         {@code null} if none has been set
 */
public Executor getExecutor() {
    return this.executor;
}
181
* Maximum amount of worker threads.
183
protected int maxThreads = 200;
184
public void setMaxThreads(int maxThreads) {
185
this.maxThreads = maxThreads;
187
synchronized(workers) {
188
workers.resize(maxThreads);
192
public int getMaxThreads() {
193
if (executor != null) {
202
* Priority of the acceptor and poller threads.
204
protected int threadPriority = Thread.NORM_PRIORITY;
205
/**
 * Sets the priority of the acceptor and poller threads.
 *
 * @param threadPriority the priority value to store
 */
public void setThreadPriority(int threadPriority) {
    this.threadPriority = threadPriority;
}
206
/**
 * @return the priority of the acceptor and poller threads
 */
public int getThreadPriority() {
    return this.threadPriority;
}
210
* Size of the socket poller.
212
protected int pollerSize = 8 * 1024;
213
/**
 * Sets the size of the socket poller.
 *
 * @param pollerSize the poller size to store
 */
public void setPollerSize(int pollerSize) {
    this.pollerSize = pollerSize;
}
214
/**
 * @return the size of the socket poller
 */
public int getPollerSize() {
    return this.pollerSize;
}
218
* Size of the sendfile (= concurrent files which can be served).
220
protected int sendfileSize = 1 * 1024;
221
/**
 * Sets the sendfile size (the number of concurrent files which can be served).
 *
 * @param sendfileSize the sendfile size to store
 */
public void setSendfileSize(int sendfileSize) {
    this.sendfileSize = sendfileSize;
}
222
/**
 * @return the sendfile size (the number of concurrent files which can be served)
 */
public int getSendfileSize() {
    return this.sendfileSize;
}
226
* Server socket port.
229
/**
 * @return the server socket port
 */
public int getPort() {
    return this.port;
}
230
/**
 * Sets the server socket port.
 *
 * @param port the port number to store
 */
public void setPort(int port) {
    this.port = port;
}
234
* Address for the server socket.
236
protected InetAddress address;
237
/**
 * @return the address for the server socket, or {@code null} if none
 *         has been set
 */
public InetAddress getAddress() {
    return this.address;
}
238
/**
 * Sets the address for the server socket.
 *
 * @param address the address to store (may be {@code null})
 */
public void setAddress(InetAddress address) {
    this.address = address;
}
242
* Handling of accepted sockets.
244
protected Handler handler = null;
245
/**
 * Sets the handler used for accepted sockets.
 *
 * @param handler the handler to store
 */
public void setHandler(Handler handler) {
    this.handler = handler;
}
246
/**
 * @return the handler used for accepted sockets
 */
public Handler getHandler() {
    return this.handler;
}
250
* Allows the server developer to specify the backlog that
251
* should be used for server sockets. By default, this value
254
protected int backlog = 100;
255
/**
 * Sets the backlog used for server sockets. Values that are zero or
 * negative are ignored and leave the current backlog untouched.
 *
 * @param backlog the requested backlog; only applied when positive
 */
public void setBacklog(int backlog) {
    if (backlog <= 0) {
        // Non-positive values are silently rejected.
        return;
    }
    this.backlog = backlog;
}
256
/**
 * @return the backlog used for server sockets
 */
public int getBacklog() {
    return this.backlog;
}
260
* Socket TCP no delay.
262
protected boolean tcpNoDelay = false;
263
/**
 * @return the socket TCP no delay flag
 */
public boolean getTcpNoDelay() {
    return this.tcpNoDelay;
}
264
/**
 * Sets the socket TCP no delay flag.
 *
 * @param tcpNoDelay the flag value to store
 */
public void setTcpNoDelay(boolean tcpNoDelay) {
    this.tcpNoDelay = tcpNoDelay;
}
270
protected int soLinger = 100;
271
/**
 * @return the current value of the {@code soLinger} property
 */
public int getSoLinger() {
    return this.soLinger;
}
272
/**
 * Stores a new value for the {@code soLinger} property.
 *
 * @param soLinger the value to store
 */
public void setSoLinger(int soLinger) {
    this.soLinger = soLinger;
}
278
protected int soTimeout = -1;
279
/**
 * @return the current value of the {@code soTimeout} property
 */
public int getSoTimeout() {
    return this.soTimeout;
}
280
/**
 * Stores a new value for the {@code soTimeout} property.
 *
 * @param soTimeout the value to store
 */
public void setSoTimeout(int soTimeout) {
    this.soTimeout = soTimeout;
}
284
* Keep-Alive timeout.
286
protected int keepAliveTimeout = -1;
287
/**
 * @return the keep-alive timeout
 */
public int getKeepAliveTimeout() {
    return this.keepAliveTimeout;
}
288
/**
 * Sets the keep-alive timeout.
 *
 * @param keepAliveTimeout the timeout value to store
 */
public void setKeepAliveTimeout(int keepAliveTimeout) {
    this.keepAliveTimeout = keepAliveTimeout;
}
292
* Poll interval, in microseconds. The smaller the value, the more CPU the poller
293
* will use, but the more responsive to activity it will be.
295
protected int pollTime = 2000;
296
/**
 * @return the poll interval, in microseconds
 */
public int getPollTime() {
    return this.pollTime;
}
297
/**
 * Sets the poll interval, in microseconds. Values that are zero or
 * negative are ignored and leave the current interval untouched.
 *
 * @param pollTime the requested poll interval; only applied when positive
 */
public void setPollTime(int pollTime) {
    if (pollTime <= 0) {
        // Non-positive values are silently rejected.
        return;
    }
    this.pollTime = pollTime;
}
301
* The default is true - the created threads will be
302
* in daemon mode. If set to false, the control thread
303
* will not be daemon - and will keep the process alive.
305
protected boolean daemon = true;
306
/**
 * Sets the {@code daemon} flag.
 *
 * @param b the flag value to store
 */
public void setDaemon(boolean b) {
    this.daemon = b;
}
307
/**
 * @return the current value of the {@code daemon} flag
 */
public boolean getDaemon() {
    return this.daemon;
}
311
* Name of the thread pool, which will be used for naming child threads.
313
protected String name = "TP";
314
/**
 * Sets the name of the thread pool, which is used for naming child threads.
 *
 * @param name the name to store
 */
public void setName(String name) {
    this.name = name;
}
315
/**
 * @return the name of the thread pool, which is used for naming child threads
 */
public String getName() {
    return this.name;
}
319
* Use sendfile for sending static files.
321
protected boolean useSendfile = Library.APR_HAS_SENDFILE;
322
/**
 * Sets the {@code useSendfile} flag.
 *
 * @param useSendfile the flag value to store
 */
public void setUseSendfile(boolean useSendfile) {
    this.useSendfile = useSendfile;
}
323
/**
 * @return the current value of the {@code useSendfile} flag
 */
public boolean getUseSendfile() {
    return this.useSendfile;
}
327
* Allow comet request handling.
329
protected boolean useComet = true;
330
/**
 * Sets whether comet request handling is allowed.
 *
 * @param useComet the flag value to store
 */
public void setUseComet(boolean useComet) {
    this.useComet = useComet;
}
331
/**
 * @return whether comet request handling is allowed
 */
public boolean getUseComet() {
    return this.useComet;
}
335
* Acceptor thread count.
337
protected int acceptorThreadCount = 0;
338
/**
 * Sets the acceptor thread count.
 *
 * @param acceptorThreadCount the count to store
 */
public void setAcceptorThreadCount(int acceptorThreadCount) {
    this.acceptorThreadCount = acceptorThreadCount;
}
339
/**
 * @return the acceptor thread count
 */
public int getAcceptorThreadCount() {
    return this.acceptorThreadCount;
}
343
* Sendfile thread count.
345
protected int sendfileThreadCount = 0;
346
/**
 * Sets the sendfile thread count.
 *
 * @param sendfileThreadCount the count to store
 */
public void setSendfileThreadCount(int sendfileThreadCount) {
    this.sendfileThreadCount = sendfileThreadCount;
}
347
/**
 * @return the sendfile thread count
 */
public int getSendfileThreadCount() {
    return this.sendfileThreadCount;
}
351
* Poller thread count.
353
protected int pollerThreadCount = 0;
354
/**
 * Sets the poller thread count.
 *
 * @param pollerThreadCount the count to store
 */
public void setPollerThreadCount(int pollerThreadCount) {
    this.pollerThreadCount = pollerThreadCount;
}
355
/**
 * @return the poller thread count
 */
public int getPollerThreadCount() {
    return this.pollerThreadCount;
}
361
protected Poller[] pollers = null;
362
protected int pollerRoundRobin = 0;
363
public Poller getPoller() {
364
pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
365
return pollers[pollerRoundRobin];
370
* The socket poller used for Comet support.
372
protected Poller[] cometPollers = null;
373
protected int cometPollerRoundRobin = 0;
374
public Poller getCometPoller() {
375
cometPollerRoundRobin = (cometPollerRoundRobin + 1) % cometPollers.length;
376
return cometPollers[cometPollerRoundRobin];
381
* The static file sender.
383
protected Sendfile[] sendfiles = null;
384
protected int sendfileRoundRobin = 0;
385
public Sendfile getSendfile() {
386
sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length;
387
return sendfiles[sendfileRoundRobin];
392
* Dummy maxSpareThreads property.
394
/**
 * Dummy {@code maxSpareThreads} property.
 *
 * @return always {@code 0}
 */
public int getMaxSpareThreads() {
    return 0;
}
398
* Dummy minSpareThreads property.
400
/**
 * Dummy {@code minSpareThreads} property.
 *
 * @return always {@code 0}
 */
public int getMinSpareThreads() {
    return 0;
}
406
protected int unlockTimeout = 250;
407
/**
 * @return the current value of the {@code unlockTimeout} property
 */
public int getUnlockTimeout() {
    return this.unlockTimeout;
}
408
public void setUnlockTimeout(int unlockTimeout) {
409
this.unlockTimeout = unlockTimeout;
416
protected boolean SSLEnabled = false;
417
/**
 * @return the current value of the {@code SSLEnabled} flag
 */
public boolean isSSLEnabled() {
    return this.SSLEnabled;
}
418
/**
 * Sets the {@code SSLEnabled} flag.
 *
 * @param SSLEnabled the flag value to store
 */
public void setSSLEnabled(boolean SSLEnabled) {
    this.SSLEnabled = SSLEnabled;
}
424
protected String SSLProtocol = "all";
425
/**
 * @return the configured SSL protocol name
 */
public String getSSLProtocol() {
    return this.SSLProtocol;
}
426
/**
 * Sets the SSL protocol name.
 *
 * @param SSLProtocol the protocol name to store
 */
public void setSSLProtocol(String SSLProtocol) {
    this.SSLProtocol = SSLProtocol;
}
430
* SSL password (if a cert is encrypted, and no password has been provided, a callback
431
* will ask for a password).
433
protected String SSLPassword = null;
434
/**
 * @return the SSL password, or {@code null} if none has been provided
 */
public String getSSLPassword() {
    return this.SSLPassword;
}
435
/**
 * Sets the SSL password.
 *
 * @param SSLPassword the password to store (may be {@code null})
 */
public void setSSLPassword(String SSLPassword) {
    this.SSLPassword = SSLPassword;
}
441
protected String SSLCipherSuite = "ALL";
442
/**
 * @return the configured SSL cipher suite string
 */
public String getSSLCipherSuite() {
    return this.SSLCipherSuite;
}
443
/**
 * Sets the SSL cipher suite string.
 *
 * @param SSLCipherSuite the cipher suite string to store
 */
public void setSSLCipherSuite(String SSLCipherSuite) {
    this.SSLCipherSuite = SSLCipherSuite;
}
447
* SSL certificate file.
449
protected String SSLCertificateFile = null;
450
/**
 * @return the SSL certificate file, or {@code null} if unset
 */
public String getSSLCertificateFile() {
    return this.SSLCertificateFile;
}
451
/**
 * Sets the SSL certificate file.
 *
 * @param SSLCertificateFile the certificate file to store
 */
public void setSSLCertificateFile(String SSLCertificateFile) {
    this.SSLCertificateFile = SSLCertificateFile;
}
455
* SSL certificate key file.
457
protected String SSLCertificateKeyFile = null;
458
/**
 * @return the SSL certificate key file, or {@code null} if unset
 */
public String getSSLCertificateKeyFile() {
    return this.SSLCertificateKeyFile;
}
459
/**
 * Sets the SSL certificate key file.
 *
 * @param SSLCertificateKeyFile the certificate key file to store
 */
public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) {
    this.SSLCertificateKeyFile = SSLCertificateKeyFile;
}
463
* SSL certificate chain file.
465
protected String SSLCertificateChainFile = null;
466
/**
 * @return the SSL certificate chain file, or {@code null} if unset
 */
public String getSSLCertificateChainFile() {
    return this.SSLCertificateChainFile;
}
467
/**
 * Sets the SSL certificate chain file.
 *
 * @param SSLCertificateChainFile the certificate chain file to store
 */
public void setSSLCertificateChainFile(String SSLCertificateChainFile) {
    this.SSLCertificateChainFile = SSLCertificateChainFile;
}
471
* SSL CA certificate path.
473
protected String SSLCACertificatePath = null;
474
/**
 * @return the SSL CA certificate path, or {@code null} if unset
 */
public String getSSLCACertificatePath() {
    return this.SSLCACertificatePath;
}
475
/**
 * Sets the SSL CA certificate path.
 *
 * @param SSLCACertificatePath the CA certificate path to store
 */
public void setSSLCACertificatePath(String SSLCACertificatePath) {
    this.SSLCACertificatePath = SSLCACertificatePath;
}
479
* SSL CA certificate file.
481
protected String SSLCACertificateFile = null;
482
/**
 * @return the SSL CA certificate file, or {@code null} if unset
 */
public String getSSLCACertificateFile() {
    return this.SSLCACertificateFile;
}
483
/**
 * Sets the SSL CA certificate file.
 *
 * @param SSLCACertificateFile the CA certificate file to store
 */
public void setSSLCACertificateFile(String SSLCACertificateFile) {
    this.SSLCACertificateFile = SSLCACertificateFile;
}
487
* SSL CA revocation path.
489
protected String SSLCARevocationPath = null;
490
/**
 * @return the SSL CA revocation path, or {@code null} if unset
 */
public String getSSLCARevocationPath() {
    return this.SSLCARevocationPath;
}
491
/**
 * Sets the SSL CA revocation path.
 *
 * @param SSLCARevocationPath the CA revocation path to store
 */
public void setSSLCARevocationPath(String SSLCARevocationPath) {
    this.SSLCARevocationPath = SSLCARevocationPath;
}
495
* SSL CA revocation file.
497
protected String SSLCARevocationFile = null;
498
/**
 * @return the SSL CA revocation file, or {@code null} if unset
 */
public String getSSLCARevocationFile() {
    return this.SSLCARevocationFile;
}
499
/**
 * Sets the SSL CA revocation file.
 *
 * @param SSLCARevocationFile the CA revocation file to store
 */
public void setSSLCARevocationFile(String SSLCARevocationFile) {
    this.SSLCARevocationFile = SSLCARevocationFile;
}
505
protected String SSLVerifyClient = "none";
506
/**
 * @return the configured client certificate verification mode string
 */
public String getSSLVerifyClient() {
    return this.SSLVerifyClient;
}
507
/**
 * Sets the client certificate verification mode string.
 *
 * @param SSLVerifyClient the verification mode string to store
 */
public void setSSLVerifyClient(String SSLVerifyClient) {
    this.SSLVerifyClient = SSLVerifyClient;
}
513
protected int SSLVerifyDepth = 10;
514
/**
 * @return the current value of the {@code SSLVerifyDepth} property
 */
public int getSSLVerifyDepth() {
    return this.SSLVerifyDepth;
}
515
/**
 * Stores a new value for the {@code SSLVerifyDepth} property.
 *
 * @param SSLVerifyDepth the value to store
 */
public void setSSLVerifyDepth(int SSLVerifyDepth) {
    this.SSLVerifyDepth = SSLVerifyDepth;
}
518
// --------------------------------------------------------- Public Methods
522
* Number of keepalive sockets.
524
public int getKeepAliveCount() {
525
if (pollers == null) {
528
int keepAliveCount = 0;
529
for (int i = 0; i < pollers.length; i++) {
530
keepAliveCount += pollers[i].getKeepAliveCount();
532
return keepAliveCount;
538
* Number of sendfile sockets.
540
public int getSendfileCount() {
541
if (sendfiles == null) {
544
int sendfileCount = 0;
545
for (int i = 0; i < sendfiles.length; i++) {
546
sendfileCount += sendfiles[i].getSendfileCount();
548
return sendfileCount;
554
* Return the amount of threads that are managed by the pool.
556
* @return the amount of threads that are managed by the pool
558
public int getCurrentThreadCount() {
559
if (executor != null) {
567
* Return the amount of threads that are in use
569
* @return the amount of threads that are in use
571
public int getCurrentThreadsBusy() {
572
if (executor!=null) {
575
return workers!=null?curThreads - workers.size():0;
581
* Return the state of the endpoint.
583
* @return true if the endpoint is running, false otherwise
585
public boolean isRunning() {
591
* Return the state of the endpoint.
593
* @return true if the endpoint is paused, false otherwise
595
public boolean isPaused() {
600
// ----------------------------------------------- Public Lifecycle Methods
604
* Initialize the endpoint.
612
// Create the root APR memory pool
613
rootPool = Pool.create(0);
614
// Create the pool for the server socket
615
serverSockPool = Pool.create(rootPool);
616
// Create the APR address that will be bound
617
String addressStr = null;
618
if (address == null) {
621
addressStr = address.getHostAddress();
623
int family = Socket.APR_INET;
624
if (Library.APR_HAVE_IPV6) {
625
if (addressStr == null) {
626
if (!OS.IS_BSD && !OS.IS_WIN32 && !OS.IS_WIN64)
627
family = Socket.APR_UNSPEC;
628
} else if (addressStr.indexOf(':') >= 0) {
629
family = Socket.APR_UNSPEC;
633
long inetAddress = Address.info(addressStr, family,
635
// Create the APR server socket
636
serverSock = Socket.create(Address.getInfo(inetAddress).family,
638
Socket.APR_PROTO_TCP, rootPool);
640
Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
642
// Deal with the firewalls that tend to drop the inactive sockets
643
Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1);
644
// Bind the server socket
645
int ret = Socket.bind(serverSock, inetAddress);
647
throw new Exception(sm.getString("endpoint.init.bind", "" + ret, Error.strerror(ret)));
649
// Start listening on the server socket
650
ret = Socket.listen(serverSock, backlog);
652
throw new Exception(sm.getString("endpoint.init.listen", "" + ret, Error.strerror(ret)));
654
if (OS.IS_WIN32 || OS.IS_WIN64) {
655
// On Windows set the reuseaddr flag after the bind/listen
656
Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
659
// Sendfile usage on systems which don't support it cause major problems
660
if (useSendfile && !Library.APR_HAS_SENDFILE) {
664
// Initialize thread count defaults for acceptor, poller and sendfile
665
if (acceptorThreadCount == 0) {
666
// FIXME: Doesn't seem to work that well with multiple accept threads
667
acceptorThreadCount = 1;
669
if (pollerThreadCount == 0) {
670
if ((OS.IS_WIN32 || OS.IS_WIN64) && (pollerSize > 1024)) {
671
// The maximum per poller to get reasonable performance is 1024
672
pollerThreadCount = pollerSize / 1024;
673
// Adjust poller size so that it won't reach the limit
674
pollerSize = pollerSize - (pollerSize % 1024);
676
// No explicit poller size limitation
677
pollerThreadCount = 1;
680
if (sendfileThreadCount == 0) {
681
if ((OS.IS_WIN32 || OS.IS_WIN64) && (sendfileSize > 1024)) {
682
// The maximum per sendfile thread to get reasonable performance is 1024
683
sendfileThreadCount = sendfileSize / 1024;
684
// Adjust sendfile size so that it won't reach the limit
685
sendfileSize = sendfileSize - (sendfileSize % 1024);
687
// No explicit sendfile size limitation
688
// FIXME: Default to one per CPU ?
689
sendfileThreadCount = 1;
693
// Delay accepting of new connections until data is available
694
// Only Linux kernels 2.4 + have that implemented
695
// on other platforms this call is noop and will return APR_ENOTIMPL.
697
if (Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1) == Status.APR_ENOTIMPL) {
702
// Initialize SSL if needed
706
int value = SSL.SSL_PROTOCOL_ALL;
707
if ("SSLv2".equalsIgnoreCase(SSLProtocol)) {
708
value = SSL.SSL_PROTOCOL_SSLV2;
709
} else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) {
710
value = SSL.SSL_PROTOCOL_SSLV3;
711
} else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) {
712
value = SSL.SSL_PROTOCOL_TLSV1;
713
} else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) {
714
value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3;
716
// Create SSL Context
717
sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER);
718
// List the ciphers that the client is permitted to negotiate
719
SSLContext.setCipherSuite(sslContext, SSLCipherSuite);
720
// Load Server key and certificate
721
SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA);
722
// Set certificate chain file
723
SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false);
724
// Support Client Certificates
725
SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath);
727
SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath);
728
// Client certificate verification
729
value = SSL.SSL_CVERIFY_NONE;
730
if ("optional".equalsIgnoreCase(SSLVerifyClient)) {
731
value = SSL.SSL_CVERIFY_OPTIONAL;
732
} else if ("require".equalsIgnoreCase(SSLVerifyClient)) {
733
value = SSL.SSL_CVERIFY_REQUIRE;
734
} else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) {
735
value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA;
737
SSLContext.setVerify(sslContext, value, SSLVerifyDepth);
738
// For now, sendfile is not supported with SSL
748
* Start the APR endpoint, creating acceptor, poller and sendfile threads.
752
// Initialize socket if not done before
760
// Create worker collection
761
if (executor == null) {
762
workers = new WorkerStack(maxThreads);
765
// Start poller threads
766
pollers = new Poller[pollerThreadCount];
767
for (int i = 0; i < pollerThreadCount; i++) {
768
pollers[i] = new Poller(false);
770
Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);
771
pollerThread.setPriority(threadPriority);
772
pollerThread.setDaemon(true);
773
pollerThread.start();
776
// Start comet poller threads
777
cometPollers = new Poller[pollerThreadCount];
778
for (int i = 0; i < pollerThreadCount; i++) {
779
cometPollers[i] = new Poller(true);
780
cometPollers[i].init();
781
Thread pollerThread = new Thread(cometPollers[i], getName() + "-CometPoller-" + i);
782
pollerThread.setPriority(threadPriority);
783
pollerThread.setDaemon(true);
784
pollerThread.start();
787
// Start sendfile threads
789
sendfiles = new Sendfile[sendfileThreadCount];
790
for (int i = 0; i < sendfileThreadCount; i++) {
791
sendfiles[i] = new Sendfile();
793
Thread sendfileThread = new Thread(sendfiles[i], getName() + "-Sendfile-" + i);
794
sendfileThread.setPriority(threadPriority);
795
sendfileThread.setDaemon(true);
796
sendfileThread.start();
800
// Start acceptor threads
801
for (int i = 0; i < acceptorThreadCount; i++) {
802
Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
803
acceptorThread.setPriority(threadPriority);
804
acceptorThread.setDaemon(daemon);
805
acceptorThread.start();
813
* Pause the endpoint, which will make it stop accepting new sockets.
815
public void pause() {
816
if (running && !paused) {
824
* Resume the endpoint, which will make it start accepting new sockets
827
public void resume() {
835
* Stop the endpoint. This will cause all processing threads to stop.
841
for (int i = 0; i < pollers.length; i++) {
842
pollers[i].destroy();
845
for (int i = 0; i < cometPollers.length; i++) {
846
cometPollers[i].destroy();
850
for (int i = 0; i < sendfiles.length; i++) {
851
sendfiles[i].destroy();
860
* Deallocate APR memory pools, and close server socket.
862
public void destroy() throws Exception {
866
Pool.destroy(serverSockPool);
868
// Close server socket
869
Socket.close(serverSock);
872
// Close all APR memory pools and resources
873
Pool.destroy(rootPool);
879
// ------------------------------------------------------ Protected Methods
883
* Get a sequence number used for thread naming.
885
protected int getSequence() {
891
* Unlock the server socket accept using a bogus connection.
893
protected void unlockAccept() {
894
java.net.Socket s = null;
895
InetSocketAddress saddr = null;
897
// Need to create a connection to unlock the accept();
898
if (address == null) {
899
saddr = new InetSocketAddress("localhost", port);
901
saddr = new InetSocketAddress(address,port);
903
s = new java.net.Socket();
904
s.setSoTimeout(soTimeout);
905
s.setSoLinger(true ,0);
906
if (log.isDebugEnabled()) {
907
log.debug("About to unlock socket for: " + saddr);
909
s.connect(saddr, unlockTimeout);
911
* In the case of a deferred accept / accept filters we need to
912
* send data to wake up the accept. Send OPTIONS * to bypass even
913
* BSD accept filters. The Acceptor will discard it.
916
OutputStreamWriter sw;
918
sw = new OutputStreamWriter(s.getOutputStream(), "ISO-8859-1");
919
sw.write("OPTIONS * HTTP/1.0\r\n"
920
+ "User-Agent: Tomcat wakeup connection\r\n\r\n");
923
} catch(Exception e) {
924
if (log.isDebugEnabled()) {
925
log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
931
} catch (Exception e) {
940
* Process the specified connection.
942
protected boolean setSocketOptions(long socket) {
943
// Process the connection
947
// 1: Set socket options: timeout, linger, etc
949
Socket.optSet(socket, Socket.APR_SO_LINGER, soLinger);
951
Socket.optSet(socket, Socket.APR_TCP_NODELAY, (tcpNoDelay ? 1 : 0));
953
Socket.timeoutSet(socket, soTimeout * 1000);
957
if (sslContext != 0) {
958
SSLSocket.attach(sslContext, socket);
959
if (SSLSocket.handshake(socket) != 0) {
960
if (log.isDebugEnabled()) {
961
log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError());
967
} catch (Throwable t) {
968
if (log.isDebugEnabled()) {
970
log.debug(sm.getString("endpoint.err.handshake"), t);
972
log.debug(sm.getString("endpoint.err.unexpected"), t);
975
// Tell to close the socket
983
* Create (or allocate) and return an available processor for use in
984
* processing a specific HTTP request, if possible. If the maximum
985
* allowed processors have already been created and are in use, return
986
* <code>null</code> instead.
988
protected Worker createWorkerThread() {
990
synchronized (workers) {
991
if (workers.size() > 0) {
993
return (workers.pop());
995
if ((maxThreads > 0) && (curThreads < maxThreads)) {
997
if (curThreadsBusy == maxThreads) {
998
log.info(sm.getString("endpoint.info.maxThreads",
999
Integer.toString(maxThreads), address,
1000
Integer.toString(port)));
1002
return (newWorkerThread());
1004
if (maxThreads < 0) {
1006
return (newWorkerThread());
1017
* Create and return a new processor suitable for processing HTTP
1018
* requests and returning the corresponding responses.
1020
protected Worker newWorkerThread() {
1022
Worker workerThread = new Worker();
1023
workerThread.start();
1024
return (workerThread);
1030
* Return a new worker thread, and block while no worker is available.
1032
protected Worker getWorkerThread() {
1033
// Allocate a new worker thread
1034
synchronized (workers) {
1035
Worker workerThread;
1036
while ((workerThread = createWorkerThread()) == null) {
1039
} catch (InterruptedException e) {
1043
return workerThread;
1049
* Recycle the specified Processor so that it can be used again.
1051
* @param workerThread The processor to be recycled
1053
protected void recycleWorkerThread(Worker workerThread) {
1054
synchronized (workers) {
1055
workers.push(workerThread);
1063
* Allocate a new poller of the specified size.
1065
protected long allocatePoller(int size, long pool, int timeout) {
1067
return Poll.create(size, pool, 0, timeout * 1000);
1069
if (Status.APR_STATUS_IS_EINVAL(e.getError())) {
1070
log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size));
1073
log.error(sm.getString("endpoint.poll.initfail"), e);
1081
* Process given socket.
1083
protected boolean processSocketWithOptions(long socket) {
1085
if (executor == null) {
1086
getWorkerThread().assignWithOptions(socket);
1088
executor.execute(new SocketWithOptionsProcessor(socket));
1090
} catch (Throwable t) {
1091
// This means we got an OOM or similar creating a thread, or that
1092
// the pool and its queue are full
1093
log.error(sm.getString("endpoint.process.fail"), t);
1101
* Process given socket.
1103
protected boolean processSocket(long socket) {
1105
if (executor == null) {
1106
getWorkerThread().assign(socket);
1108
executor.execute(new SocketProcessor(socket));
1110
} catch (Throwable t) {
1111
// This means we got an OOM or similar creating a thread, or that
1112
// the pool and its queue are full
1113
log.error(sm.getString("endpoint.process.fail"), t);
1121
* Process given socket for an event.
1123
protected boolean processSocket(long socket, SocketStatus status) {
1125
if (executor == null) {
1126
getWorkerThread().assign(socket, status);
1128
executor.execute(new SocketEventProcessor(socket, status));
1130
} catch (Throwable t) {
1131
// This means we got an OOM or similar creating a thread, or that
1132
// the pool and its queue are full
1133
log.error(sm.getString("endpoint.process.fail"), t);
1140
// --------------------------------------------------- Acceptor Inner Class
1144
* Server socket acceptor thread.
1146
protected class Acceptor implements Runnable {
1150
* The background thread that listens for incoming TCP/IP connections and
1151
* hands them off to an appropriate processor.
1155
// Loop until we receive a shutdown command
1158
// Loop if endpoint is paused
1162
} catch (InterruptedException e) {
1168
// Accept the next incoming connection from the server socket
1169
long socket = Socket.accept(serverSock);
1171
* In the case of a deferred accept unlockAccept needs to
1172
* send data. This data will be rubbish, so destroy the
1173
* socket and don't process it.
1175
if (deferAccept && (paused || !running)) {
1176
Socket.destroy(socket);
1179
// Hand this socket off to an appropriate processor
1180
if (!processSocketWithOptions(socket)) {
1181
// Close socket and pool right away
1182
Socket.destroy(socket);
1184
} catch (Throwable t) {
1185
if (running) log.error(sm.getString("endpoint.accept.fail"), t);
1188
// The processor will recycle itself when it finishes
1197
// ----------------------------------------------------- Poller Inner Class
1203
public class Poller implements Runnable {
1205
protected long serverPollset = 0;
1206
protected long pool = 0;
1207
protected long[] desc;
1209
protected long[] addS;
1210
protected volatile int addCount = 0;
1212
protected boolean comet = true;
1214
protected volatile int keepAliveCount = 0;
1215
/**
 * @return the current value of this poller's {@code keepAliveCount} field
 */
public int getKeepAliveCount() {
    return this.keepAliveCount;
}
1217
public Poller(boolean comet) {
1222
* Create the poller. With some versions of APR, the maximum poller size will
1223
* be 62 (recompiling APR is necessary to remove this limitation).
1225
protected void init() {
1226
pool = Pool.create(serverSockPool);
1227
int size = pollerSize / pollerThreadCount;
1228
int timeout = keepAliveTimeout;
1230
timeout = soTimeout;
1232
serverPollset = allocatePoller(size, pool, timeout);
1233
if (serverPollset == 0 && size > 1024) {
1235
serverPollset = allocatePoller(size, pool, timeout);
1237
if (serverPollset == 0) {
1239
serverPollset = allocatePoller(size, pool, timeout);
1241
desc = new long[size * 2];
1243
addS = new long[size];
1248
* Destroy the poller.
1250
protected void destroy() {
1251
// Wait for polltime before doing anything, so that the poller threads
1252
// exit, otherwise parallel destruction of sockets which are still
1253
// in the poller can cause problems
1255
synchronized (this) {
1256
this.wait(pollTime / 1000);
1258
} catch (InterruptedException e) {
1261
// Close all sockets in the add queue
1262
for (int i = 0; i < addCount; i++) {
1264
processSocket(addS[i], SocketStatus.STOP);
1266
Socket.destroy(addS[i]);
1269
// Close all sockets still in the poller
1270
int rv = Poll.pollset(serverPollset, desc);
1272
for (int n = 0; n < rv; n++) {
1274
processSocket(desc[n*2+1], SocketStatus.STOP);
1276
Socket.destroy(desc[n*2+1]);
1286
* Add specified socket and associated pool to the poller. The socket will
1287
* be added to a temporary array, and polled first after a maximum amount
1288
* of time equal to pollTime (in most cases, latency will be much lower,
1291
* @param socket to add to the poller
1293
public void add(long socket) {
1294
synchronized (this) {
1295
// Add socket to the list. Newly added sockets will wait
1296
// at most for pollTime before being polled
1297
if (addCount >= addS.length) {
1298
// Can't do anything: close the socket right away
1300
processSocket(socket, SocketStatus.ERROR);
1302
Socket.destroy(socket);
1306
addS[addCount] = socket;
1313
* The background thread that listens for incoming TCP/IP connections and
1314
* hands them off to an appropriate processor.
1318
long maintainTime = 0;
1319
// Loop until we receive a shutdown command
1321
// Loop if endpoint is paused
1325
} catch (InterruptedException e) {
1330
if (keepAliveCount < 1 && addCount < 1) {
1331
synchronized (this) {
1332
while (keepAliveCount < 1 && addCount < 1) {
1333
// Reset maintain time.
1337
} catch (InterruptedException e) {
1345
// Add sockets which are waiting to the poller
1347
synchronized (this) {
1348
int successCount = 0;
1350
for (int i = (addCount - 1); i >= 0; i--) {
1352
(serverPollset, addS[i], Poll.APR_POLLIN);
1353
if (rv == Status.APR_SUCCESS) {
1356
// Can't do anything: close the socket right away
1358
processSocket(addS[i], SocketStatus.ERROR);
1360
Socket.destroy(addS[i]);
1365
keepAliveCount += successCount;
1371
maintainTime += pollTime;
1372
// Poll for the specified interval
1373
int rv = Poll.poll(serverPollset, pollTime, desc, true);
1375
keepAliveCount -= rv;
1376
for (int n = 0; n < rv; n++) {
1377
// Check for failed sockets and hand this socket off to a worker
1378
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
1379
|| ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
1380
|| (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN)))
1381
|| (!comet && (!processSocket(desc[n*2+1])))) {
1382
// Close socket and clear pool
1384
processSocket(desc[n*2+1], SocketStatus.DISCONNECT);
1386
Socket.destroy(desc[n*2+1]);
1391
} else if (rv < 0) {
1393
/* Any non timeup or interrupted error is critical */
1394
if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
1395
if (errn > Status.APR_OS_START_USERERR) {
1396
errn -= Status.APR_OS_START_USERERR;
1398
log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
1399
// Handle poll critical failure
1400
synchronized (this) {
1407
if (soTimeout > 0 && maintainTime > 1000000L && running) {
1408
rv = Poll.maintain(serverPollset, desc, true);
1411
keepAliveCount -= rv;
1412
for (int n = 0; n < rv; n++) {
1413
// Close socket and clear pool
1415
processSocket(desc[n], SocketStatus.TIMEOUT);
1417
Socket.destroy(desc[n]);
1422
} catch (Throwable t) {
1423
log.error(sm.getString("endpoint.poll.error"), t);
1428
synchronized (this) {
1437
// ----------------------------------------------------- Worker Inner Class
1441
* Server processor class.
1443
protected class Worker implements Runnable {
1446
protected Thread thread = null;
1447
protected boolean available = false;
1448
protected long socket = 0;
1449
protected SocketStatus status = null;
1450
protected boolean options = false;
1454
* Process an incoming TCP/IP connection on the specified socket. Any
1455
* exception that occurs during processing must be logged and swallowed.
1456
* <b>NOTE</b>: This method is called from our Connector's thread. We
1457
* must assign it to our own thread so that multiple simultaneous
1458
* requests can be handled.
1460
* @param socket TCP socket to process
1462
protected synchronized void assignWithOptions(long socket) {
1464
// Wait for the Processor to get the previous Socket
1468
} catch (InterruptedException e) {
1472
// Store the newly available Socket and notify our thread
1473
this.socket = socket;
1483
* Process an incoming TCP/IP connection on the specified socket. Any
1484
* exception that occurs during processing must be logged and swallowed.
1485
* <b>NOTE</b>: This method is called from our Connector's thread. We
1486
* must assign it to our own thread so that multiple simultaneous
1487
* requests can be handled.
1489
* @param socket TCP socket to process
1491
protected synchronized void assign(long socket) {
1493
// Wait for the Processor to get the previous Socket
1497
} catch (InterruptedException e) {
1501
// Store the newly available Socket and notify our thread
1502
this.socket = socket;
1511
protected synchronized void assign(long socket, SocketStatus status) {
1513
// Wait for the Processor to get the previous Socket
1517
} catch (InterruptedException e) {
1521
// Store the newly available Socket and notify our thread
1522
this.socket = socket;
1523
this.status = status;
1532
* Await a newly assigned Socket from our Connector, or <code>null</code>
1533
* if we are supposed to shut down.
1535
protected synchronized long await() {
1537
// Wait for the Connector to provide a new Socket
1538
while (!available) {
1541
} catch (InterruptedException e) {
1545
// Notify the Connector that we have received this Socket
1546
long socket = this.socket;
1556
* The background thread that listens for incoming TCP/IP connections and
1557
* hands them off to an appropriate processor.
1561
// Process requests until we receive a shutdown signal
1564
// Wait for the next socket to be assigned
1565
long socket = await();
1569
if (!deferAccept && options) {
1570
if (setSocketOptions(socket)) {
1571
getPoller().add(socket);
1573
// Close socket and pool
1574
Socket.destroy(socket);
1579
// Process the request from this socket
1580
if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) {
1581
// Close socket and pool
1582
Socket.destroy(socket);
1584
} else if ((status == null) && ((options && !setSocketOptions(socket))
1585
|| handler.process(socket) == Handler.SocketState.CLOSED)) {
1586
// Close socket and pool
1587
Socket.destroy(socket);
1592
// Finish up this request
1593
recycleWorkerThread(this);
1601
* Start the background processing thread.
1603
public void start() {
1604
thread = new Thread(this);
1605
thread.setName(getName() + "-" + (++curThreads));
1606
thread.setDaemon(true);
1614
// ----------------------------------------------- SendfileData Inner Class
1618
* SendfileData class.
1620
public static class SendfileData {
1622
public String fileName;
1625
// Range information
1628
// Socket and socket pool
1633
public boolean keepAlive;
1637
// --------------------------------------------------- Sendfile Inner Class
1643
public class Sendfile implements Runnable {
1645
protected long sendfilePollset = 0;
1646
protected long pool = 0;
1647
protected long[] desc;
1648
protected HashMap<Long, SendfileData> sendfileData;
1650
protected volatile int sendfileCount;
1651
/**
 * Returns the current value of this sendfile poller's
 * <code>sendfileCount</code> counter.
 *
 * @return the current sendfile count
 */
public int getSendfileCount() {
    return this.sendfileCount;
}
1653
protected ArrayList<SendfileData> addS;
1654
protected volatile int addCount;
1657
* Create the sendfile poller. With some versions of APR, the maximum poller size will
1658
* be 62 (recompiling APR is necessary to remove this limitation).
1660
protected void init() {
1661
pool = Pool.create(serverSockPool);
1662
int size = sendfileSize / sendfileThreadCount;
1663
sendfilePollset = allocatePoller(size, pool, soTimeout);
1664
if (sendfilePollset == 0 && size > 1024) {
1666
sendfilePollset = allocatePoller(size, pool, soTimeout);
1668
if (sendfilePollset == 0) {
1670
sendfilePollset = allocatePoller(size, pool, soTimeout);
1672
desc = new long[size * 2];
1673
sendfileData = new HashMap<Long, SendfileData>(size);
1674
addS = new ArrayList<SendfileData>();
1679
* Destroy the poller.
1681
protected void destroy() {
1682
// Wait for polltime before doing anything, so that the poller threads
1683
// exit, otherwise parallel destruction of sockets which are still
1684
// in the poller can cause problems
1686
synchronized (this) {
1687
this.wait(pollTime / 1000);
1689
} catch (InterruptedException e) {
1692
// Close any socket remaining in the add queue
1694
for (int i = (addS.size() - 1); i >= 0; i--) {
1695
SendfileData data = addS.get(i);
1696
Socket.destroy(data.socket);
1698
// Close all sockets still in the poller
1699
int rv = Poll.pollset(sendfilePollset, desc);
1701
for (int n = 0; n < rv; n++) {
1702
Socket.destroy(desc[n*2+1]);
1706
sendfileData.clear();
1710
* Add the sendfile data to the sendfile poller. Note that in most cases,
1711
* the initial non blocking calls to sendfile will return right away, and
1712
* will be handled asynchronously inside the kernel. As a result,
1713
* the poller will never be used.
1715
* @param data containing the reference to the data which should be sent
1716
* @return true if all the data has been sent right away, and false
1719
public boolean add(SendfileData data) {
1720
// Initialize fd from data given
1722
data.fdpool = Socket.pool(data.socket);
1724
(data.fileName, File.APR_FOPEN_READ
1725
| File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY,
1727
data.pos = data.start;
1728
// Set the socket to nonblocking mode
1729
Socket.timeoutSet(data.socket, 0);
1731
long nw = Socket.sendfilen(data.socket, data.fd,
1732
data.pos, data.end - data.pos, 0);
1734
if (!(-nw == Status.EAGAIN)) {
1735
Socket.destroy(data.socket);
1739
// Break the loop and add the socket to poller.
1743
data.pos = data.pos + nw;
1744
if (data.pos >= data.end) {
1745
// Entire file has been sent
1746
Pool.destroy(data.fdpool);
1747
// Set back socket to blocking mode
1748
Socket.timeoutSet(data.socket, soTimeout * 1000);
1753
} catch (Exception e) {
1754
log.error(sm.getString("endpoint.sendfile.error"), e);
1757
// Add socket to the list. Newly added sockets will wait
1758
// at most for pollTime before being polled
1759
synchronized (this) {
1768
* Remove socket from the poller.
1770
* @param data the sendfile data which should be removed
1772
protected void remove(SendfileData data) {
1773
int rv = Poll.remove(sendfilePollset, data.socket);
1774
if (rv == Status.APR_SUCCESS) {
1777
sendfileData.remove(new Long(data.socket));
1781
* The background thread that listens for incoming TCP/IP connections and
1782
* hands them off to an appropriate processor.
1786
long maintainTime = 0;
1787
// Loop until we receive a shutdown command
1790
// Loop if endpoint is paused
1794
} catch (InterruptedException e) {
1799
if (sendfileCount < 1 && addCount < 1) {
1800
synchronized (this) {
1801
while (sendfileCount < 1 && addS.size() < 1) {
1802
// Reset maintain time.
1806
} catch (InterruptedException e) {
1814
// Add socket to the poller
1816
synchronized (this) {
1817
int successCount = 0;
1819
for (int i = (addS.size() - 1); i >= 0; i--) {
1820
SendfileData data = addS.get(i);
1821
int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT);
1822
if (rv == Status.APR_SUCCESS) {
1823
sendfileData.put(new Long(data.socket), data);
1826
log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv, Error.strerror(rv)));
1827
// Can't do anything: close the socket right away
1828
Socket.destroy(data.socket);
1832
sendfileCount += successCount;
1839
maintainTime += pollTime;
1840
// Poll for the specified interval
1841
int rv = Poll.poll(sendfilePollset, pollTime, desc, false);
1843
for (int n = 0; n < rv; n++) {
1844
// Get the sendfile state
1845
SendfileData state =
1846
sendfileData.get(new Long(desc[n*2+1]));
1848
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
1849
|| ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
1850
// Close socket and clear pool
1852
// Destroy file descriptor pool, which should close the file
1853
// Close the socket, as the response would be incomplete
1854
Socket.destroy(state.socket);
1857
// Write some data using sendfile
1858
long nw = Socket.sendfilen(state.socket, state.fd,
1860
state.end - state.pos, 0);
1862
// Close socket and clear pool
1864
// Close the socket, as the response would be incomplete
1865
// This will close the file too.
1866
Socket.destroy(state.socket);
1870
state.pos = state.pos + nw;
1871
if (state.pos >= state.end) {
1873
if (state.keepAlive) {
1874
// Destroy file descriptor pool, which should close the file
1875
Pool.destroy(state.fdpool);
1876
Socket.timeoutSet(state.socket, soTimeout * 1000);
1877
// If all done put the socket back in the poller for
1878
// processing of further requests
1879
getPoller().add(state.socket);
1881
// Close the socket since this is
1882
// the end of a non-keep-alive request.
1883
Socket.destroy(state.socket);
1887
} else if (rv < 0) {
1889
/* Any non timeup or interrupted error is critical */
1890
if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
1891
if (errn > Status.APR_OS_START_USERERR) {
1892
errn -= Status.APR_OS_START_USERERR;
1894
log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
1895
// Handle poll critical failure
1896
synchronized (this) {
1903
// Call maintain for the sendfile poller
1904
if (soTimeout > 0 && maintainTime > 1000000L && running) {
1905
rv = Poll.maintain(sendfilePollset, desc, true);
1908
for (int n = 0; n < rv; n++) {
1909
// Get the sendfile state
1910
SendfileData state = sendfileData.get(new Long(desc[n]));
1911
// Close socket and clear pool
1913
// Destroy file descriptor pool, which should close the file
1914
// Close the socket, as the response would be incomplete
1915
Socket.destroy(state.socket);
1919
} catch (Throwable t) {
1920
log.error(sm.getString("endpoint.poll.error"), t);
1924
synchronized (this) {
1933
// ------------------------------------------------ Handler Inner Interface
1937
* Bare bones interface used for socket processing. Per thread data is to be
1938
* stored in the ThreadWithAttributes extra folders, or alternately in
1939
* thread local fields.
1941
public interface Handler {
1942
public enum SocketState {
1945
public SocketState process(long socket);
1946
public SocketState event(long socket, SocketStatus status);
1950
// ------------------------------------------------- WorkerStack Inner Class
1953
public class WorkerStack {
1955
protected Worker[] workers = null;
1956
protected int end = 0;
1958
public WorkerStack(int size) {
1959
workers = new Worker[size];
1963
* Put the object into the queue. If the queue is full (for example if
1964
* the queue has been reduced in size) the object will be dropped.
1966
* @param worker the object to be appended to the queue (first
1969
public void push(Worker worker) {
1970
if (end < workers.length) {
1971
workers[end++] = worker;
1978
* Get the first object out of the queue. Return null if the queue
1981
public Worker pop() {
1983
return workers[--end];
1989
* Get the first object out of the queue, Return null if the queue
1992
public Worker peek() {
1993
return workers[end];
1997
* Is the queue empty?
1999
public boolean isEmpty() {
2004
* How many elements are there in this queue?
2011
* Resize the queue. If there are too many objects in the queue for the
2012
* new size, drop the excess.
2016
public void resize(int newSize) {
2017
Worker[] newWorkers = new Worker[newSize];
2018
int len = workers.length;
2019
if (newSize < len) {
2022
System.arraycopy(workers, 0, newWorkers, 0, len);
2023
workers = newWorkers;
2028
// ----------------------------------- SocketWithOptionsProcessor Inner Class
2032
* This class is the equivalent of the Worker, but will simply be used in an
2033
* external Executor thread pool. This will also set the socket options
2034
* and do the handshake.
2036
protected class SocketWithOptionsProcessor implements Runnable {
2038
protected long socket = 0;
2040
public SocketWithOptionsProcessor(long socket) {
2041
this.socket = socket;
2047
if (setSocketOptions(socket)) {
2048
getPoller().add(socket);
2050
// Close socket and pool
2051
Socket.destroy(socket);
2055
// Process the request from this socket
2056
if (!setSocketOptions(socket)
2057
|| handler.process(socket) == Handler.SocketState.CLOSED) {
2058
// Close socket and pool
2059
Socket.destroy(socket);
2069
// ---------------------------------------------- SocketProcessor Inner Class
2073
* This class is the equivalent of the Worker, but will simply be used in an
2074
* external Executor thread pool.
2076
protected class SocketProcessor implements Runnable {
2078
protected long socket = 0;
2080
public SocketProcessor(long socket) {
2081
this.socket = socket;
2086
// Process the request from this socket
2087
if (handler.process(socket) == Handler.SocketState.CLOSED) {
2088
// Close socket and pool
2089
Socket.destroy(socket);
2098
// --------------------------------------- SocketEventProcessor Inner Class
2102
* This class is the equivalent of the Worker, but will simply be used in an
2103
* external Executor thread pool.
2105
protected class SocketEventProcessor implements Runnable {
2107
protected long socket = 0;
2108
protected SocketStatus status = null;
2110
public SocketEventProcessor(long socket, SocketStatus status) {
2111
this.socket = socket;
2112
this.status = status;
2117
// Process the request from this socket
2118
if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
2119
// Close socket and pool
2120
Socket.destroy(socket);