~ubuntu-branches/ubuntu/raring/libjboss-remoting-java/raring

« back to all changes in this revision

Viewing changes to tests/org/jboss/test/remoting/callback/pull/blocking/BlockingPullCallbackTestCase.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mto: This revision was merged to the branch mainline in revision 9.
  • Revision ID: package-import@ubuntu.com-20110909140103-o8ucrolqt5g25k57
Tags: upstream-2.5.3.SP1
ImportĀ upstreamĀ versionĀ 2.5.3.SP1

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
* JBoss, Home of Professional Open Source
3
 
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
 
* by the @authors tag. See the copyright.txt in the distribution for a
5
 
* full listing of individual contributors.
6
 
*
7
 
* This is free software; you can redistribute it and/or modify it
8
 
* under the terms of the GNU Lesser General Public License as
9
 
* published by the Free Software Foundation; either version 2.1 of
10
 
* the License, or (at your option) any later version.
11
 
*
12
 
* This software is distributed in the hope that it will be useful,
13
 
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
 
* Lesser General Public License for more details.
16
 
*
17
 
* You should have received a copy of the GNU Lesser General Public
18
 
* License along with this software; if not, write to the Free
19
 
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
 
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21
 
*/
22
 
package org.jboss.test.remoting.callback.pull.blocking;
23
 
 
24
 
import java.lang.reflect.Field;
25
 
import java.net.InetAddress;
26
 
import java.util.ArrayList;
27
 
import java.util.HashMap;
28
 
import java.util.HashSet;
29
 
import java.util.Iterator;
30
 
import java.util.List;
31
 
import java.util.Map;
32
 
import java.util.Set;
33
 
 
34
 
import javax.management.MBeanServer;
35
 
 
36
 
import junit.framework.TestCase;
37
 
 
38
 
import org.apache.log4j.ConsoleAppender;
39
 
import org.apache.log4j.Level;
40
 
import org.apache.log4j.Logger;
41
 
import org.apache.log4j.PatternLayout;
42
 
import org.jboss.remoting.Client;
43
 
import org.jboss.remoting.InvocationRequest;
44
 
import org.jboss.remoting.InvokerLocator;
45
 
import org.jboss.remoting.ServerInvocationHandler;
46
 
import org.jboss.remoting.ServerInvoker;
47
 
import org.jboss.remoting.callback.Callback;
48
 
import org.jboss.remoting.callback.CallbackPoller;
49
 
import org.jboss.remoting.callback.HandleCallbackException;
50
 
import org.jboss.remoting.callback.InvokerCallbackHandler;
51
 
import org.jboss.remoting.transport.Connector;
52
 
import org.jboss.remoting.transport.PortUtil;
53
 
 
54
 
 
55
 
/** 
56
 
 * Tests the blocking mode for pull callbacks.
57
 
 * 
58
 
 * See JBREM-641.
59
 
 * 
60
 
 * @author <a href="ron.sigal@jboss.com">Ron Sigal</a>
61
 
 * @version $Revision: 3015 $
62
 
 * <p>
63
 
 * Copyright May 2, 2007
64
 
 * </p>
65
 
 */
66
 
public class BlockingPullCallbackTestCase extends TestCase
67
 
