~ubuntu-branches/ubuntu/intrepid/tomcat5.5/intrepid

« back to all changes in this revision

Viewing changes to container/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2006-09-27 11:19:17 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20060927111917-wov6fmkz3x6rsl68
Tags: 5.5.17-1ubuntu1
(Build-) depend on libmx4j-java (>= 3.0).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright 1999,2004-2006 The Apache Software Foundation.
 
3
 * 
 
4
 * Licensed under the Apache License, Version 2.0 (the "License");
 
5
 * you may not use this file except in compliance with the License.
 
6
 * You may obtain a copy of the License at
 
7
 * 
 
8
 *      http://www.apache.org/licenses/LICENSE-2.0
 
9
 * 
 
10
 * Unless required by applicable law or agreed to in writing, software
 
11
 * distributed under the License is distributed on an "AS IS" BASIS,
 
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
 * See the License for the specific language governing permissions and
 
14
 * limitations under the License.
 
15
 */
 
16
package org.apache.catalina.tribes.demos;
 
17
 
 
18
import java.io.Serializable;
 
19
import java.util.Random;
 
20
 
 
21
import org.apache.catalina.tribes.ByteMessage;
 
22
import org.apache.catalina.tribes.ChannelException;
 
23
import org.apache.catalina.tribes.ChannelListener;
 
24
import org.apache.catalina.tribes.ManagedChannel;
 
25
import org.apache.catalina.tribes.Member;
 
26
import org.apache.catalina.tribes.MembershipListener;
 
27
import org.apache.catalina.tribes.io.XByteBuffer;
 
28
import org.apache.catalina.tribes.Channel;
 
29
import java.io.Externalizable;
 
30
 
 
31
 
 
32
/**
 
33
 * <p>Title: </p>
 
34
 *
 
35
 * <p>Description: </p>
 
36
 *
 
37
 * <p>Copyright: Copyright (c) 2005</p>
 
38
 *
 
39
 * <p>Company: </p>
 
40
 *
 
41
 * @author not attributable
 
42
 * @version 1.0
 
43
 */
 
44
public class LoadTest implements MembershipListener,ChannelListener, Runnable {
 
45
    protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LoadTest.class);
 
46
    public static int size = 1020;
 
47
    public static Object mutex = new Object();
 
48
    public boolean doRun = true;
 
49
    
 
50
    public long bytesReceived = 0;
 
51
    public int  messagesReceived = 0;
 
52
    public boolean send = true;
 
53
    public boolean debug = false;
 
54
    public int msgCount = 100;
 
55
    ManagedChannel channel=null;
 
56
    public int statsInterval = 10000;
 
57
    public long pause = 0;
 
58
    public boolean breakonChannelException = false;
 
59
    public boolean async = false;
 
60
    public long receiveStart = 0;
 
61
    public int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
 
62
    
 
63
    static int messageSize = 0;
 
64
    
 
65
    public static long messagesSent = 0;
 
66
    public static long messageStartSendTime = 0;
 
67
    public static long messageEndSendTime = 0;
 
68
    public static int  threadCount = 0;
 
69
    
 
70
    public static synchronized void startTest() {
 
71
        threadCount++;
 
72
        if ( messageStartSendTime == 0 ) messageStartSendTime = System.currentTimeMillis();
 
73
    }
 
74
    
 
75
    public static synchronized void endTest() {
 
76
        threadCount--;
 
77
        if ( messageEndSendTime == 0 && threadCount==0 ) messageEndSendTime = System.currentTimeMillis();
 
78
    }
 
79
 
 
80
    
 
81
    public static synchronized long addSendStats(long count) {
 
82
        messagesSent+=count;
 
83
        return 0l;
 
84
    }    
 
85
    
 
86
    private static void printSendStats(long counter, int messageSize) {
 
87
        float cnt = (float)counter;
 
88
        float size = (float)messageSize;
 
89
        float time = (float)(System.currentTimeMillis()-messageStartSendTime) / 1000f;
 
90
        log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+
 
91
                 "\n\tMessage count:"+counter+
 
92
                 "\n\tTotal bytes  :"+(long)(size*cnt)+
 
93
                 "\n\tTotal seconds:"+(time)+
 
94
                 "\n\tBytes/second :"+(size*cnt/time)+
 
95
                 "\n\tMBytes/second:"+(size*cnt/time/1024f/1024f));
 
96
    }
 
