2
* Copyright 1999,2004-2006 The Apache Software Foundation.
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
* you may not use this file except in compliance with the License.
6
* You may obtain a copy of the License at
8
* http://www.apache.org/licenses/LICENSE-2.0
10
* Unless required by applicable law or agreed to in writing, software
11
* distributed under the License is distributed on an "AS IS" BASIS,
12
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
* See the License for the specific language governing permissions and
14
* limitations under the License.
16
package org.apache.catalina.tribes.demos;
18
import java.io.Serializable;
19
import java.util.Random;
21
import org.apache.catalina.tribes.ByteMessage;
22
import org.apache.catalina.tribes.ChannelException;
23
import org.apache.catalina.tribes.ChannelListener;
24
import org.apache.catalina.tribes.ManagedChannel;
25
import org.apache.catalina.tribes.Member;
26
import org.apache.catalina.tribes.MembershipListener;
27
import org.apache.catalina.tribes.io.XByteBuffer;
28
import org.apache.catalina.tribes.Channel;
29
import java.io.Externalizable;
35
* <p>Description: </p>
37
* <p>Copyright: Copyright (c) 2005</p>
41
* @author not attributable
44
public class LoadTest implements MembershipListener,ChannelListener, Runnable {
45
protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LoadTest.class);
46
public static int size = 1020;
47
public static Object mutex = new Object();
48
public boolean doRun = true;
50
public long bytesReceived = 0;
51
public int messagesReceived = 0;
52
public boolean send = true;
53
public boolean debug = false;
54
public int msgCount = 100;
55
ManagedChannel channel=null;
56
public int statsInterval = 10000;
57
public long pause = 0;
58
public boolean breakonChannelException = false;
59
public boolean async = false;
60
public long receiveStart = 0;
61
public int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
63
static int messageSize = 0;
65
public static long messagesSent = 0;
66
public static long messageStartSendTime = 0;
67
public static long messageEndSendTime = 0;
68
public static int threadCount = 0;
70
public static synchronized void startTest() {
72
if ( messageStartSendTime == 0 ) messageStartSendTime = System.currentTimeMillis();
75
public static synchronized void endTest() {
77
if ( messageEndSendTime == 0 && threadCount==0 ) messageEndSendTime = System.currentTimeMillis();
81
public static synchronized long addSendStats(long count) {
86
private static void printSendStats(long counter, int messageSize) {
87
float cnt = (float)counter;
88
float size = (float)messageSize;
89
float time = (float)(System.currentTimeMillis()-messageStartSendTime) / 1000f;
90
log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+
91
"\n\tMessage count:"+counter+
92
"\n\tTotal bytes :"+(long)(size*cnt)+
93
"\n\tTotal seconds:"+(time)+
94
"\n\tBytes/second :"+(size*cnt/time)+
95
"\n\tMBytes/second:"+(size*cnt/time/1024f/1024f));
100
public LoadTest(ManagedChannel channel,
107
this.channel = channel;
109
this.msgCount = msgCount;
112
this.statsInterval = stats;
113
this.breakonChannelException = breakOnEx;
122
LoadMessage msg = new LoadMessage();
123
int messageSize = LoadTest.messageSize;
127
while (total < msgCount) {
128
if (channel.getMembers().length == 0 || (!send)) {
129
synchronized (mutex) {
132
} catch (InterruptedException x) {
133
log.info("Thread interrupted from wait");
138
msg.setMsgNr((int)++total);
141
printArray(msg.getMessage());
143
channel.send(channel.getMembers(), msg, channelOptions);
145
if ( debug) System.out.println("Pausing sender for "+pause+" ms.");
148
} catch (ChannelException x) {
149
log.error("Unable to send message:"+x.getMessage(),x);
150
Member[] faulty = x.getFaultyMembers();
151
for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
153
if ( this.breakonChannelException ) throw x;
156
if ( (counter % statsInterval) == 0 && (counter > 0)) {
157
//add to the global counter
158
counter = addSendStats(counter);
159
//print from the global counter
160
//printSendStats(LoadTest.messagesSent, LoadTest.messageSize, LoadTest.messageSendTime);
161
printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
166
}catch ( Exception x ) {
167
log.error("Captured error while sending:"+x.getMessage());
168
if ( debug ) log.error("",x);
169
printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
179
* @param member Member
180
* @todo Implement this org.apache.catalina.tribes.MembershipListener
183
public void memberAdded(Member member) {
184
log.info("Member added:"+member);
185
synchronized (mutex) {
193
* @param member Member
194
* @todo Implement this org.apache.catalina.tribes.MembershipListener
197
public void memberDisappeared(Member member) {
198
log.info("Member disappeared:"+member);
201
public boolean accept(Serializable msg, Member mbr){
202
return (msg instanceof LoadMessage) || (msg instanceof ByteMessage);
205
public void messageReceived(Serializable msg, Member mbr){
206
if ( receiveStart == 0 ) receiveStart = System.currentTimeMillis();
208
if ( msg instanceof LoadMessage ) {
209
printArray(((LoadMessage)msg).getMessage());
213
if ( msg instanceof ByteMessage && !(msg instanceof LoadMessage)) {
214
LoadMessage tmp = new LoadMessage();
215
tmp.setMessage(((ByteMessage)msg).getMessage());
221
bytesReceived+=((LoadMessage)msg).getMessage().length;
223
if ( (messagesReceived%statsInterval)==0 || (messagesReceived==msgCount)) {
224
float bytes = (float)(((LoadMessage)msg).getMessage().length*messagesReceived);
225
float seconds = ((float)(System.currentTimeMillis()-receiveStart)) / 1000f;
226
log.info("****RECEIVE STATS-"+Thread.currentThread().getName()+"*****"+
227
"\n\tMessage count :"+(long)messagesReceived+
228
"\n\tTotal bytes :"+(long)bytes+
229
"\n\tTime since 1st:"+seconds+" seconds"+
230
"\n\tBytes/second :"+(bytes/seconds)+
231
"\n\tMBytes/second :"+(bytes/seconds/1024f/1024f));
237
public static void printArray(byte[] data) {
238
System.out.print("{");
239
for (int i=0; i<data.length; i++ ) {
240
System.out.print(data[i]);
241
System.out.print(",");
243
System.out.println("} size:"+data.length);
248
//public static class LoadMessage extends ByteMessage implements Serializable {
249
public static class LoadMessage extends ByteMessage implements Serializable {
251
public static byte[] outdata = new byte[size];
252
public static Random r = new Random(System.currentTimeMillis());
253
public static int getMessageSize (LoadMessage msg) {
254
int messageSize = msg.getMessage().length;
255
if ( ((Object)msg) instanceof ByteMessage ) return messageSize;
257
messageSize = XByteBuffer.serialize(new LoadMessage()).length;
258
log.info("Average message size:" + messageSize + " bytes");
259
} catch (Exception x) {
260
log.error("Unable to calculate test message size.", x);
265
protected byte[] message = getMessage();
266
protected int nr = -1;
268
r.nextBytes(outdata);
271
public LoadMessage() {
275
public LoadMessage(int nr) {
279
public int getMsgNr() {
280
return XByteBuffer.toInt(getMessage(),0);
283
public void setMsgNr(int nr) {
284
XByteBuffer.toBytes(nr,getMessage(),0);
287
public byte[] getMessage() {
288
if ( message == null ) {
289
byte[] data = new byte[size+4];
290
XByteBuffer.toBytes(nr,data,0);
291
System.arraycopy(outdata, 0, data, 4, outdata.length);
297
public void setMessage(byte[] data) {
302
public static void usage() {
303
System.out.println("Tribes Load tester.");
304
System.out.println("The load tester can be used in sender or received mode or both");
305
System.out.println("Usage:\n\t"+
306
"java LoadTest [options]\n\t"+
308
"[-mode receive|send|both] \n\t\t"+
310
"[-count messagecount] \n\t\t"+
311
"[-stats statinterval] \n\t\t"+
312
"[-pause nrofsecondstopausebetweensends] \n\t\t"+
313
"[-threads numberofsenderthreads] \n\t\t"+
314
"[-size messagesize] \n\t\t"+
315
"[-sendoptions channeloptions] \n\t\t"+
316
"[-break (halts execution on exception)]\n"+
317
"\tChannel options:"+
318
ChannelCreator.usage()+"\n\n"+
320
"java LoadTest -port 4004\n\t"+
321
"java LoadTest -bind 192.168.0.45 -port 4005\n\t"+
322
"java LoadTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
325
public static void main(String[] args) throws Exception {
327
boolean debug = false;
331
boolean breakOnEx = false;
333
int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
334
if ( args.length == 0 ) {
335
args = new String[] {"-help"};
337
for (int i = 0; i < args.length; i++) {
338
if ("-threads".equals(args[i])) {
339
threads = Integer.parseInt(args[++i]);
340
} else if ("-count".equals(args[i])) {
341
count = Integer.parseInt(args[++i]);
342
System.out.println("Sending "+count+" messages.");
343
} else if ("-pause".equals(args[i])) {
344
pause = Long.parseLong(args[++i])*1000;
345
} else if ("-break".equals(args[i])) {
347
} else if ("-stats".equals(args[i])) {
348
stats = Integer.parseInt(args[++i]);
349
System.out.println("Stats every "+stats+" message");
350
} else if ("-sendoptions".equals(args[i])) {
351
channelOptions = Integer.parseInt(args[++i]);
352
System.out.println("Setting send options to "+channelOptions);
353
} else if ("-size".equals(args[i])) {
354
size = Integer.parseInt(args[++i])-4;
355
System.out.println("Message size will be:"+(size+4)+" bytes");
356
} else if ("-mode".equals(args[i])) {
357
if ( "receive".equals(args[++i]) ) send = false;
358
} else if ("-debug".equals(args[i])) {
360
} else if ("-help".equals(args[i]))
368
ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
370
LoadTest test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
371
test.channelOptions = channelOptions;
372
LoadMessage msg = new LoadMessage();
374
messageSize = LoadMessage.getMessageSize(msg);
375
channel.addChannelListener(test);
376
channel.addMembershipListener(test);
377
channel.start(channel.DEFAULT);
378
Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
379
while ( threads > 1 ) {
380
Thread t = new Thread(test);
384
test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
388
System.out.println("System test complete, sleeping to let threads finish.");
389
Thread.sleep(60*1000*60);
392
public static class Shutdown extends Thread {
393
ManagedChannel channel = null;
394
public Shutdown(ManagedChannel channel) {
395
this.channel = channel;
399
System.out.println("Shutting down...");
400
SystemExit exit = new SystemExit(5000);
401
exit.setDaemon(true);
404
channel.stop(channel.DEFAULT);
406
}catch ( Exception x ) {
409
System.out.println("Channel stopped.");
412
public static class SystemExit extends Thread {
414
public SystemExit(long delay) {
420
}catch ( Exception x ) {