{
68
 
   public static int port;
69
 
   
70
 
   private static Logger log = Logger.getLogger(BlockingPullCallbackTestCase.class);
71
 
   
72
 
   private static final String INVOCATION_TEST =     "invocationTest";
73
 
   private static final String CALLBACK_TEST =       "callbackTest";
74
 
   private static final String COUNTER =             "counter";
75
 
   private static final String CALLBACK_DELAY_KEY =  "callbackDelay";
76
 
   
77
 
   private static boolean firstTime = true;
78
 
   
79
 
   // remoting server connector
80
 
   private Connector connector;
81
 
   private InvokerLocator serverLocator;
82
 
   private SampleInvocationHandler invocationHandler;
83
 
 
84
 
   
85
 
   /**
86
 
    * Sets up target remoting server.
87
 
    */
88
 
   public void setUp() throws Exception
89
 
   {
90
 
      if (firstTime)
91
 
      {
92
 
         firstTime = false;
93
 
         Logger.getLogger("org.jboss.remoting").setLevel(Level.INFO);
94
 
         Logger.getLogger("org.jboss.test.remoting").setLevel(Level.INFO);
95
 
         String pattern = "[%d{ABSOLUTE}] [%t] %5p (%F:%L) - %m%n";
96
 
         PatternLayout layout = new PatternLayout(pattern);
97
 
         ConsoleAppender consoleAppender = new ConsoleAppender(layout);
98
 
         Logger.getRootLogger().addAppender(consoleAppender);  
99
 
      }
100
 
   }
101
 
 
102
 
   
103
 
   /**
104
 
    * Shuts down the server
105
 
    */
106
 
   public void tearDown()
107
 
   {
108
 
      if (connector != null)
109
 
      {
110
 
         connector.stop();
111
 
         connector.destroy();
112
 
      }
113
 
   }
114
 
   
115
 
   
116
 
   /**
117
 
    * Tests blocking and nonblocking direct calls to Client.getCallbacks().
118
 
    */
119
 
   public void testBlockingPullCallback() throws Throwable
120
 
   {
121
 
      log.info("entering " + getName());
122
 
      int CALLBACK_DELAY = 5000;
123
 
      
124
 
      // Start server.
125
 
      String host = InetAddress.getLocalHost().getHostAddress();
126
 
      port = PortUtil.findFreePort(host);
127
 
      String locatorURI = getTransport() + "://" + host + ":" + port; 
128
 
      serverLocator = new InvokerLocator(locatorURI);
129
 
      log.info("Starting remoting server with locator uri of: " + locatorURI);
130
 
      HashMap config = new HashMap();
131
 
      config.put(InvokerLocator.FORCE_REMOTE, "true");
132
 
      addExtraServerConfig(config);
133
 
      connector = new Connector(serverLocator, config);
134
 
      connector.create();
135
 
      invocationHandler = new SampleInvocationHandler();
136
 
      connector.addInvocationHandler("sample", invocationHandler);
137
 
      connector.start();
138
 
      
139
 
      // Create client.
140
 
      HashMap clientConfig = new HashMap();
141
 
      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
142
 
      addExtraClientConfig(clientConfig);
143
 
      Client client = new Client(serverLocator, clientConfig);
144
 
      client.connect();
145
 
      log.info("client is connected");
146
 
      
147
 
      SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
148
 
      client.addListener(callbackHandler);
149
 
      log.info("client added callback handler for pull callbacks");
150
 
      
151
 
      // Test for good connection.
152
 
      Integer count = new Integer(17);
153
 
      HashMap metadata = new HashMap();
154
 
      metadata.put(COUNTER, count);
155
 
      Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
156
 
      assertEquals(17, response.intValue());
157
 
      log.info("client.invoke(INVOCATION_TEST, metadata) successful");
158
 
 
159
 
      // Test nonblocking callbacks.
160
 
      HashMap pullerMetadata = new HashMap();
161
 
      pullerMetadata.put(CALLBACK_DELAY_KEY, Integer.toString(CALLBACK_DELAY));
162
 
      pullerMetadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.NONBLOCKING);
163
 
      CallbackPuller puller = new CallbackPuller(client, callbackHandler, pullerMetadata);
164
 
      puller.start();
165
 
      Thread.sleep(2000);
166
 
      // Should have received empty list of callbacks.
167
 
      assertTrue(puller.done);
168
 
      assertNotNull(puller.callbacks);
169
 
      assertTrue(puller.callbacks.isEmpty());
170
 
      
171
 
      // Drain stored callback.
172
 
      Thread.sleep(CALLBACK_DELAY);
173
 
      metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.NONBLOCKING);
174
 
      List callbacks = client.getCallbacks(callbackHandler, metadata);
175
 
      assertNotNull(callbacks);
176
 
      assertEquals(1, callbacks.size());
177
 
      
178
 
      // Test blocking callbacks.
179
 
      pullerMetadata.clear();
180
 
      pullerMetadata.put(CALLBACK_DELAY_KEY, Integer.toString(CALLBACK_DELAY));
181
 
      pullerMetadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
182
 
      pullerMetadata.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(CALLBACK_DELAY * 2));
183
 
      puller = new CallbackPuller(client, callbackHandler, pullerMetadata);
184
 
      puller.start();
185
 
      Thread.sleep(2000);
186
 
      assertFalse(puller.done);
187
 
      // Should not have returned from getCallbacks() yet.
188
 
      assertNull(puller.callbacks);
189
 
      Thread.sleep(CALLBACK_DELAY);
190
 
      assertTrue(puller.done);
191
 
      assertNotNull(puller.callbacks);
192
 
      assertEquals(1, puller.callbacks.size());
193
 
      
194
 
      client.removeListener(callbackHandler);
195
 
      client.disconnect();
196
 
   }
197
 
   
198
 
   
199
 
   /**
200
 
    * Tests configuration of blocking timeout in server configuration map.
201
 
    */
202
 
   public void testBlockingPullCallbackServerConfiguration() throws Throwable
