2
* Copyright 1999,2006 The Apache Software Foundation.
4
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5
* use this file except in compliance with the License. You may obtain a copy of
8
* http://www.apache.org/licenses/LICENSE-2.0
10
* Unless required by applicable law or agreed to in writing, software
11
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13
* License for the specific language governing permissions and limitations under
16
package org.apache.catalina.tribes.transport;
18
import java.io.IOException;
19
import java.net.InetAddress;
20
import java.net.InetSocketAddress;
21
import java.net.ServerSocket;
23
import org.apache.catalina.tribes.ChannelMessage;
24
import org.apache.catalina.tribes.ChannelReceiver;
25
import org.apache.catalina.tribes.MessageListener;
26
import org.apache.catalina.tribes.io.ListenCallback;
27
import org.apache.commons.logging.Log;
32
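* <p>Description: Abstract base class for ChannelReceiver implementations; it holds the listen address and port, buffer sizes, thread-pool limits and socket options.</p>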
34
* <p>Copyright: Copyright (c) 2005</p>
38
* @author not attributable
41
public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, ThreadPool.ThreadCreator {
43
// Worker-thread option bit; getWorkerThreadOptions() ORs it into its result when getDirect() is true.
public static final int OPTION_DIRECT_BUFFER = 0x0004;
46
// Shared commons-logging logger for this class (static, non-final).
protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ReceiverBase.class);
48
// Callback target; messageDataReceived() forwards incoming messages to it when it is non-null.
private MessageListener listener;
49
// Listen address as text; the literal "auto" makes getBind() replace it with the local host address.
private String host = "auto";
50
// Resolved form of host, assigned inside getBind().
private InetAddress bind;
51
// Listen port; bind() updates it through setTcpListenPort() with the port it is trying.
private int port = 4000;
52
// Receive buffer size - presumably bytes; where it is applied is not visible in this class excerpt.
private int rxBufSize = 43800;
53
// Transmit buffer size - presumably bytes; where it is applied is not visible in this class excerpt.
private int txBufSize = 25188;
54
// Listening flag, written by setListen().
private boolean listen = false;
55
// Thread pool used by the receiver; assigned from outside (see setPool).
private ThreadPool pool;
56
// Direct-buffer preference, written by setDirect(); presumably what getDirect() reports - confirm.
private boolean direct = true;
57
// Selector timeout - presumably milliseconds; TODO confirm the unit against the code that uses it.
private long tcpSelectorTimeout = 100;
58
//how many times to search for an available socket; setAutoBind() raises values <= 0 to 1
59
private int autoBind = 10;
60
// Upper and lower worker-thread limits (how the pool applies them is not visible here).
private int maxThreads = 25;
61
private int minThreads = 6;
62
// Socket option defaults below; the code that applies them to sockets is not visible in this class excerpt.
private boolean tcpNoDelay = true;
63
private boolean soKeepAlive = false;
64
private boolean ooBInline = true;
65
private boolean soReuseAddress = true;
66
private boolean soLingerOn = true;
67
// NOTE(review): unit not shown here - java.net.Socket#setSoLinger takes seconds; confirm at the call site.
private int soLingerTime = 3;
68
// The three bits match the java.net.Socket#setTrafficClass values IPTOS_RELIABILITY (0x04),
// IPTOS_THROUGHPUT (0x08) and IPTOS_LOWDELAY (0x10) - presumably passed to that call; verify.
private int soTrafficClass = 0x04 | 0x08 | 0x010;
69
private int timeout = 15000; //15 seconds
72
public ReceiverBase() {
78
* @return MessageListener
79
* @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
81
public MessageListener getMessageListener() {
88
* @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
90
public int getPort() {
94
public int getRxBufSize() {
98
public int getTxBufSize() {
103
* @deprecated use getMinThreads()/getMaxThreads()
106
public int getTcpThreadCount() {
107
// Legacy accessor: reports the minimum thread count by delegating to getMinThreads().
return getMinThreads();
113
* @param listener MessageListener
114
* Implements the org.apache.catalina.tribes.ChannelReceiver method.
116
public void setMessageListener(MessageListener listener) {
117
// Stores the listener in the same field that setListener() writes and messageDataReceived() reads.
this.listener = listener;
120
public void setTcpListenPort(int tcpListenPort) {
121
// Writes the port field; bind() also calls this with the port it is attempting.
this.port = tcpListenPort;
124
public void setTcpListenAddress(String tcpListenHost) {
125
// Writes the host field; the value "auto" is later replaced by the local host address in getBind().
this.host = tcpListenHost;
128
public void setRxBufSize(int rxBufSize) {
129
// Stores the receive buffer size as given; no validation is visible here.
this.rxBufSize = rxBufSize;
132
public void setTxBufSize(int txBufSize) {
133
// Stores the transmit buffer size as given; no validation is visible here.
this.txBufSize = txBufSize;
136
public void setTcpThreadCount(int tcpThreadCount) {
137
// Legacy setter: the visible code forwards the value to setMinThreads() only;
// NOTE(review): no call that changes the maximum thread count is visible here - confirm that is intended.
setMinThreads(tcpThreadCount);
141
* @return Returns the bind.
143
public InetAddress getBind() {
146
// Resolves the configured host into the bind field.
// When host is the literal "auto" it is overwritten with the local host's address first,
// so later reads of host see the concrete address rather than "auto".
if ("auto".equals(host)) {
147
host = java.net.InetAddress.getLocalHost().getHostAddress();
149
if (log.isDebugEnabled())
150
log.debug("Starting replication listener on address:"+ host);
151
bind = java.net.InetAddress.getByName(host);
152
// Lookup failures surface as IOException and are logged at error level;
// NOTE(review): what happens after the log call is not visible in this excerpt - confirm.
} catch (IOException ioe) {
153
log.error("Failed bind replication listener on address:"+ host, ioe);
160
* recursive bind to find the next available port
161
* @param socket ServerSocket
162
* @param portstart int
165
* @throws IOException
167
protected int bind(ServerSocket socket, int portstart, int retries) throws IOException {
168
// Attempts continue while retries is positive; each attempt targets getBind():portstart.
while ( retries > 0 ) {
170
InetSocketAddress addr = new InetSocketAddress(getBind(), portstart);
172
// Record the port being used so the port field reflects the address reported in the log line below.
setTcpListenPort(portstart);
173
log.info("Receiver Server Socket bound to:"+addr);
175
}catch ( IOException x) {
177
// Once the retry budget is exhausted the original IOException is propagated to the caller.
if ( retries <= 0 ) throw x;
179
// The recursive call's result is written back into retries, which is also the loop condition.
// NOTE(review): the statements that adjust retries/portstart before this call are not visible
// in this excerpt - confirm them before changing this method.
retries = bind(socket,portstart,retries);
185
public void messageDataReceived(ChannelMessage data) {
186
// Hand the message to the registered listener; the forward happens only when one has been set.
if ( this.listener != null ) {
187
listener.messageReceived(data);
191
public int getWorkerThreadOptions() {
193
// Sets the OPTION_DIRECT_BUFFER bit in the option mask when getDirect() reports true.
if ( getDirect() ) options = options | OPTION_DIRECT_BUFFER;
199
* @param bind The bind to set.
201
public void setBind(java.net.InetAddress bind) {
206
public int getTcpListenPort() {
210
public boolean getDirect() {
216
public void setDirect(boolean direct) {
217
// Stores the direct-buffer preference in the direct field.
this.direct = direct;
222
public String getHost() {
227
public long getTcpSelectorTimeout() {
228
// Returns the stored selector timeout unchanged (field default is 100).
return tcpSelectorTimeout;
231
public boolean doListen() {
235
public MessageListener getListener() {
239
public ThreadPool getPool() {
243
public String getTcpListenAddress() {
247
public int getAutoBind() {
251
public int getMaxThreads() {
255
public int getMinThreads() {
259
public boolean getTcpNoDelay() {
263
public boolean getSoKeepAlive() {
267
public boolean getOoBInline() {
272
public boolean getSoLingerOn() {
276
public int getSoLingerTime() {
280
public boolean getSoReuseAddress() {
281
// Returns the stored soReuseAddress setting (field default is true).
return soReuseAddress;
284
public int getSoTrafficClass() {
285
// Returns the stored traffic-class bit mask (field default is 0x04 | 0x08 | 0x010).
return soTrafficClass;
288
public int getTimeout() {
292
public void setTcpSelectorTimeout(long selTimeout) {
293
// Stores the selector timeout as given; no range check is visible here.
tcpSelectorTimeout = selTimeout;
296
public void setListen(boolean doListen) {
297
// Stores the flag in the listen field.
this.listen = doListen;
300
public void setHost(String host) {
304
public void setListener(MessageListener listener) {
305
// Writes the same listener field as setMessageListener().
this.listener = listener;
308
public void setLog(Log log) {
312
public void setPool(ThreadPool pool) {
316
public void setPort(int port) {
320
public void setAutoBind(int autoBind) {
321
this.autoBind = autoBind;
322
// Zero or negative values are raised to 1, so the stored count is always at least one.
if ( this.autoBind <= 0 ) this.autoBind = 1;
325
public void setMaxThreads(int maxThreads) {
326
// Stores the value as given; it is not compared against minThreads here.
this.maxThreads = maxThreads;
329
public void setMinThreads(int minThreads) {
330
// Stores the value as given; it is not compared against maxThreads here.
// setTcpThreadCount() also routes through this setter.
this.minThreads = minThreads;
333
public void setTcpNoDelay(boolean tcpNoDelay) {
334
// Stores the tcpNoDelay setting; the code that applies it to a socket is not visible here.
this.tcpNoDelay = tcpNoDelay;
337
public void setSoKeepAlive(boolean soKeepAlive) {
338
// Stores the soKeepAlive setting; the code that applies it to a socket is not visible here.
this.soKeepAlive = soKeepAlive;
341
public void setOoBInline(boolean ooBInline) {
342
// Stores the ooBInline setting; the code that applies it to a socket is not visible here.
this.ooBInline = ooBInline;
346
public void setSoLingerOn(boolean soLingerOn) {
347
// Stores the linger on/off setting; the linger duration is kept separately in soLingerTime.
this.soLingerOn = soLingerOn;
350
public void setSoLingerTime(int soLingerTime) {
351
// Stores the linger duration as given; NOTE(review): unit is not shown here - confirm at the call site.
this.soLingerTime = soLingerTime;
354
public void setSoReuseAddress(boolean soReuseAddress) {
355
// Stores the soReuseAddress setting returned by getSoReuseAddress().
this.soReuseAddress = soReuseAddress;
358
public void setSoTrafficClass(int soTrafficClass) {
359
// Stores the traffic-class bit mask returned by getSoTrafficClass(); the value is not validated here.
this.soTrafficClass = soTrafficClass;
362
public void setTimeout(int timeout) {
363
// Stores the timeout as given (the field default of 15000 is annotated as 15 seconds).
this.timeout = timeout;