97
 
 
98
    
 
99
    
 
100
    public LoadTest(ManagedChannel channel, 
 
101
                    boolean send,
 
102
                    int msgCount,
 
103
                    boolean debug,
 
104
                    long pause,
 
105
                    int stats,
 
106
                    boolean breakOnEx) {
 
107
        this.channel = channel;
 
108
        this.send = send;
 
109
        this.msgCount = msgCount;
 
110
        this.debug = debug;
 
111
        this.pause = pause;
 
112
        this.statsInterval = stats;
 
113
        this.breakonChannelException = breakOnEx;
 
114
    }
 
115
    
 
116
    
 
117
    
 
118
    public void run() {
 
119
        
 
120
        long counter = 0;
 
121
        long total = 0;
 
122
        LoadMessage msg = new LoadMessage();
 
123
        int messageSize = LoadTest.messageSize;
 
124
        
 
125
        try {
 
126
            startTest();
 
127
            while (total < msgCount) {
 
128
                if (channel.getMembers().length == 0 || (!send)) {
 
129
                    synchronized (mutex) {
 
130
                        try {
 
131
                            mutex.wait();
 
132
                        } catch (InterruptedException x) {
 
133
                            log.info("Thread interrupted from wait");
 
134
                        }
 
135
                    }
 
136
                } else {
 
137
                    try {
 
138
                        msg.setMsgNr((int)++total);
 
139
                        counter++;
 
140
                        if (debug) {
 
141
                            printArray(msg.getMessage());
 
142
                        }
 
143
                        channel.send(channel.getMembers(), msg, channelOptions);
 
144
                        if ( pause > 0 ) {
 
145
                            if ( debug) System.out.println("Pausing sender for "+pause+" ms.");
 
146
                            Thread.sleep(pause);
 
147
                        }
 
148
                    } catch (ChannelException x) {
 
149
                        log.error("Unable to send message:"+x.getMessage(),x);
 
150
                        Member[] faulty = x.getFaultyMembers();
 
151
                        for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
 
152
                        --counter;
 
153
                        if ( this.breakonChannelException ) throw x;
 
154
                    }
 
155
                }
 
156
                if ( (counter % statsInterval) == 0 && (counter > 0)) {
 
157
                    //add to the global counter
 
158
                    counter = addSendStats(counter);
 
159
                    //print from the global counter
 
160
                    //printSendStats(LoadTest.messagesSent, LoadTest.messageSize, LoadTest.messageSendTime);
 
161
                    printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
 
162
                    
 
163
                }
 
164
 
 
165
            }
 
166
        }catch ( Exception x ) {
 
167
            log.error("Captured error while sending:"+x.getMessage());
 
168
            if ( debug ) log.error("",x);
 
169
            printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
 
170
        }
 
171
        endTest();
 
172
    }
 
173
 
 
174
    
 
175
 
 
176
    /**
 
177
     * memberAdded
 
178
     *
 
179
     * @param member Member
 
180
     * @todo Implement this org.apache.catalina.tribes.MembershipListener
 
181
     *   method
 
182
     */
 
183
    public void memberAdded(Member member) {
 
184
        log.info("Member added:"+member);
 
185
        synchronized (mutex) {
 
186
            mutex.notifyAll();
 
187
        }
 
188
    }
 
189
 
 
190
    /**
 
191
     * memberDisappeared
 
192
     *
 
193
     * @param member Member
 
194
     * @todo Implement this org.apache.catalina.tribes.MembershipListener
 
195
     *   method
 
196
     */
 
197
    public void memberDisappeared(Member member) {
 
198
        log.info("Member disappeared:"+member);
 
199
    }
 
200
    
 
201
    public boolean accept(Serializable msg, Member mbr){ 
 
202
       return (msg instanceof LoadMessage) || (msg instanceof ByteMessage);
 
203
    }
 
