2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
22
package org.jboss.test.remoting.callback.pull;
24
import java.lang.reflect.Field;
25
import java.net.InetAddress;
26
import java.util.ArrayList;
27
import java.util.HashMap;
28
import java.util.List;
31
import javax.management.MBeanServer;
33
import junit.framework.TestCase;
35
import org.apache.log4j.ConsoleAppender;
36
import org.apache.log4j.Level;
37
import org.apache.log4j.Logger;
38
import org.apache.log4j.PatternLayout;
39
import org.jboss.remoting.Client;
40
import org.jboss.remoting.InvocationRequest;
41
import org.jboss.remoting.InvokerLocator;
42
import org.jboss.remoting.ServerInvocationHandler;
43
import org.jboss.remoting.ServerInvoker;
44
import org.jboss.remoting.callback.Callback;
45
import org.jboss.remoting.callback.CallbackListener;
46
import org.jboss.remoting.callback.CallbackPoller;
47
import org.jboss.remoting.callback.HandleCallbackException;
48
import org.jboss.remoting.callback.InvokerCallbackHandler;
49
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
50
import org.jboss.remoting.transport.Connector;
51
import org.jboss.remoting.transport.PortUtil;
52
import org.jboss.util.id.GUID;
56
* Verifies that CallbackPoller shuts down when max error count is exceeded.
58
* @author <a href="ron.sigal@jboss.com">Ron Sigal</a>
59
* @version $Revision: 2903 $
61
* Copyright June 18, 2007
64
public class CallbackPollerShutdownTestCase extends TestCase
66
public static int port;
68
private static Logger log = Logger.getLogger(CallbackPollerShutdownTestCase.class);
69
private static boolean firstTime = true;
71
// remoting server connector
72
private Connector connector;
73
private InvokerLocator serverLocator;
74
private SampleInvocationHandler invocationHandler;
78
* Sets up target remoting server.
80
public void setUp() throws Exception
85
Logger.getLogger("org.jboss.remoting").setLevel(Level.INFO);
86
Logger.getLogger("org.jboss.test.remoting").setLevel(Level.INFO);
87
String pattern = "[%d{ABSOLUTE}] [%t] %5p (%F:%L) - %m%n";
88
PatternLayout layout = new PatternLayout(pattern);
89
ConsoleAppender consoleAppender = new ConsoleAppender(layout);
90
Logger.getRootLogger().addAppender(consoleAppender);
96
* Shuts down the server
98
public void tearDown()
100
if (connector != null)
109
* Verifies that CallbackPoller in blocking mode shuts down after max error
110
* count has been exceeded.
112
public void testBlockingCallbackPoller() throws Throwable
114
log.info("entering " + getName());
117
String host = InetAddress.getLocalHost().getHostAddress();
118
port = PortUtil.findFreePort(host);
119
String locatorURI = getTransport() + "://" + host + ":" + port;
120
serverLocator = new InvokerLocator(locatorURI);
121
log.info("Starting remoting server with locator uri of: " + locatorURI);
122
HashMap config = new HashMap();
123
config.put(InvokerLocator.FORCE_REMOTE, "true");
124
addExtraServerConfig(config);
125
connector = new Connector(serverLocator, config);
127
invocationHandler = new SampleInvocationHandler();
128
connector.addInvocationHandler("sample", invocationHandler);
132
HashMap clientConfig = new HashMap();
133
clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
134
addExtraClientConfig(clientConfig);
135
Client client = new Client(serverLocator, clientConfig);
137
log.info("client is connected");
139
// Add callback handler.
140
SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler(client);
141
Map metadata = new HashMap();
142
metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
143
metadata.put(ServerInvoker.BLOCKING_TIMEOUT, "500");
144
metadata.put(CallbackPoller.MAX_ERROR_COUNT, "4");
145
client.addListener(callbackHandler, metadata);
146
log.info("client added callback handler for pull callbacks");
148
// Test for good connection.
149
assertEquals("abc", client.invoke("abc"));
151
// Give time for CallbackPoller$AcknowledgeThread to start.
154
// Get necessary fields.
155
Field field = Client.class.getDeclaredField("callbackPollers");
156
field.setAccessible(true);
157
Map callbackPollers = (Map) field.get(client);
158
assertEquals(1, callbackPollers.size());
159
CallbackPoller poller = (CallbackPoller) callbackPollers.values().iterator().next();
160
field.setAccessible(true);
161
Class[] classes = CallbackPoller.class.getDeclaredClasses();
162
Class handleThreadClass = null;
163
Class acknowledgeThreadClass = null;
164
for (int i = 0; i < classes.length; i++)
166
Class c = classes[i];
167
log.info(c.getName());
168
String fqn = c.getName();
169
String name = fqn.substring(fqn.indexOf('$') + 1);
171
if ("HandleThread".equals(name))
172
handleThreadClass = c;
173
else if ("AcknowledgeThread".equals(name))
174
acknowledgeThreadClass = c;
177
assertNotNull(handleThreadClass);
178
assertNotNull(acknowledgeThreadClass);
179
field = CallbackPoller.class.getDeclaredField("handleThread");
180
field.setAccessible(true);
181
Thread handleThread = (Thread) field.get(poller);
182
log.info("handleThread: " + handleThread);
183
field = CallbackPoller.class.getDeclaredField("acknowledgeThread");
184
field.setAccessible(true);
185
Thread acknowledgeThread = (Thread) field.get(poller);
186
log.info("acknowledgeThread: " + handleThread);
190
// Wait for CallbackPoller to shut down.
192
field = handleThreadClass.getDeclaredField("done");
193
field.setAccessible(true);
194
boolean handleThreadDone = ((Boolean) field.get(handleThread)).booleanValue();
195
field = acknowledgeThreadClass.getDeclaredField("done");
196
field.setAccessible(true);
197
boolean acknowledgeThreadDone = ((Boolean) field.get(acknowledgeThread)).booleanValue();
198
field = CallbackPoller.class.getDeclaredField("running");
199
field.setAccessible(true);
200
boolean pollerRunning = ((Boolean) field.get(poller)).booleanValue();
201
assertTrue(handleThreadDone);
202
assertTrue(acknowledgeThreadDone);
203
assertFalse(pollerRunning);
205
client.setDisconnectTimeout(0);
206
client.removeListener(callbackHandler);
212
* Verifies that CallbackPoller in nonblocking mode shuts down after max error
213
* count has been exceeded.
215
public void testNonBlockingCallbackPoller() throws Throwable
217
log.info("entering " + getName());
220
String host = InetAddress.getLocalHost().getHostAddress();
221
port = PortUtil.findFreePort(host);
222
String locatorURI = getTransport() + "://" + host + ":" + port;
223
serverLocator = new InvokerLocator(locatorURI);
224
log.info("Starting remoting server with locator uri of: " + locatorURI);
225
HashMap config = new HashMap();
226
config.put(InvokerLocator.FORCE_REMOTE, "true");
227
addExtraServerConfig(config);
228
connector = new Connector(serverLocator, config);
230
invocationHandler = new SampleInvocationHandler();
231
connector.addInvocationHandler("sample", invocationHandler);
235
HashMap clientConfig = new HashMap();
236
clientConfig.put(InvokerLocator.FORCE_REMOTE, "true");
237
addExtraClientConfig(clientConfig);
238
Client client = new Client(serverLocator, clientConfig);
240
log.info("client is connected");
242
// Add callback handler.
243
SimpleCallbackHandler callbackHandler = new SimpleCallbackHandler(client);
244
Map metadata = new HashMap();
245
metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.NONBLOCKING);
246
metadata.put(CallbackPoller.CALLBACK_POLL_PERIOD, "500");
247
metadata.put(ServerInvoker.BLOCKING_TIMEOUT, "500");
248
metadata.put(CallbackPoller.MAX_ERROR_COUNT, "4");
249
client.addListener(callbackHandler, metadata);
250
log.info("client added callback handler for pull callbacks");
252
// Test for good connection.
253
assertEquals("abc", client.invoke("abc"));
255
// Give time for CallbackPoller$AcknowledgeThread to start.
258
// Get necessary fields.
259
Field field = Client.class.getDeclaredField("callbackPollers");
260
field.setAccessible(true);
261
Map callbackPollers = (Map) field.get(client);
262
assertEquals(1, callbackPollers.size());
263
CallbackPoller poller = (CallbackPoller) callbackPollers.values().iterator().next();
264
field.setAccessible(true);
265
Class[] classes = CallbackPoller.class.getDeclaredClasses();
266
Class handleThreadClass = null;
267
Class acknowledgeThreadClass = null;
268
for (int i = 0; i < classes.length; i++)
270
Class c = classes[i];
271
log.info(c.getName());
272
String fqn = c.getName();
273
String name = fqn.substring(fqn.indexOf('$') + 1);
275
if ("HandleThread".equals(name))
276
handleThreadClass = c;
277
else if ("AcknowledgeThread".equals(name))
278
acknowledgeThreadClass = c;
281
assertNotNull(handleThreadClass);
282
assertNotNull(acknowledgeThreadClass);
283
field = CallbackPoller.class.getDeclaredField("handleThread");
284
field.setAccessible(true);
285
Thread handleThread = (Thread) field.get(poller);
286
log.info("handleThread: " + handleThread);
287
field = CallbackPoller.class.getDeclaredField("acknowledgeThread");
288
field.setAccessible(true);
289
Thread acknowledgeThread = (Thread) field.get(poller);
290
log.info("acknowledgeThread: " + handleThread);
294
// Wait for CallbackPoller to shut down.
296
field = handleThreadClass.getDeclaredField("done");
297
field.setAccessible(true);
298
boolean handleThreadDone = ((Boolean) field.get(handleThread)).booleanValue();
299
field = acknowledgeThreadClass.getDeclaredField("done");
300
field.setAccessible(true);
301
boolean acknowledgeThreadDone = ((Boolean) field.get(acknowledgeThread)).booleanValue();
302
field = CallbackPoller.class.getDeclaredField("running");
303
field.setAccessible(true);
304
boolean pollerRunning = ((Boolean) field.get(poller)).booleanValue();
305
assertTrue(handleThreadDone);
306
assertTrue(acknowledgeThreadDone);
307
assertFalse(pollerRunning);
309
client.setDisconnectTimeout(0);
310
client.removeListener(callbackHandler);
315
protected String getTransport()
321
protected void addExtraClientConfig(Map config) {}
322
protected void addExtraServerConfig(Map config) {}
325
static class SampleInvocationHandler implements ServerInvocationHandler, CallbackListener
327
public void addListener(InvokerCallbackHandler callbackHandler)
329
log.info("sending callback with request for acknowledgement");
330
HashMap returnPayload = new HashMap();
331
returnPayload.put(ServerInvokerCallbackHandler.CALLBACK_LISTENER, this);
332
returnPayload.put(ServerInvokerCallbackHandler.CALLBACK_ID, new GUID());
333
returnPayload.put(ServerInvokerCallbackHandler.REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS, "true");
334
Callback callback = new Callback("callback");
335
callback.setReturnPayload(returnPayload);
339
callbackHandler.handleCallback(callback);
341
catch (HandleCallbackException e)
347
public Object invoke(final InvocationRequest invocation) throws Throwable
349
return invocation.getParameter();
352
public void removeListener(InvokerCallbackHandler callbackHandler) {}
353
public void setMBeanServer(MBeanServer server) {}
354
public void setInvoker(ServerInvoker invoker) {}
355
public void acknowledgeCallback(InvokerCallbackHandler callbackHandler, Object callbackId, Object response)
357
log.info(callbackId + " acknowledged");
362
static class SimpleCallbackHandler implements InvokerCallbackHandler
364
List callbacks = new ArrayList();
367
public SimpleCallbackHandler(Client client)
369
this.client = client;
372
public void handleCallback(Callback callback) throws HandleCallbackException
376
callbacks.add(callback);
377
log.info("received callback");
b'\\ No newline at end of file'