203
 
   {
204
 
      log.info("entering " + getName());
205
 
      
206
 
      // Start server.
207
 
      String host = InetAddress.getLocalHost().getHostAddress();
208
 
      port = PortUtil.findFreePort(host);
209
 
      String locatorURI = getTransport() + "://" + host + ":" + port; 
210
 
      serverLocator = new InvokerLocator(locatorURI);
211
 
      log.info("Starting remoting server with locator uri of: " + locatorURI);
212
 
      HashMap config = new HashMap();
213
 
      config.put(InvokerLocator.FORCE_REMOTE, "true");
214
 
      
215
 
      int blockingTimeout = ServerInvoker.DEFAULT_BLOCKING_TIMEOUT + 2000;
216
 
      config.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(blockingTimeout));
217
 
      addExtraServerConfig(config);
218
 
      connector = new Connector(serverLocator, config);
219
 
      connector.create();
220
 
      invocationHandler = new SampleInvocationHandler();
221
 
      connector.addInvocationHandler("sample", invocationHandler);
222
 
      connector.start();
223
 
      
224
 
      // Create client.
225
 
      HashMap clientConfig = new HashMap();
226
 
      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
227
 
      addExtraClientConfig(clientConfig);
228
 
      Client client = new Client(serverLocator, clientConfig);
229
 
      client.connect();
230
 
      log.info("client is connected");
231
 
      
232
 
      SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
233
 
      client.addListener(callbackHandler);
234
 
      log.info("client added callback handler for pull callbacks");
235
 
      
236
 
      // Test for good connection.
237
 
      Integer count = new Integer(17);
238
 
      HashMap metadata = new HashMap();
239
 
      metadata.put(COUNTER, count);
240
 
      Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
241
 
      assertEquals(17, response.intValue());
242
 
      log.info("client.invoke(INVOCATION_TEST, metadata) successful");
243
 
      
244
 
      // Test blocking callbacks.
245
 
      //
246
 
      // Set CALLBACK_DELAY == default blocking timeout + 1000.  If the default blocking
247
 
      // timeout were in effect, Client.getCallbacks() would time out and return without
248
 
      // getting a callback.
249
 
      int CALLBACK_DELAY = blockingTimeout - 1000;
250
 
      Map pullerMetadata = new HashMap();
251
 
      pullerMetadata.put(CALLBACK_DELAY_KEY, Integer.toString(CALLBACK_DELAY));
252
 
      pullerMetadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
253
 
      CallbackPuller puller = new CallbackPuller(client, callbackHandler, pullerMetadata);
254
 
      puller.start();
255
 
      Thread.sleep(CALLBACK_DELAY - 1000);
256
 
      
257
 
      // Callback has not been created yet, so should not have returned from getCallbacks().      
258
 
      assertFalse(puller.done);
259
 
      assertNull(puller.callbacks);
260
 
      Thread.sleep(2000);
261
 
      
262
 
      // Callback has been created, so should have returned from getCallbacks().
263
 
      assertTrue(puller.done);
264
 
      assertNotNull(puller.callbacks);
265
 
      assertEquals(1, puller.callbacks.size());
266
 
      
267
 
      client.removeListener(callbackHandler);
268
 
      client.disconnect();
269
 
   }
270
 
   
271
 
   
272
 
   /**
273
 
    * Tests configuration of blocking timeout by Client.addListener() metadata.
274
 
    * In this case, a CallbackPoller is created.
275
 
    */
276
 
   public void testBlockingCallbackPollerInitialMetadataConfiguration() throws Throwable
277
 
   {
278
 
      log.info("entering " + getName());
279
 
      
280
 
      // Start server.
281
 
      String host = InetAddress.getLocalHost().getHostAddress();
282
 
      port = PortUtil.findFreePort(host);
283
 
      String locatorURI = getTransport() + "://" + host + ":" + port; 
284
 
      serverLocator = new InvokerLocator(locatorURI);
285
 
      log.info("Starting remoting server with locator uri of: " + locatorURI);
286
 
      HashMap config = new HashMap();
287
 
      config.put(InvokerLocator.FORCE_REMOTE, "true");
288
 
      int serverBlockingTimeout = ServerInvoker.DEFAULT_BLOCKING_TIMEOUT + 2000;
289
 
      config.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(serverBlockingTimeout));
290
 
      addExtraServerConfig(config);
291
 
      connector = new Connector(serverLocator, config);
292
 
      connector.create();
293
 
      invocationHandler = new SampleInvocationHandler();
294
 
      connector.addInvocationHandler("sample", invocationHandler);
295
 
      connector.start();
296
 
      
297
 
      // Create client.
298
 
      HashMap clientConfig = new HashMap();
299
 
      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
300
 
      addExtraClientConfig(clientConfig);
301
 
      Client client = new Client(serverLocator, clientConfig);
302
 
      client.connect();
303
 
      log.info("client is connected");
304
 
      
305
 
      // Test for good connection.