204
    
 
205
    public void messageReceived(Serializable msg, Member mbr){ 
 
206
        if ( receiveStart == 0 ) receiveStart = System.currentTimeMillis();
 
207
        if ( debug ) {
 
208
            if ( msg instanceof LoadMessage ) {
 
209
                printArray(((LoadMessage)msg).getMessage());
 
210
            }
 
211
        }
 
212
        
 
213
        if ( msg instanceof ByteMessage && !(msg instanceof LoadMessage)) {
 
214
            LoadMessage tmp = new LoadMessage();
 
215
            tmp.setMessage(((ByteMessage)msg).getMessage());
 
216
            msg = tmp;
 
217
            tmp = null;
 
218
        }
 
219
        
 
220
        
 
221
        bytesReceived+=((LoadMessage)msg).getMessage().length;
 
222
        messagesReceived++;
 
223
        if ( (messagesReceived%statsInterval)==0 || (messagesReceived==msgCount)) {
 
224
            float bytes = (float)(((LoadMessage)msg).getMessage().length*messagesReceived);
 
225
            float seconds = ((float)(System.currentTimeMillis()-receiveStart)) / 1000f;
 
226
            log.info("****RECEIVE STATS-"+Thread.currentThread().getName()+"*****"+
 
227
                     "\n\tMessage count :"+(long)messagesReceived+
 
228
                     "\n\tTotal bytes   :"+(long)bytes+
 
229
                     "\n\tTime since 1st:"+seconds+" seconds"+
 
230
                     "\n\tBytes/second  :"+(bytes/seconds)+
 
231
                     "\n\tMBytes/second :"+(bytes/seconds/1024f/1024f));
 
232
 
 
233
        }
 
234
    }
 
235
    
 
236
    
 
237
    public static void printArray(byte[] data) {
 
238
        System.out.print("{");
 
239
        for (int i=0; i<data.length; i++ ) {
 
240
            System.out.print(data[i]);
 
241
            System.out.print(",");
 
242
        }
 
243
        System.out.println("} size:"+data.length);
 
244
    }
 
245
 
 
246
    
 
247
    
 
248
    //public static class LoadMessage extends ByteMessage implements Serializable  {
 
249
    public static class LoadMessage extends ByteMessage  implements Serializable {
 
250
        
 
251
        public static byte[] outdata = new byte[size];
 
252
        public static Random r = new Random(System.currentTimeMillis());
 
253
        public static int getMessageSize (LoadMessage msg) {
 
254
            int messageSize = msg.getMessage().length;
 
255
            if ( ((Object)msg) instanceof ByteMessage ) return messageSize;
 
256
            try {
 
257
                messageSize  = XByteBuffer.serialize(new LoadMessage()).length;
 
258
                log.info("Average message size:" + messageSize + " bytes");
 
259
            } catch (Exception x) {
 
260
                log.error("Unable to calculate test message size.", x);
 
261
            }
 
262
            return messageSize;
 
263
        }
 
264
        
 
265
        protected byte[] message = getMessage();
 
266
        protected int nr = -1;
 
267
        static {
 
268
            r.nextBytes(outdata);
 
269
        }
 
270
        
 
271
        public LoadMessage() {
 
272
            
 
273
        }
 
274
        
 
275
        public LoadMessage(int nr) {
 
276
            this.nr = nr;
 
277
        }
 
278
        
 
279
        public int getMsgNr() {
 
280
            return XByteBuffer.toInt(getMessage(),0);
 
281
        }
 
282
        
 
283
        public void setMsgNr(int nr) {
 
284
            XByteBuffer.toBytes(nr,getMessage(),0);
 
285
        }
 
286
        
 
287
        public byte[] getMessage() {
 
288
            if ( message == null ) {
 
289
                byte[] data = new byte[size+4];
 
290
                XByteBuffer.toBytes(nr,data,0);
 
291
                System.arraycopy(outdata, 0, data, 4, outdata.length);
 
292
                this.message = data;
 
293
            }
 
294
            return message;
 
295
        }
 
296
        
 
297
        public void setMessage(byte[] data) {
 
298
            this.message = data;
 
299
        }
 
300
    }
 
