2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
22
package org.jboss.test.remoting.callback.pull.blocking;
24
import java.lang.reflect.Field;
25
import java.net.InetAddress;
26
import java.util.ArrayList;
27
import java.util.HashMap;
28
import java.util.HashSet;
29
import java.util.Iterator;
30
import java.util.List;
34
import javax.management.MBeanServer;
36
import junit.framework.TestCase;
38
import org.apache.log4j.ConsoleAppender;
39
import org.apache.log4j.Level;
40
import org.apache.log4j.Logger;
41
import org.apache.log4j.PatternLayout;
42
import org.jboss.remoting.Client;
43
import org.jboss.remoting.InvocationRequest;
44
import org.jboss.remoting.InvokerLocator;
45
import org.jboss.remoting.ServerInvocationHandler;
46
import org.jboss.remoting.ServerInvoker;
47
import org.jboss.remoting.callback.Callback;
48
import org.jboss.remoting.callback.CallbackPoller;
49
import org.jboss.remoting.callback.HandleCallbackException;
50
import org.jboss.remoting.callback.InvokerCallbackHandler;
51
import org.jboss.remoting.transport.Connector;
52
import org.jboss.remoting.transport.PortUtil;
56
* Tests the blocking mode for pull callbacks.
60
* @author <a href="ron.sigal@jboss.com">Ron Sigal</a>
61
* @version $Revision: 3015 $
63
* Copyright May 2, 2007
66
public class BlockingPullCallbackTestCase extends TestCase
68
public static int port;
70
private static Logger log = Logger.getLogger(BlockingPullCallbackTestCase.class);
72
private static final String INVOCATION_TEST = "invocationTest";
73
private static final String CALLBACK_TEST = "callbackTest";
74
private static final String COUNTER = "counter";
75
private static final String CALLBACK_DELAY_KEY = "callbackDelay";
77
private static boolean firstTime = true;
79
// remoting server connector
80
private Connector connector;
81
private InvokerLocator serverLocator;
82
private SampleInvocationHandler invocationHandler;
86
* Sets up target remoting server.
88
public void setUp() throws Exception
93
Logger.getLogger("org.jboss.remoting").setLevel(Level.INFO);
94
Logger.getLogger("org.jboss.test.remoting").setLevel(Level.INFO);
95
String pattern = "[%d{ABSOLUTE}] [%t] %5p (%F:%L) - %m%n";
96
PatternLayout layout = new PatternLayout(pattern);
97
ConsoleAppender consoleAppender = new ConsoleAppender(layout);
98
Logger.getRootLogger().addAppender(consoleAppender);
104
* Shuts down the server
106
public void tearDown()
108
if (connector != null)
117
* Tests blocking and nonblocking direct calls to Client.getCallbacks().
119
public void testBlockingPullCallback() throws Throwable
121
log.info("entering " + getName());
122
int CALLBACK_DELAY = 5000;
125
String host = InetAddress.getLocalHost().getHostAddress();
126
port = PortUtil.findFreePort(host);
127
String locatorURI = getTransport() + "://" + host + ":" + port;
128
serverLocator = new InvokerLocator(locatorURI);
129
log.info("Starting remoting server with locator uri of: " + locatorURI);
130
HashMap config = new HashMap();
131
config.put(InvokerLocator.FORCE_REMOTE, "true");
132
addExtraServerConfig(config);
133
connector = new Connector(serverLocator, config);
135
invocationHandler = new SampleInvocationHandler();
136
connector.addInvocationHandler("sample", invocationHandler);
140
HashMap clientConfig = new HashMap();
141
clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
142
addExtraClientConfig(clientConfig);
143
Client client = new Client(serverLocator, clientConfig);
145
log.info("client is connected");
147
SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
148
client.addListener(callbackHandler);
149
log.info("client added callback handler for pull callbacks");
151
// Test for good connection.
152
Integer count = new Integer(17);
153
HashMap metadata = new HashMap();
154
metadata.put(COUNTER, count);
155
Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
156
assertEquals(17, response.intValue());
157
log.info("client.invoke(INVOCATION_TEST, metadata) successful");
159
// Test nonblocking callbacks.
160
HashMap pullerMetadata = new HashMap();
161
pullerMetadata.put(CALLBACK_DELAY_KEY, Integer.toString(CALLBACK_DELAY));
162
pullerMetadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.NONBLOCKING);
163
CallbackPuller puller = new CallbackPuller(client, callbackHandler, pullerMetadata);
166
// Should have received empty list of callbacks.
167
assertTrue(puller.done);
168
assertNotNull(puller.callbacks);
169
assertTrue(puller.callbacks.isEmpty());
171
// Drain stored callback.
172
Thread.sleep(CALLBACK_DELAY);
173
metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.NONBLOCKING);
174
List callbacks = client.getCallbacks(callbackHandler, metadata);
175
assertNotNull(callbacks);
176
assertEquals(1, callbacks.size());
178
// Test blocking callbacks.
179
pullerMetadata.clear();
180
pullerMetadata.put(CALLBACK_DELAY_KEY, Integer.toString(CALLBACK_DELAY));
181
pullerMetadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
182
pullerMetadata.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(CALLBACK_DELAY * 2));
183
puller = new CallbackPuller(client, callbackHandler, pullerMetadata);
186
assertFalse(puller.done);
187
// Should not have returned from getCallbacks() yet.
188
assertNull(puller.callbacks);
189
Thread.sleep(CALLBACK_DELAY);
190
assertTrue(puller.done);
191
assertNotNull(puller.callbacks);
192
assertEquals(1, puller.callbacks.size());
194
client.removeListener(callbackHandler);
200
* Tests configuration of blocking timeout in server configuration map.
202
public void testBlockingPullCallbackServerConfiguration() throws Throwable
204
log.info("entering " + getName());
207
String host = InetAddress.getLocalHost().getHostAddress();
208
port = PortUtil.findFreePort(host);
209
String locatorURI = getTransport() + "://" + host + ":" + port;
210
serverLocator = new InvokerLocator(locatorURI);
211
log.info("Starting remoting server with locator uri of: " + locatorURI);
212
HashMap config = new HashMap();
213
config.put(InvokerLocator.FORCE_REMOTE, "true");
215
int blockingTimeout = ServerInvoker.DEFAULT_BLOCKING_TIMEOUT + 2000;
216
config.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(blockingTimeout));
217
addExtraServerConfig(config);
218
connector = new Connector(serverLocator, config);
220
invocationHandler = new SampleInvocationHandler();
221
connector.addInvocationHandler("sample", invocationHandler);
225
HashMap clientConfig = new HashMap();
226
clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
227
addExtraClientConfig(clientConfig);
228
Client client = new Client(serverLocator, clientConfig);
230
log.info("client is connected");
232
SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
233
client.addListener(callbackHandler);
234
log.info("client added callback handler for pull callbacks");
236
// Test for good connection.
237
Integer count = new Integer(17);
238
HashMap metadata = new HashMap();
239
metadata.put(COUNTER, count);
240
Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
241
assertEquals(17, response.intValue());
242
log.info("client.invoke(INVOCATION_TEST, metadata) successful");
244
// Test blocking callbacks.
246
// Set CALLBACK_DELAY == default blocking timeout + 1000. If the default blocking
247
// timeout were in effect, Client.getCallbacks() would time out and return without
248
// getting a callback.
249
int CALLBACK_DELAY = blockingTimeout - 1000;
250
Map pullerMetadata = new HashMap();
251
pullerMetadata.put(CALLBACK_DELAY_KEY, Integer.toString(CALLBACK_DELAY));
252
pullerMetadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
253
CallbackPuller puller = new CallbackPuller(client, callbackHandler, pullerMetadata);
255
Thread.sleep(CALLBACK_DELAY - 1000);
257
// Callback has not been created yet, so should not have returned from getCallbacks().
258
assertFalse(puller.done);
259
assertNull(puller.callbacks);
262
// Callback has been created, so should have returned from getCallbacks().
263
assertTrue(puller.done);
264
assertNotNull(puller.callbacks);
265
assertEquals(1, puller.callbacks.size());
267
client.removeListener(callbackHandler);
273
* Tests configuration of blocking timeout by Client.addListener() metadata.
274
* In this case, a CallbackPoller is created.
276
public void testBlockingCallbackPollerInitialMetadataConfiguration() throws Throwable
278
log.info("entering " + getName());
281
String host = InetAddress.getLocalHost().getHostAddress();
282
port = PortUtil.findFreePort(host);
283
String locatorURI = getTransport() + "://" + host + ":" + port;
284
serverLocator = new InvokerLocator(locatorURI);
285
log.info("Starting remoting server with locator uri of: " + locatorURI);
286
HashMap config = new HashMap();
287
config.put(InvokerLocator.FORCE_REMOTE, "true");
288
int serverBlockingTimeout = ServerInvoker.DEFAULT_BLOCKING_TIMEOUT + 2000;
289
config.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(serverBlockingTimeout));
290
addExtraServerConfig(config);
291
connector = new Connector(serverLocator, config);
293
invocationHandler = new SampleInvocationHandler();
294
connector.addInvocationHandler("sample", invocationHandler);
298
HashMap clientConfig = new HashMap();
299
clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
300
addExtraClientConfig(clientConfig);
301
Client client = new Client(serverLocator, clientConfig);
303
log.info("client is connected");
305
// Test for good connection.
306
Integer count = new Integer(17);
307
HashMap metadata = new HashMap();
308
metadata.put(COUNTER, count);
309
Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
310
assertEquals(17, response.intValue());
311
log.info("client.invoke(INVOCATION_TEST, metadata) successful");
313
// Test blocking callbacks.
315
// Reset blocking timeout to server blocking timeout + 2000, and set
316
// CALLBACK_DELAY to server blocking timeout + 1000. If the server's blocking
317
// timeout were in effect, Client.getCallbacks() would time out and return without
318
// getting a callback.
319
int clientBlockingTimeout = serverBlockingTimeout + 2000;
320
int CALLBACK_DELAY = serverBlockingTimeout + 1000;
322
metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
323
metadata.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(clientBlockingTimeout));
324
SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
325
client.addListener(callbackHandler, metadata);
326
log.info("client added callback handler for pull callbacks");
329
CallbackCreator creator = new CallbackCreator(client, CALLBACK_DELAY);
331
Thread.sleep(CALLBACK_DELAY - 1000);
333
// Callback has not been created yet, so should not have returned from getCallbacks().
334
assertEquals(0, callbackHandler.callbacks.size());
337
// Callback has been created, so should have returned from getCallbacks().
338
assertEquals(1, callbackHandler.callbacks.size());
340
client.removeListener(callbackHandler);
346
* Tests configuration of blocking timeout by Client.getCallbacks() metadata.
348
public void testBlockingPullCallbackInvocationMetadataConfiguration() throws Throwable
350
log.info("entering " + getName());
353
String host = InetAddress.getLocalHost().getHostAddress();
354
port = PortUtil.findFreePort(host);
355
String locatorURI = getTransport() + "://" + host + ":" + port;
356
serverLocator = new InvokerLocator(locatorURI);
357
log.info("Starting remoting server with locator uri of: " + locatorURI);
358
HashMap config = new HashMap();
359
config.put(InvokerLocator.FORCE_REMOTE, "true");
361
int serverBlockingTimeout = ServerInvoker.DEFAULT_BLOCKING_TIMEOUT + 2000;
362
config.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(serverBlockingTimeout));
363
addExtraServerConfig(config);
364
connector = new Connector(serverLocator, config);
366
invocationHandler = new SampleInvocationHandler();
367
connector.addInvocationHandler("sample", invocationHandler);
371
HashMap clientConfig = new HashMap();
372
clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
373
addExtraClientConfig(clientConfig);
374
Client client = new Client(serverLocator, clientConfig);
376
log.info("client is connected");
378
SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
379
client.addListener(callbackHandler);
380
log.info("client added callback handler for pull callbacks");
382
// Test for good connection.
383
Integer count = new Integer(17);
384
HashMap metadata = new HashMap();
385
metadata.put(COUNTER, count);
386
Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
387
assertEquals(17, response.intValue());
388
log.info("client.invoke(INVOCATION_TEST, metadata) successful");
390
// Test blocking callbacks.
392
// Reset blocking timeout to server blocking timeout + 2000, and set
393
// CALLBACK_DELAY to server blocking timeout + 1000. If the server's blocking
394
// timeout were in effect, Client.getCallbacks() would time out and return without
395
// getting a callback.
396
int clientBlockingTimeout = serverBlockingTimeout + 2000;
397
int CALLBACK_DELAY = serverBlockingTimeout + 1000;
398
Map pullerMetadata = new HashMap();
399
pullerMetadata.put(CALLBACK_DELAY_KEY, Integer.toString(CALLBACK_DELAY));
400
pullerMetadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
401
pullerMetadata.put(ServerInvoker.BLOCKING_TIMEOUT, Integer.toString(clientBlockingTimeout));
402
CallbackPuller puller = new CallbackPuller(client, callbackHandler, pullerMetadata);
404
Thread.sleep(CALLBACK_DELAY - 1000);
406
// Callback has not been created yet, so should not have returned from getCallbacks().
407
assertFalse(puller.done);
408
assertNull(puller.callbacks);
411
// Callback has been created, so should have returned from getCallbacks().
412
assertTrue(puller.done);
413
assertNotNull(puller.callbacks);
414
assertEquals(1, puller.callbacks.size());
416
client.removeListener(callbackHandler);
422
* Tests CallbackPoller in nonblocking mode.
424
public void testNonBlockingCallbackPoller() throws Throwable
426
log.info("entering " + getName());
429
String host = InetAddress.getLocalHost().getHostAddress();
430
port = PortUtil.findFreePort(host);
431
String locatorURI = getTransport() + "://" + host + ":" + port;
432
serverLocator = new InvokerLocator(locatorURI);
433
log.info("Starting remoting server with locator uri of: " + locatorURI);
434
HashMap config = new HashMap();
435
config.put(InvokerLocator.FORCE_REMOTE, "true");
436
addExtraServerConfig(config);
437
connector = new Connector(serverLocator, config);
439
invocationHandler = new SampleInvocationHandler();
440
connector.addInvocationHandler("sample", invocationHandler);
444
HashMap clientConfig = new HashMap();
445
clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
446
addExtraClientConfig(clientConfig);
447
Client client = new Client(serverLocator, clientConfig);
449
log.info("client is connected");
451
int CALLBACK_DELAY = 5000;
452
SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
453
Map metadata = new HashMap();
454
metadata.put(CallbackPoller.CALLBACK_POLL_PERIOD, Integer.toString(CALLBACK_DELAY));
455
metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.NONBLOCKING);
456
client.addListener(callbackHandler, metadata);
457
log.info("client added callback handler for pull callbacks");
459
// Test for good connection.
460
Integer count = new Integer(17);
462
metadata = new HashMap();
463
metadata.put(COUNTER, count);
464
Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
465
assertEquals(17, response.intValue());
466
log.info("client.invoke(INVOCATION_TEST, metadata) successful");
468
// Test nonblocking behavior.
470
// Callback will be created after CallbackPoller's first poll.
471
CallbackCreator creator = new CallbackCreator(client, CALLBACK_DELAY);
473
Thread.sleep(CALLBACK_DELAY);
474
assertEquals(0, callbackHandler.callbacks.size());
476
// CallbackPoller hasn't made second poll yet.
477
assertEquals(0, callbackHandler.callbacks.size());
478
Thread.sleep(CALLBACK_DELAY);
479
assertEquals(1, callbackHandler.callbacks.size());
481
client.removeListener(callbackHandler);
487
* Tests CallbackPoller in blocking mode.
489
public void testBlockingCallbackPoller() throws Throwable
491
log.info("entering " + getName());
494
String host = InetAddress.getLocalHost().getHostAddress();
495
port = PortUtil.findFreePort(host);
496
String locatorURI = getTransport() + "://" + host + ":" + port;
497
serverLocator = new InvokerLocator(locatorURI);
498
log.info("Starting remoting server with locator uri of: " + locatorURI);
499
HashMap config = new HashMap();
500
config.put(InvokerLocator.FORCE_REMOTE, "true");
501
addExtraServerConfig(config);
502
connector = new Connector(serverLocator, config);
504
invocationHandler = new SampleInvocationHandler();
505
connector.addInvocationHandler("sample", invocationHandler);
509
HashMap clientConfig = new HashMap();
510
clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
511
addExtraClientConfig(clientConfig);
512
Client client = new Client(serverLocator, clientConfig);
514
log.info("client is connected");
516
int CALLBACK_DELAY = 5000;
517
SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
518
Map metadata = new HashMap();
519
metadata.put(CallbackPoller.CALLBACK_POLL_PERIOD, Integer.toString(CALLBACK_DELAY));
520
metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
521
client.addListener(callbackHandler, metadata);
522
log.info("client added callback handler for pull callbacks");
524
// Test for good connection.
525
Integer count = new Integer(17);
527
metadata = new HashMap();
528
metadata.put(COUNTER, count);
529
Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
530
assertEquals(17, response.intValue());
531
log.info("client.invoke(INVOCATION_TEST, metadata) successful");
533
// Test blocking behavior.
534
CallbackCreator creator = new CallbackCreator(client, CALLBACK_DELAY);
536
assertEquals(0, callbackHandler.callbacks.size());
537
Thread.sleep(CALLBACK_DELAY + 1000);
538
assertEquals(1, callbackHandler.callbacks.size());
540
client.removeListener(callbackHandler);
546
* Tests blocking timeout set on server.
548
public void testBlockingCallbackPollerShutdownServerConfig() throws Throwable
550
log.info("entering " + getName());
553
String host = InetAddress.getLocalHost().getHostAddress();
554
port = PortUtil.findFreePort(host);
555
String locatorURI = getTransport() + "://" + host + ":" + port;
556
serverLocator = new InvokerLocator(locatorURI);
557
log.info("Starting remoting server with locator uri of: " + locatorURI);
558
HashMap config = new HashMap();
559
config.put(InvokerLocator.FORCE_REMOTE, "true");
560
config.put(ServerInvoker.BLOCKING_TIMEOUT, "4000");
561
addExtraServerConfig(config);
562
connector = new Connector(serverLocator, config);
564
invocationHandler = new SampleInvocationHandler();
565
connector.addInvocationHandler("sample", invocationHandler);
569
HashMap clientConfig = new HashMap();
570
clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
571
addExtraClientConfig(clientConfig);
572
final Client client = new Client(serverLocator, clientConfig);
574
log.info("client is connected");
576
// Test for good connection.
577
Integer count = new Integer(17);
578
Map metadata = new HashMap();
580
metadata = new HashMap();
581
metadata.put(COUNTER, count);
582
Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
583
assertEquals(17, response.intValue());
584
log.info("client.invoke(INVOCATION_TEST, metadata) successful");
586
// Register callback handler.
587
Field field = Client.class.getDeclaredField("callbackPollers");
588
field.setAccessible(true);
589
Map callbackPollers = (Map) field.get(client);
590
final SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
591
metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
592
client.addListener(callbackHandler, metadata);
593
assertEquals(1, callbackPollers.size());
594
CallbackPoller callbackPoller = (CallbackPoller) callbackPollers.values().iterator().next();
595
field = CallbackPoller.class.getDeclaredField("blockingPollerThread");
596
field.setAccessible(true);
597
Thread blockingPollerThread = (Thread) field.get(callbackPoller);
598
assertNotNull(blockingPollerThread);
599
log.info("client added callback handler for pull callbacks");
601
// Test blocking timeout.
608
client.removeListener(callbackHandler);
618
assertFalse(blockingPollerThread.isAlive());
620
client.removeListener(callbackHandler);
626
* Tests blocking timeout set on client.
628
public void testBlockingCallbackPollerShutdownClientConfig() throws Throwable
630
log.info("entering " + getName());
633
String host = InetAddress.getLocalHost().getHostAddress();
634
port = PortUtil.findFreePort(host);
635
String locatorURI = getTransport() + "://" + host + ":" + port;
636
serverLocator = new InvokerLocator(locatorURI);
637
log.info("Starting remoting server with locator uri of: " + locatorURI);
638
HashMap config = new HashMap();
639
config.put(InvokerLocator.FORCE_REMOTE, "true");
640
config.put(ServerInvoker.BLOCKING_TIMEOUT, "4000");
641
addExtraServerConfig(config);
642
connector = new Connector(serverLocator, config);
644
invocationHandler = new SampleInvocationHandler();
645
connector.addInvocationHandler("sample", invocationHandler);
649
HashMap clientConfig = new HashMap();
650
clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
651
addExtraClientConfig(clientConfig);
652
final Client client = new Client(serverLocator, clientConfig);
654
log.info("client is connected");
656
// Test for good connection.
657
Integer count = new Integer(17);
658
Map metadata = new HashMap();
660
metadata = new HashMap();
661
metadata.put(COUNTER, count);
662
Integer response = (Integer) client.invoke(INVOCATION_TEST, metadata);
663
assertEquals(17, response.intValue());
664
log.info("client.invoke(INVOCATION_TEST, metadata) successful");
666
// Register callback handler.
667
Field field = Client.class.getDeclaredField("callbackPollers");
668
field.setAccessible(true);
669
Map callbackPollers = (Map) field.get(client);
670
final SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler();
671
metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
672
metadata.put(ServerInvoker.BLOCKING_TIMEOUT, "10000");
673
client.addListener(callbackHandler, metadata);
674
assertEquals(1, callbackPollers.size());
675
CallbackPoller callbackPoller = (CallbackPoller) callbackPollers.values().iterator().next();
676
field = CallbackPoller.class.getDeclaredField("blockingPollerThread");
677
field.setAccessible(true);
678
Thread blockingPollerThread = (Thread) field.get(callbackPoller);
679
assertNotNull(blockingPollerThread);
680
log.info("client added callback handler for pull callbacks");
682
// Test blocking timeout.
689
client.removeListener(callbackHandler);
699
assertTrue(blockingPollerThread.isAlive());
701
assertTrue(blockingPollerThread.isAlive());
703
assertFalse(blockingPollerThread.isAlive());
705
client.removeListener(callbackHandler);
710
protected String getTransport()
716
protected void addExtraClientConfig(Map config) {}
717
protected void addExtraServerConfig(Map config) {}
720
static class SampleInvocationHandler implements ServerInvocationHandler
722
public Set callbackHandlers = new HashSet();
723
private int counter = 0;
725
public void addListener(InvokerCallbackHandler callbackHandler)
727
log.info("Adding callback listener.");
728
callbackHandlers.add(callbackHandler);
731
public Object invoke(final InvocationRequest invocation) throws Throwable
733
Object payload = invocation.getParameter();
734
if (INVOCATION_TEST.equals(payload))
736
Map requestMap = invocation.getRequestPayload();
737
Integer counter = (Integer) requestMap.get(COUNTER);
740
else if (CALLBACK_TEST.equals(payload))
748
Map requestMap = invocation.getRequestPayload();
749
int delay = Integer.parseInt((String) requestMap.get(CALLBACK_DELAY_KEY));
751
Iterator it = callbackHandlers.iterator();
754
InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler) it.next();
755
log.info("sending callback: " + ++counter);
756
callbackHandler.handleCallback(new Callback("callback"));
758
log.info("sent callback");
760
catch (HandleCallbackException e)
762
log.error("Unable to send callback");
764
catch (InterruptedException e)
775
throw new Exception("unrecognized invocation: " + payload);
779
public void removeListener(InvokerCallbackHandler callbackHandler) {}
780
public void setMBeanServer(MBeanServer server) {}
781
public void setInvoker(ServerInvoker invoker) {}
785
static class SimpleCallbackHandler implements InvokerCallbackHandler
787
List callbacks = new ArrayList();
789
public void handleCallback(Callback callback) throws HandleCallbackException
791
callbacks.add(callback);
792
log.info("received callback");
796
static class CallbackPuller extends Thread
799
InvokerCallbackHandler callbackHandler;
804
public CallbackPuller(Client client,
805
InvokerCallbackHandler callbackHandler,
808
this.client = client;
809
this.callbackHandler = callbackHandler;
810
this.metadata = metadata;
818
client.invoke(CALLBACK_TEST, metadata);
819
log.info("back from client.invoke(CALLBACK_TEST, metadata)");
820
callbacks = client.getCallbacks(callbackHandler, metadata);
821
log.info("callbacks count: " + callbacks.size());
831
static class CallbackCreator extends Thread
837
public CallbackCreator(Client client,int delay)
839
this.client = client;
848
HashMap metadata = new HashMap();
849
metadata.put(CALLBACK_DELAY_KEY, Integer.toString(delay));
850
client.invoke(CALLBACK_TEST, metadata);
851
log.info("back from client.invoke(CALLBACK_TEST)");
b'\\ No newline at end of file'