306
 
      Integer count = new Integer(17);
307
 
      HashMap metadata = new HashMap();
308
 
      metadata.put(COUNTER, count);
309
 
      Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
310
 
      assertEquals(17, response.intValue());
311
 
      log.info("client.invoke(INVOCATION_TEST, metadata) successful");
312
 
      
313
 
      // Test blocking callbacks.
314
 
      //
315
 
      // Reset blocking timeout to server blocking timeout + 2000, and set
316
 
      // CALLBACK_DELAY to server blocking timeout + 1000.  If the server's blocking
317
 
      // timeout were in effect, Client.getCallbacks() would time out and return without
318
 
      // getting a callback.
319
 
      int clientBlockingTimeout = serverBlockingTimeout + 2000;
320
 
      int CALLBACK_DELAY = serverBlockingTimeout + 1000;
321
 
      metadata.clear();
322
 
      metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
323
 
      metadata.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(clientBlockingTimeout));
324
 
      SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
325
 
      client.addListener(callbackHandler, metadata);
326
 
      log.info("client added callback handler for pull callbacks");
327
 
      
328
 
      // Create callback.
329
 
      CallbackCreator creator = new CallbackCreator(client, CALLBACK_DELAY);
330
 
      creator.start();
331
 
      Thread.sleep(CALLBACK_DELAY - 1000);
332
 
      
333
 
      // Callback has not been created yet, so should not have returned from getCallbacks().
334
 
      assertEquals(0, callbackHandler.callbacks.size());
335
 
      Thread.sleep(2000);
336
 
      
337
 
      // Callback has been created, so should have returned from getCallbacks().
338
 
      assertEquals(1, callbackHandler.callbacks.size());
339
 
      
340
 
      client.removeListener(callbackHandler);
341
 
      client.disconnect();
342
 
   }
343
 
   
344
 
   
345
 
   /**
346
 
    * Tests configuration of blocking timeout by Client.getCallbacks() metadata.
347
 
    */
348
 
   public void testBlockingPullCallbackInvocationMetadataConfiguration() throws Throwable
349
 
   {
350
 
      log.info("entering " + getName());
351
 
      
352
 
      // Start server.
353
 
      String host = InetAddress.getLocalHost().getHostAddress();
354
 
      port = PortUtil.findFreePort(host);
355
 
      String locatorURI = getTransport() + "://" + host + ":" + port; 
356
 
      serverLocator = new InvokerLocator(locatorURI);
357
 
      log.info("Starting remoting server with locator uri of: " + locatorURI);
358
 
      HashMap config = new HashMap();
359
 
      config.put(InvokerLocator.FORCE_REMOTE, "true");
360
 
      
361
 
      int serverBlockingTimeout = ServerInvoker.DEFAULT_BLOCKING_TIMEOUT + 2000;
362
 
      config.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(serverBlockingTimeout));
363
 
      addExtraServerConfig(config);
364
 
      connector = new Connector(serverLocator, config);
365
 
      connector.create();
366
 
      invocationHandler = new SampleInvocationHandler();
367
 
      connector.addInvocationHandler("sample", invocationHandler);
368
 
      connector.start();
369
 
      
370
 
      // Create client.
371
 
      HashMap clientConfig = new HashMap();
372
 
      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
373
 
      addExtraClientConfig(clientConfig);
374
 
      Client client = new Client(serverLocator, clientConfig);
375
 
      client.connect();
376
 
      log.info("client is connected");
377
 
      
378
 
      SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
379
 
      client.addListener(callbackHandler);
380
 
      log.info("client added callback handler for pull callbacks");
381
 
      
382
 
      // Test for good connection.
383
 
      Integer count = new Integer(17);
384
 
      HashMap metadata = new HashMap();
385
 
      metadata.put(COUNTER, count);
386
 
      Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
387
 
      assertEquals(17, response.intValue());
388
 
      log.info("client.invoke(INVOCATION_TEST, metadata) successful");
389
 
      
390
 
      // Test blocking callbacks.
391
 
      //
392
 
      // Reset blocking timeout to server blocking timeout + 2000, and set
393
 
      // CALLBACK_DELAY to server blocking timeout + 1000.  If the server's blocking
394
 
      // timeout were in effect, Client.getCallbacks() would time out and return without
395
 
      // getting a callback.
396
 
      int clientBlockingTimeout = serverBlockingTimeout + 2000;
397
 
      int CALLBACK_DELAY = serverBlockingTimeout + 1000;
398
 
      Map pullerMetadata = new HashMap();
399
 
      pullerMetadata.put(CALLBACK_DELAY_KEY, Integer.toString(CALLBACK_DELAY));
400
 
      pullerMetadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
401
 
      pullerMetadata.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(clientBlockingTimeout));