301
    
 
302
    public static void usage() {
 
303
        System.out.println("Tribes Load tester.");
 
304
        System.out.println("The load tester can be used in sender or received mode or both");
 
305
        System.out.println("Usage:\n\t"+
 
306
                           "java LoadTest [options]\n\t"+
 
307
                           "Options:\n\t\t"+
 
308
                           "[-mode receive|send|both]  \n\t\t"+
 
309
                           "[-debug]  \n\t\t"+
 
310
                           "[-count messagecount]  \n\t\t"+
 
311
                           "[-stats statinterval]  \n\t\t"+
 
312
                           "[-pause nrofsecondstopausebetweensends]  \n\t\t"+
 
313
                           "[-threads numberofsenderthreads]  \n\t\t"+
 
314
                           "[-size messagesize]  \n\t\t"+
 
315
                           "[-sendoptions channeloptions]  \n\t\t"+
 
316
                           "[-break (halts execution on exception)]\n"+
 
317
                           "\tChannel options:"+
 
318
                           ChannelCreator.usage()+"\n\n"+
 
319
                           "Example:\n\t"+
 
320
                           "java LoadTest -port 4004\n\t"+
 
321
                           "java LoadTest -bind 192.168.0.45 -port 4005\n\t"+
 
322
                           "java LoadTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
 
323
    }
 
324
    
 
325
    public static void main(String[] args) throws Exception {
 
326
        boolean send = true;
 
327
        boolean debug = false;
 
328
        long pause = 0;
 
329
        int count = 1000000;
 
330
        int stats = 10000;
 
331
        boolean breakOnEx = false;
 
332
        int threads = 1;
 
333
        int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
 
334
        if ( args.length == 0 ) {
 
335
            args = new String[] {"-help"};
 
336
        }
 
337
        for (int i = 0; i < args.length; i++) {
 
338
            if ("-threads".equals(args[i])) {
 
339
                threads = Integer.parseInt(args[++i]);
 
340
            } else if ("-count".equals(args[i])) {
 
341
                count = Integer.parseInt(args[++i]);
 
342
                System.out.println("Sending "+count+" messages.");
 
343
            } else if ("-pause".equals(args[i])) {
 
344
                pause = Long.parseLong(args[++i])*1000;
 
345
            } else if ("-break".equals(args[i])) {
 
346
                breakOnEx = true;
 
347
            } else if ("-stats".equals(args[i])) {
 
348
                stats = Integer.parseInt(args[++i]);
 
349
                System.out.println("Stats every "+stats+" message");
 
350
            } else if ("-sendoptions".equals(args[i])) {
 
351
                channelOptions = Integer.parseInt(args[++i]);
 
352
                System.out.println("Setting send options to "+channelOptions);
 
353
            } else if ("-size".equals(args[i])) {
 
354
                size = Integer.parseInt(args[++i])-4;
 
355
                System.out.println("Message size will be:"+(size+4)+" bytes");
 
356
            } else if ("-mode".equals(args[i])) {
 
357
                if ( "receive".equals(args[++i]) ) send = false;
 
358
            } else if ("-debug".equals(args[i])) {
 
359
                debug = true;
 
360
            } else if ("-help".equals(args[i])) 
 
361
            {
 
362
                usage();
 
363
                System.exit(1);
 
364
            }
 
365
        }
 
366
        
 
367
        
 
368
        ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
 
369
        
 
370
        LoadTest test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
 
371
        test.channelOptions = channelOptions;
 
372
        LoadMessage msg = new LoadMessage();
 
373
        
 
374
        messageSize = LoadMessage.getMessageSize(msg);
 
375
        channel.addChannelListener(test);
 
376
        channel.addMembershipListener(test);
 
377
        channel.start(channel.DEFAULT);
 
378
        Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
 
379
        while ( threads > 1 ) {
 
380
            Thread t = new Thread(test);
 
381
            t.setDaemon(true);
 
382
            t.start();
 
383
            threads--;
 
384
            test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
 
385
        }
 
386
        test.run();
 
387
        
 
388
        System.out.println("System test complete, sleeping to let threads finish.");
 
389
        Thread.sleep(60*1000*60);
 
390
    } 
 
391
    
 
392
    public static class Shutdown extends Thread {
 
393
        ManagedChannel channel = null;
 
394
        public Shutdown(ManagedChannel channel) {
 
395
            this.channel = channel;
 
396
        }
 
397
        
 
398
        public void run() {
 
399
            System.out.println("Shutting down...");
 
400
            SystemExit exit = new SystemExit(5000);
 
401
            exit.setDaemon(true);
 
402
            exit.start();
 
403
            try {
 
404
                channel.stop(channel.DEFAULT);
 
405
                
 
406
            }catch ( Exception x ) {
 
407
                x.printStackTrace();
 
408
            }
 
409
            System.out.println("Channel stopped.");
 
410
        }
 
411
    }
 
412
    public static class SystemExit extends Thread {
 
413
        private long delay;
 
414
        public SystemExit(long delay) {
 
415
            this.delay = delay;
 
416
        }
 
417
        public void run () {
 
418
            try {
 
419
                Thread.sleep(delay);
 
420
            }catch ( Exception x ) {
 
421
                x.printStackTrace();
 
422
            }
 
423
            System.exit(0);
 
424
 
 
425
        }
 
426
    }
 
427
    
 
428
}