2
* JBoss, Home of Professional Open Source
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
* by the @authors tag. See the copyright.txt in the distribution for a
5
* full listing of individual contributors.
7
* This is free software; you can redistribute it and/or modify it
8
* under the terms of the GNU Lesser General Public License as
9
* published by the Free Software Foundation; either version 2.1 of
10
* the License, or (at your option) any later version.
12
* This software is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this software; if not, write to the Free
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
22
package org.jboss.remoting.callback;
24
import java.util.ArrayList;
25
import java.util.HashMap;
26
import java.util.Iterator;
27
import java.util.List;
29
import java.util.Timer;
30
import java.util.TimerTask;
32
import org.jboss.logging.Logger;
33
import org.jboss.remoting.Client;
34
import org.jboss.remoting.ServerInvoker;
35
import org.jboss.remoting.transport.ClientInvoker;
38
* CallbackPoller is used to simulate push callbacks on transports that don't support
39
* bidirectional connections. It will periodically pull callbacks from the server
40
* and pass them to the InvokerCallbackHandler.
42
* @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
43
* @author <a href="mailto:ron.sigal@jboss.com">Ron Sigal</a>
45
public class CallbackPoller extends TimerTask implements Runnable
48
* Implementation note.
50
* CallbackPoller uses two, or possibly three, threads. The first thread is the
51
* Timer thread, which periodically pulls callbacks from the server and adds them
52
* to toHandleList. The second thread takes callbacks from toHandleList, passes
53
* them to the CallbackHandler, and, if an acknowledgement is requested for a
54
* callback, it adds the callback to toAcknowledgeList. The third thread, which is
55
* created in response to the first callback for which an acknowledgement is requested,
56
* takes the contents of toAcknowledgeList and acknowledges them in a batch.
58
* CallbackPoller will not shut down until all received callbacks have been processed
59
* by the CallbackHandler and acknowledgements have been sent for all callbacks for
60
* which acknowledgements have been requested.
64
* Default polling period for getting callbacks from the server.
65
* Default is 5000 milliseconds.
67
public static final long DEFAULT_POLL_PERIOD = 5000;
70
* Default timeout for getting callbacks in blocking mode.
71
* Default is 5000 milliseconds.
73
public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
76
* Default number of exceptions before callback polling wil be shut down.
79
public static final int DEFAULT_MAX_ERROR_COUNT = 5;
82
* The key value to use to specify if stop() should wait for the call to
83
* org.jboss.remoting.Client.getCallbacks() should return. The default
84
* behavior is do a synchronized shutdown for nonblocking callbacks and
85
* a nonsynchronized shutdown for blocking callbacks.
87
public static final String SYNCHRONIZED_SHUTDOWN = "doSynchronizedShutdown";
90
* The key value to use to specify the desired poll period
91
* within the metadata Map.
93
public static final String CALLBACK_POLL_PERIOD = "callbackPollPeriod";
95
/** Use java.util.timer.schedule(). */
96
public static final String SCHEDULE_FIXED_RATE = "scheduleFixedRate";
98
/** Use java.util.timer.scheduleAtFixedRate(). */
99
public static final String SCHEDULE_FIXED_DELAY = "scheduleFixedDelay";
102
* The key to use to specify the number of errors before callback polling
105
public static final String MAX_ERROR_COUNT = "maxErrorCount";
107
/** The key to use in metadata Map to request statistics. The associated
109
public static final String REPORT_STATISTICS = "reportStatistics";
111
private Client client = null;
112
private InvokerCallbackHandler callbackHandler = null;
113
private Map metadata = null;
114
private Object callbackHandlerObject = null;
115
private boolean blocking = false;
116
private boolean synchronizedShutdown = false;
117
private long pollPeriod = DEFAULT_POLL_PERIOD;
119
private String scheduleMode = SCHEDULE_FIXED_RATE;
120
private boolean reportStatistics;
121
private boolean running;
122
private int maxErrorCount = -1;
123
private int errorCount;
124
private boolean useAllParams;
126
private ArrayList toHandleList = new ArrayList();
127
private ArrayList toAcknowledgeList = new ArrayList();
128
private HandleThread handleThread;
129
private AcknowledgeThread acknowledgeThread;
130
private BlockingPollerThread blockingPollerThread;
132
private static final Logger log = Logger.getLogger(CallbackPoller.class);
135
public CallbackPoller(Client client, InvokerCallbackHandler callbackhandler, Map metadata, Object callbackHandlerObject)
137
this.client = client;
138
this.callbackHandler = callbackhandler;
139
this.metadata = new HashMap(metadata);
140
this.callbackHandlerObject = callbackHandlerObject;
143
public void start() throws Exception
145
if (callbackHandler == null)
147
throw new NullPointerException("Can not poll for callbacks when InvokerCallbackHandler is null.");
155
throw new NullPointerException("Can not poll for callbacks when Client is null.");
158
configureParameters();
160
handleThread = new HandleThread("HandleThread");
161
handleThread.start();
162
if (log.isTraceEnabled()) log.trace("blocking: " + blocking);
165
if (maxErrorCount == -1)
166
maxErrorCount = DEFAULT_MAX_ERROR_COUNT;
169
metadata.put(Client.THROW_CALLBACK_EXCEPTION, "true");
170
blockingPollerThread = new BlockingPollerThread();
171
blockingPollerThread.start();
175
timer = new Timer(true);
176
if (SCHEDULE_FIXED_DELAY.equals(scheduleMode))
177
timer.schedule(this, pollPeriod, pollPeriod);
179
timer.scheduleAtFixedRate(this, pollPeriod, pollPeriod);
183
public synchronized void run()
185
// need to pull callbacks from server and give them to callback handler
188
if (log.isTraceEnabled()) log.trace(this + " getting callbacks for " + callbackHandler);
189
List callbacks = client.getCallbacks(callbackHandler, metadata);
190
if (log.isTraceEnabled()) log.trace(this + " callback count: " + (callbacks == null ? 0 : callbacks.size()));
192
if (callbacks != null && callbacks.size() > 0)
194
synchronized (toHandleList)
196
toHandleList.addAll(callbacks);
197
if (toHandleList.size() == callbacks.size())
198
toHandleList.notify();
202
if (reportStatistics)
203
reportStatistics(callbacks);
205
catch (Throwable throwable)
213
log.info(this + " Error getting callbacks from server.");
214
log.debug(this + " Error getting callbacks from server.", throwable);
215
String errorMessage = throwable.getMessage();
216
if (errorMessage != null)
218
if (errorMessage.startsWith("Could not find listener id"))
220
log.error("Client no longer has InvokerCallbackHandler (" +
222
") registered. Shutting down callback polling");
226
if (errorMessage.startsWith("Can not make remoting client invocation " +
227
"due to not being connected to server."))
229
log.error("Client no longer connected. Shutting down callback polling");
234
if (maxErrorCount >= 0)
236
if (++errorCount > maxErrorCount)
238
log.error("Error limit of " + maxErrorCount +
239
" exceeded. Shutting down callback polling");
253
* stop() will not return until all received callbacks have been processed
254
* by the CallbackHandler and acknowledgements have been sent for all callbacks for
255
* which acknowledgements have been requested.
257
public void stop(int timeout)
259
log.debug(this + " is shutting down");
276
if (synchronizedShutdown)
278
// run() and stop() are synchronized so that stop() will wait until run() has finished
279
// adding any callbacks it has received to toHandleList. Therefore, once cancel()
280
// returns, no more callbacks will arrive from the server.
291
log.debug(this + " has shut down");
295
private void shutdown()
297
// HandleThread.shutdown() will not return until all received callbacks have been
298
// processed and, if necessary, added to toAcknowledgeList.
299
if (handleThread != null)
301
handleThread.shutdown();
305
// AcknowledgeThread.shutdown() will not return until acknowledgements have been sent
306
// for all callbacks for which acknowledgements have been requested.
307
if (acknowledgeThread != null)
309
acknowledgeThread.shutdown();
310
acknowledgeThread = null;
315
class BlockingPollerThread extends Thread
317
public BlockingPollerThread()
319
String threadName = getName();
320
int i = threadName.indexOf('-');
321
String threadNumber = null;
323
threadNumber = threadName.substring(i+1);
325
threadNumber = Long.toString(System.currentTimeMillis());
326
String pollerString = CallbackPoller.this.toString();
327
String address = pollerString.substring(pollerString.indexOf('@'));
328
setName("CallbackPoller:" + threadNumber + "[" + address + "]");
336
CallbackPoller.this.run();
342
class HandleThread extends Thread
344
boolean running = true;
346
ArrayList toHandleListCopy = new ArrayList();
349
HandleThread(String name)
357
synchronized (toHandleList)
359
if (toHandleList.isEmpty() && running)
365
catch (InterruptedException e)
367
log.debug("unexpected interrupt");
372
// If toHandleList is empty, then running must be false. We return
373
// only when both conditions are true.
374
if (toHandleList.isEmpty())
377
toHandleList.notify();
381
toHandleListCopy.addAll(toHandleList);
382
toHandleList.clear();
385
while (!toHandleListCopy.isEmpty())
389
callback = (Callback) toHandleListCopy.remove(0);
390
callback.setCallbackHandleObject(callbackHandlerObject);
391
callbackHandler.handleCallback(callback);
393
catch (HandleCallbackException e)
395
log.error("Error delivering callback to callback handler (" + callbackHandler + ").", e);
398
checkForAcknowledgeRequest(callback);
404
* Once CallbackPoller.stop() has called HandleThread.shutdown(), CallbackPoller.run()
405
* has terminated and no additional callbacks will be received. shutdown() will
406
* not return until HandleThread has processed all received callbacks.
408
* Either run() or shutdown() will enter its own synchronized block first.
410
* case 1): run() enters its synchronized block first:
411
* If toHandleList is empty, then run() will reach toHandleList.wait(), shutdown()
412
* will wake up run(), and run() will exit. If toHandleList is not empty, then run()
413
* will process all outstanding callbacks and return to its synchronized block. At
414
* this point, either case 1) (with toHandleList empty) or case 2) applies.
416
* case 2): shutdown() enters its synchronized block first:
417
* run() will process all outstanding callbacks and return to its synchronized block.
418
* After shutdown() reaches toHandleList.wait(), run() will enter its synchronized
419
* block, find running == false and toHandleList empty, and it will exit.
421
protected void shutdown()
423
log.debug(this + " is shutting down");
424
synchronized (toHandleList)
427
toHandleList.notify();
434
catch (InterruptedException ignored) {}
437
log.debug(this + " has shut down");
443
class AcknowledgeThread extends Thread
445
boolean running = true;
447
ArrayList toAcknowledgeListCopy = new ArrayList();
449
AcknowledgeThread(String name)
457
synchronized (toAcknowledgeList)
459
while (toAcknowledgeList.isEmpty() && running)
463
toAcknowledgeList.wait();
465
catch (InterruptedException e)
467
log.debug("unexpected interrupt");
472
// If toAcknowledgeList is empty, then running must be false. We return
473
// only when both conditions are true.
474
if (toAcknowledgeList.isEmpty())
477
toAcknowledgeList.notify();
481
toAcknowledgeListCopy.addAll(toAcknowledgeList);
482
toAcknowledgeList.clear();
487
if (log.isTraceEnabled())
489
Iterator it = toAcknowledgeListCopy.iterator();
492
Callback cb = (Callback) it.next();
493
Map map = cb.getReturnPayload();
494
log.trace("acknowledging: " + map.get(ServerInvokerCallbackHandler.CALLBACK_ID));
497
client.acknowledgeCallbacks(callbackHandler, toAcknowledgeListCopy);
498
toAcknowledgeListCopy.clear();
502
log.error("Error acknowledging callback for callback handler (" + callbackHandler + ").", t);
508
* Once CallbackPoller.stop() has called AcknowledgeThread.shutdown(), HandleThread
509
* has terminated and no additional callbacks will be added to toAcknowledgeList.
510
* shutdown() will not return until AcknowledgeThread has acknowledged all callbacks
511
* in toAcknowledgeList.
513
* Either run() or shutdown() will enter its own synchronized block first.
515
* case 1): run() enters its synchronized block first:
516
* If toAcknowledgeList is empty, then run() will reach toAcknowledgeList.wait(),
517
* shutdown() will wake up run(), and run() will exit. If toAcknowledgeList is not
518
* empty, then run() will process all callbacks in toAcknowledgeList and return to
519
* its synchronized block. At this point, either case 1) (with toAcknowledgeList
520
* empty) or case 2) applies.
522
* case 2): shutdown() enters its synchronized block first:
523
* run() will process all callbacks in toAcknowledgeList and return to its
524
* synchronized block. After shutdown() reaches toAcknowledgeList.wait(), run()
525
* will enter its synchronized block, find running == false and toAcknowledgeList
526
* empty, and it will exit.
528
public void shutdown()
530
log.debug(this + " is shutting down");
531
synchronized (toAcknowledgeList)
534
toAcknowledgeList.notify();
539
toAcknowledgeList.wait();
541
catch (InterruptedException ignored) {}
544
log.debug(this + " has shut down");
550
private void checkForAcknowledgeRequest(Callback callback)
552
Map returnPayload = callback.getReturnPayload();
553
if (returnPayload != null)
555
Object callbackId = returnPayload.get(ServerInvokerCallbackHandler.CALLBACK_ID);
556
if (callbackId != null)
558
Object o = returnPayload.get(ServerInvokerCallbackHandler.REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS);
559
if (o instanceof String && Boolean.valueOf((String)o).booleanValue() ||
560
o instanceof Boolean && ((Boolean)o).booleanValue())
562
synchronized (toAcknowledgeList)
564
toAcknowledgeList.add(callback);
565
if (toAcknowledgeList.size() == 1)
567
if (acknowledgeThread == null)
569
acknowledgeThread = new AcknowledgeThread("AcknowledgeThread");
570
acknowledgeThread.start();
574
toAcknowledgeList.notify();
584
private void configureParameters()
586
Map config = new HashMap();
587
ClientInvoker invoker = client.getInvoker();
590
config.putAll(invoker.getLocator().getParameters());
592
config.putAll(client.getConfiguration());
593
config.putAll(metadata);
595
Object val = config.get(Client.USE_ALL_PARAMS);
598
if (val instanceof String)
600
useAllParams = Boolean.valueOf((String) val).booleanValue();
604
log.warn("Value for " + Client.USE_ALL_PARAMS + " must be of type " +
605
String.class.getName() + " and is " + val.getClass().getName());
608
log.debug(this + ": useAllParams: " + useAllParams);
614
val = config.get(ServerInvoker.BLOCKING_MODE);
617
if (val instanceof String)
619
if (ServerInvoker.BLOCKING.equals(val))
622
synchronizedShutdown = false;
624
else if (ServerInvoker.NONBLOCKING.equals(val))
627
synchronizedShutdown = true;
631
log.warn("Value for " + ServerInvoker.BLOCKING_MODE +
632
" configuration is " + val + ". Must be either " +
633
ServerInvoker.BLOCKING + " or " + ServerInvoker.NONBLOCKING +
634
". Using " + ServerInvoker.BLOCKING + ".");
639
log.warn("Value for " + ServerInvoker.BLOCKING_MODE +
640
" configuration must be of type " + String.class.getName() +
641
" and is of type " + val.getClass().getName());
645
// Default blocking mode on server is nonblocking.
647
metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
649
val = config.get(ServerInvoker.BLOCKING_TIMEOUT);
652
if (val instanceof String)
656
int blockingTimeout = Integer.parseInt((String) val);
657
metadata.put(ServerInvoker.TIMEOUT, Integer.toString(blockingTimeout));
659
catch (NumberFormatException e)
661
log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long. " + e.getMessage());
666
log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
667
" and is " + val.getClass().getName());
671
val = config.get(SYNCHRONIZED_SHUTDOWN);
674
if (val instanceof String)
676
synchronizedShutdown = Boolean.valueOf((String) val).booleanValue();
680
log.warn("Value for " + SYNCHRONIZED_SHUTDOWN + " must be of type " + String.class.getName() +
681
" and is " + val.getClass().getName());
685
val = config.get(CALLBACK_POLL_PERIOD);
688
if (val instanceof String)
692
pollPeriod = Long.parseLong((String) val);
694
catch (NumberFormatException e)
696
log.warn("Error converting " + CALLBACK_POLL_PERIOD + " to type long. " + e.getMessage());
701
log.warn("Value for " + CALLBACK_POLL_PERIOD + " configuration must be of type " + String.class.getName() +
702
" and is " + val.getClass().getName());
705
val = config.get(CALLBACK_SCHEDULE_MODE);
708
if (val instanceof String)
710
if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
712
scheduleMode = (String) val;
716
log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
717
log.warn("Using " + scheduleMode);
722
log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
723
" and is " + val.getClass().getName());
726
val = config.get(MAX_ERROR_COUNT);
729
if (val instanceof String)
733
maxErrorCount = Integer.parseInt((String) val);
735
catch (NumberFormatException e)
737
log.warn("Error converting " + MAX_ERROR_COUNT + " to type int. " + e.getMessage());
742
log.warn("Value for " + MAX_ERROR_COUNT + " configuration must be of type " + String.class.getName() +
743
" and is " + val.getClass().getName());
746
if (config.get(REPORT_STATISTICS) != null)
748
reportStatistics = true;
753
private void reportStatistics(List callbacks)
756
int toAcknowledge = 0;
758
synchronized (toHandleList)
760
toHandle = toHandleList.size() + handleThread.toHandleListCopy.size();
763
synchronized (toAcknowledgeList)
765
if (acknowledgeThread != null)
766
toAcknowledge = toAcknowledgeList.size() + acknowledgeThread.toAcknowledgeListCopy.size();
769
StringBuffer message = new StringBuffer("\n");
770
message.append("================================\n")
771
.append(" retrieved " + callbacks.size() + " callbacks\n")
772
.append(" callbacks waiting to be processed: " + toHandle + "\n")
773
.append(" callbacks waiting to be acknowledged: " + toAcknowledge + "\n")
774
.append("================================");
780
* The key value to use in metadata Map to specify the desired scheduling mode.
782
public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
b'\\ No newline at end of file'