402
 
      CallbackPuller puller = new CallbackPuller(client, callbackHandler, pullerMetadata);
403
 
      puller.start();
404
 
      Thread.sleep(CALLBACK_DELAY - 1000);
405
 
      
406
 
      // Callback has not been created yet, so should not have returned from getCallbacks().      
407
 
      assertFalse(puller.done);
408
 
      assertNull(puller.callbacks);
409
 
      Thread.sleep(2000);
410
 
      
411
 
      // Callback has been created, so should have returned from getCallbacks().
412
 
      assertTrue(puller.done);
413
 
      assertNotNull(puller.callbacks);
414
 
      assertEquals(1, puller.callbacks.size());
415
 
      
416
 
      client.removeListener(callbackHandler);
417
 
      client.disconnect();
418
 
   }
419
 
   
420
 
   
421
 
   /**
422
 
    * Tests CallbackPoller in nonblocking mode.
423
 
    */
424
 
   public void testNonBlockingCallbackPoller() throws Throwable
425
 
   {
426
 
      log.info("entering " + getName());
427
 
      
428
 
      // Start server.
429
 
      String host = InetAddress.getLocalHost().getHostAddress();
430
 
      port = PortUtil.findFreePort(host);
431
 
      String locatorURI = getTransport() + "://" + host + ":" + port; 
432
 
      serverLocator = new InvokerLocator(locatorURI);
433
 
      log.info("Starting remoting server with locator uri of: " + locatorURI);
434
 
      HashMap config = new HashMap();
435
 
      config.put(InvokerLocator.FORCE_REMOTE, "true");
436
 
      addExtraServerConfig(config);
437
 
      connector = new Connector(serverLocator, config);
438
 
      connector.create();
439
 
      invocationHandler = new SampleInvocationHandler();
440
 
      connector.addInvocationHandler("sample", invocationHandler);
441
 
      connector.start();
442
 
      
443
 
      // Create client.
444
 
      HashMap clientConfig = new HashMap();
445
 
      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
446
 
      addExtraClientConfig(clientConfig);
447
 
      Client client = new Client(serverLocator, clientConfig);
448
 
      client.connect();
449
 
      log.info("client is connected");
450
 
      
451
 
      int CALLBACK_DELAY = 5000;
452
 
      SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
453
 
      Map metadata = new HashMap();
454
 
      metadata.put(CallbackPoller.CALLBACK_POLL_PERIOD, Integer.toString(CALLBACK_DELAY));
455
 
      metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.NONBLOCKING);
456
 
      client.addListener(callbackHandler, metadata);
457
 
      log.info("client added callback handler for pull callbacks");
458
 
      
459
 
      // Test for good connection.
460
 
      Integer count = new Integer(17);
461
 
      metadata.clear();
462
 
      metadata = new HashMap();
463
 
      metadata.put(COUNTER, count);
464
 
      Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
465
 
      assertEquals(17, response.intValue());
466
 
      log.info("client.invoke(INVOCATION_TEST, metadata) successful");
467
 
      
468
 
      // Test nonblocking behavior.
469
 
      Thread.sleep(1000);
470
 
      // Callback will be created after CallbackPoller's first poll.
471
 
      CallbackCreator creator = new CallbackCreator(client, CALLBACK_DELAY);
472
 
      creator.start();
473
 
      Thread.sleep(CALLBACK_DELAY);
474
 
      assertEquals(0, callbackHandler.callbacks.size());
475
 
      Thread.sleep(2000);
476
 
      // CallbackPoller hasn't made second poll yet.
477
 
      assertEquals(0, callbackHandler.callbacks.size());
478
 
      Thread.sleep(CALLBACK_DELAY); 
479
 
      assertEquals(1, callbackHandler.callbacks.size());
480
 
      
481
 
      client.removeListener(callbackHandler);
482
 
      client.disconnect();
483
 
   }
484
 
 
485
 
   
486
 
   /**
487
 
    * Tests CallbackPoller in blocking mode.
488
 
    */
489
 
   public void testBlockingCallbackPoller() throws Throwable
490
 
   {
491
 
      log.info("entering " + getName());
492
 
      
493
 
      // Start server.
494
 
      String host = InetAddress.getLocalHost().getHostAddress();
495
 
      port = PortUtil.findFreePort(host);
496
 
      String locatorURI = getTransport() + "://" + host + ":" + port; 
497
 
      serverLocator = new InvokerLocator(locatorURI);
498
 
      log.info("Starting remoting server with locator uri of: " + locatorURI);
499
 
      HashMap config = new HashMap();
500
 
      config.put(InvokerLocator.FORCE_REMOTE, "true");
501
 
      addExtraServerConfig(config);
502
 
      connector = new Connector(serverLocator, config);
503
 
      connector.create();
504
 
      invocationHandler = new SampleInvocationHandler();
505
 
      connector.addInvocationHandler("sample", invocationHandler);
506
 
      connector.start();
507
 
      
508
 
      // Create client.
509
 
      HashMap clientConfig = new HashMap();
510
 
      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
511
 
      addExtraClientConfig(clientConfig);
512
 
      Client client = new Client(serverLocator, clientConfig);
513
 
      client.connect();
514
 
      log.info("client is connected");
515
 
      
516
 
      int CALLBACK_DELAY = 5000;
517
 
      SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
518
 
      Map metadata = new HashMap();
519
 
      metadata.put(CallbackPoller.CALLBACK_POLL_PERIOD, Integer.toString(CALLBACK_DELAY));
520
 
      metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
521
 
      client.addListener(callbackHandler, metadata);
522
 
      log.info("client added callback handler for pull callbacks");
523
 
      
524
 
      // Test for good connection.
525
 
      Integer count = new Integer(17);
526
 
      metadata.clear();
527
 
      metadata = new HashMap();
528
 
      metadata.put(COUNTER, count);
529
 
      Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
530
 
      assertEquals(17, response.intValue());
531
 
      log.info("client.invoke(INVOCATION_TEST, metadata) successful");
532
 
      
533
 
      // Test blocking behavior.
534
 
      CallbackCreator creator = new CallbackCreator(client, CALLBACK_DELAY);
535
 
      creator.start();
536
 
      assertEquals(0, callbackHandler.callbacks.size());
537
 
      Thread.sleep(CALLBACK_DELAY + 1000);
538
 
      assertEquals(1, callbackHandler.callbacks.size());
539
 
      
540
 
      client.removeListener(callbackHandler);
541
 
      client.disconnect();
542
 
   }
543
 
 
544
 
   
545
 
   /**
546
 
    * Tests blocking timeout set on server.
547
 
    */
548
 
   public void testBlockingCallbackPollerShutdownServerConfig() throws Throwable
549
 
   {
550
 
      log.info("entering " + getName());
551
 
      
552
 
      // Start server.
553
 
      String host = InetAddress.getLocalHost().getHostAddress();
554
 
      port = PortUtil.findFreePort(host);
555
 
      String locatorURI = getTransport() + "://" + host + ":" + port; 
556
 
      serverLocator = new InvokerLocator(locatorURI);
557
 
      log.info("Starting remoting server with locator uri of: " + locatorURI);
558
 
      HashMap config = new HashMap();
559
 
      config.put(InvokerLocator.FORCE_REMOTE, "true");
560
 
      config.put(ServerInvoker.BLOCKING_TIMEOUT, "4000");
561
 
      addExtraServerConfig(config);
562
 
      connector = new Connector(serverLocator, config);
563
 
      connector.create();
564
 
      invocationHandler = new SampleInvocationHandler();
565
 
      connector.addInvocationHandler("sample", invocationHandler);
566
 
      connector.start();
567
 
      
568
 
      // Create client.
569
 
      HashMap clientConfig = new HashMap();
570
 
      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
571
 
      addExtraClientConfig(clientConfig);
572
 
      final Client client = new Client(serverLocator, clientConfig);
573
 
      client.connect();
574
 
      log.info("client is connected");
575
 
      
576
 
      // Test for good connection.
577
 
      Integer count = new Integer(17);
578
 
      Map metadata = new HashMap();
579
 
      metadata.clear();
580
 
      metadata = new HashMap();
581
 
      metadata.put(COUNTER, count);
582
 
      Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
583
 
      assertEquals(17, response.intValue());
584
 
      log.info("client.invoke(INVOCATION_TEST, metadata) successful");
585
 
      
586
 
      // Register callback handler.
587
 
      Field field = Client.class.getDeclaredField("callbackPollers");
588
 
      field.setAccessible(true);
589
 
      Map callbackPollers = (Map) field.get(client);
590
 
      final SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
591
 
      metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
592
 
      client.addListener(callbackHandler, metadata);
593
 
      assertEquals(1, callbackPollers.size());
594
 
      CallbackPoller callbackPoller = (CallbackPoller) callbackPollers.values().iterator().next();
595
 
      field = CallbackPoller.class.getDeclaredField("blockingPollerThread");
596
 
      field.setAccessible(true);
597
 
      Thread blockingPollerThread = (Thread) field.get(callbackPoller);
598
 
      assertNotNull(blockingPollerThread);
599
 
      log.info("client added callback handler for pull callbacks");
600
 
      
601
 
      // Test blocking timeout.
602
 
      new Thread()
603
 
      {
604
 
         public void run()
605
 
         {
606
 
            try
607
 
            {
608
 
               client.removeListener(callbackHandler);
609
 
            }
610
 
            catch (Throwable e)
611
 
            {
612
 
               e.printStackTrace();
613
 
            }
614
 
         }
615
 
      }.start();
616
 
      
617
 
      Thread.sleep(6000);
618
 
      assertFalse(blockingPollerThread.isAlive());
619
 
      
620
 
      client.removeListener(callbackHandler);
621
 
      client.disconnect();
622
 
   }
623
 
   
624
 
   
625
 
   /**
626
 
    * Tests blocking timeout set on client.
627
 
    */
628
 
   public void testBlockingCallbackPollerShutdownClientConfig() throws Throwable
629
 
   {
630
 
      log.info("entering " + getName());
631
 
      
632
 
      // Start server.
633
 
      String host = InetAddress.getLocalHost().getHostAddress();
634
 
      port = PortUtil.findFreePort(host);
635
 
      String locatorURI = getTransport() + "://" + host + ":" + port; 
636
 
      serverLocator = new InvokerLocator(locatorURI);
637
 
      log.info("Starting remoting server with locator uri of: " + locatorURI);
638
 
      HashMap config = new HashMap();
639
 
      config.put(InvokerLocator.FORCE_REMOTE, "true");
640
 
      config.put(ServerInvoker.BLOCKING_TIMEOUT, "4000");
641
 
      addExtraServerConfig(config);
642
 
      connector = new Connector(serverLocator, config);
643
 
      connector.create();
644
 
      invocationHandler = new SampleInvocationHandler();
645
 
      connector.addInvocationHandler("sample", invocationHandler);
646
 
      connector.start();
647
 
      
648
 
      // Create client.
649
 
      HashMap clientConfig = new HashMap();
650
 
      clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
651
 
      addExtraClientConfig(clientConfig);
652
 
      final Client client = new Client(serverLocator, clientConfig);
653
 
      client.connect();
654
 
      log.info("client is connected");
655
 
      
656
 
      // Test for good connection.
657
 
      Integer count = new Integer(17);
658
 
      Map metadata = new HashMap();
659
 
      metadata.clear();
660
 
      metadata = new HashMap();
661
 
      metadata.put(COUNTER, count);
662
 
      Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
663
 
      assertEquals(17, response.intValue());
664
 
      log.info("client.invoke(INVOCATION_TEST, metadata) successful");
665
 
      
666
 
      // Register callback handler.
667
 
      Field field = Client.class.getDeclaredField("callbackPollers");
668
 
      field.setAccessible(true);
669
 
      Map callbackPollers = (Map) field.get(client);
670
 
      final SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
671
 
      metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
672
 
      metadata.put(ServerInvoker.BLOCKING_TIMEOUT, "10000");
673
 
      client.addListener(callbackHandler, metadata);
674
 
      assertEquals(1, callbackPollers.size());
675
 
      CallbackPoller callbackPoller = (CallbackPoller) callbackPollers.values().iterator().next();
676
 
      field = CallbackPoller.class.getDeclaredField("blockingPollerThread");
677
 
      field.setAccessible(true);
678
 
      Thread blockingPollerThread = (Thread) field.get(callbackPoller);
679
 
      assertNotNull(blockingPollerThread);
680
 
      log.info("client added callback handler for pull callbacks");
681
 
      
682
 
      // Test blocking timeout.
683
 
      new Thread()
684
 
      {
685
 
         public void run()
686
 
         {
687
 
            try
688
 
            {
689
 
               client.removeListener(callbackHandler);
690
 
            }
691
 
            catch (Throwable e)
692
 
            {
693
 
               e.printStackTrace();
694
 
            }
695
 
         }
696
 
      }.start();
697
 
      
698
 
      Thread.sleep(2000);
699
 
      assertTrue(blockingPollerThread.isAlive());
700
 
      Thread.sleep(4000);
701
 
      assertTrue(blockingPollerThread.isAlive());
702
 
      Thread.sleep(15000);
703
 
      assertFalse(blockingPollerThread.isAlive());
704
 
      
705
 
      client.removeListener(callbackHandler);
706
 
      client.disconnect();
707
 
   }
708
 
   
709
 
   
710
 
   protected String getTransport()
711
 
   {
712
 
      return "socket";
713
 
   }
714
 
   
715
 
   
716
 
   protected void addExtraClientConfig(Map config) {}
717
 
   protected void addExtraServerConfig(Map config) {}
718
 
   
719
 
 
720
 
   static class SampleInvocationHandler implements ServerInvocationHandler
721
 
   {
722
 
      public Set callbackHandlers = new HashSet();
723
 
      private int counter = 0;
724
 
      
725
 
      public void addListener(InvokerCallbackHandler callbackHandler)
726
 
      {
727
 
         log.info("Adding callback listener.");
728
 
         callbackHandlers.add(callbackHandler);
729
 
      }
730
 
 
731
 
      public Object invoke(final InvocationRequest invocation) throws Throwable
732
 
      {
733
 
         Object payload = invocation.getParameter();
734
 
         if (INVOCATION_TEST.equals(payload))
735
 
         {
736
 
            Map requestMap = invocation.getRequestPayload();
737
 
            Integer counter = (Integer) requestMap.get(COUNTER);
738
 
            return counter;
739
 
         }
740
 
         else if (CALLBACK_TEST.equals(payload))
741
 
         {
742
 
            new Thread()
743
 
            {
744
 
               public void run()
745
 
               {
746
 
                  try
747
 
                  {
748
 
                     Map requestMap = invocation.getRequestPayload();
749
 
                     int delay = Integer.parseInt((String) requestMap.get(CALLBACK_DELAY_KEY));
750
 
                     Thread.sleep(delay);
751
 
                     Iterator it = callbackHandlers.iterator();
752
 
                     while (it.hasNext())
753
 
                     {
754
 
                        InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler) it.next();
755
 
                        log.info("sending callback: " + ++counter);
756
 
                        callbackHandler.handleCallback(new Callback("callback"));
757
 
                     }
758
 
                     log.info("sent callback");
759
 
                  }
760
 
                  catch (HandleCallbackException e)
761
 
                  {
762
 
                     log.error("Unable to send callback");
763
 
                  }
764
 
                  catch (InterruptedException e)
765
 
                  {
766
 
                     e.printStackTrace();
767
 
                  }
768
 
               }
769
 
            }.start();
770
 
 
771
 
            return null;
772
 
         }
773
 
         else
774
 
         {
775
 
            throw new Exception("unrecognized invocation: " + payload);
776
 
         }
777
 
      }
778
 
      
779
 
      public void removeListener(InvokerCallbackHandler callbackHandler) {}
780
 
      public void setMBeanServer(MBeanServer server) {}
781
 
      public void setInvoker(ServerInvoker invoker) {}
782
 
   }
783
 
   
784
 
   
785
 
   static class SimpleCallbackHandler implements InvokerCallbackHandler
786
 
   {
787
 
      List callbacks = new ArrayList();
788
 
      
789
 
      public void handleCallback(Callback callback) throws HandleCallbackException
790
 
      {
791
 
         callbacks.add(callback);
792
 
         log.info("received callback");
793
 
      }
794
 
   }
795
 
   
796
 
   static class CallbackPuller extends Thread
797
 
   {
798
 
      Client client;
799
 
      InvokerCallbackHandler callbackHandler;
800
 
      Map metadata;
801
 
      List callbacks;
802
 
      boolean done;
803
 
      
804
 
      public CallbackPuller(Client client,
805
 
                            InvokerCallbackHandler callbackHandler,
806
 
                            Map metadata)
807
 
      {
808
 
         this.client = client;
809
 
         this.callbackHandler = callbackHandler;
810
 
         this.metadata = metadata;
811
 
      }
812
 
      
813
 
      public void run()
814
 
      {
815
 
 
816
 
         try
817
 
         {
818
 
            client.invoke(CALLBACK_TEST, metadata);
819
 
            log.info("back from client.invoke(CALLBACK_TEST, metadata)");
820
 
            callbacks = client.getCallbacks(callbackHandler, metadata);
821
 
            log.info("callbacks count: " + callbacks.size());
822
 
            done = true;
823
 
         }
824
 
         catch (Throwable e)
825
 
         {
826
 
            e.printStackTrace();
827
 
         }
828
 
      }
829
 
   }
830
 
   
831
 
   static class CallbackCreator extends Thread
832
 
   {
833
 
      Client client;
834
 
      int delay;
835
 
      boolean done;
836
 
      
837
 
      public CallbackCreator(Client client,int delay)
838
 
      {
839
 
         this.client = client;
840
 
         this.delay = delay;
841
 
      }
842
 
      
843
 
      public void run()
844
 
      {
845
 
 
846
 
         try
847
 
         {
848
 
            HashMap metadata = new HashMap();
849
 
            metadata.put(CALLBACK_DELAY_KEY, Integer.toString(delay));
850
 
            client.invoke(CALLBACK_TEST, metadata);
851
 
            log.info("back from client.invoke(CALLBACK_TEST)");
852
 
            done = true;
853
 
         }
854
 
         catch (Throwable e)
855
 
         {
856
 
            e.printStackTrace();
857
 
         }
858
 
      }
859
 
   }
860
 
}
 
 
b'\\ No newline at end of file'