~ubuntu-branches/ubuntu/raring/libjboss-remoting-java/raring

« back to all changes in this revision

Viewing changes to src/main/org/jboss/remoting/callback/CallbackPoller.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mfrom: (1.1.6 upstream)
  • Revision ID: package-import@ubuntu.com-20110909140103-hqokx61534tas9rg
Tags: 2.5.3.SP1-1
* Newer but not newest upstream release. Do not build samples.
* Change debian/watch to upstream's svn repo.
* Add patch to fix compile error caused by tomcat update.
  (Closes: #628303)
* Switch to source format 3.0.
* Switch to debhelper level 7.
* Remove useless Depends.
* Update Standards-Version: 3.9.2.
* Update README.source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
* JBoss, Home of Professional Open Source
 
3
* Copyright 2005, JBoss Inc., and individual contributors as indicated
 
4
* by the @authors tag. See the copyright.txt in the distribution for a
 
5
* full listing of individual contributors.
 
6
*
 
7
* This is free software; you can redistribute it and/or modify it
 
8
* under the terms of the GNU Lesser General Public License as
 
9
* published by the Free Software Foundation; either version 2.1 of
 
10
* the License, or (at your option) any later version.
 
11
*
 
12
* This software is distributed in the hope that it will be useful,
 
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 
15
* Lesser General Public License for more details.
 
16
*
 
17
* You should have received a copy of the GNU Lesser General Public
 
18
* License along with this software; if not, write to the Free
 
19
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 
20
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 
21
*/
 
22
package org.jboss.remoting.callback;
 
23
 
 
24
import java.util.ArrayList;
 
25
import java.util.HashMap;
 
26
import java.util.Iterator;
 
27
import java.util.List;
 
28
import java.util.Map;
 
29
import java.util.Timer;
 
30
import java.util.TimerTask;
 
31
 
 
32
import org.jboss.logging.Logger;
 
33
import org.jboss.remoting.Client;
 
34
import org.jboss.remoting.ServerInvoker;
 
35
import org.jboss.remoting.transport.ClientInvoker;
 
36
 
 
37
/**
 
38
 * CallbackPoller is used to simulate push callbacks on transports that don't support
 
39
 * bidirectional connections.  It will periodically pull callbacks from the server
 
40
 * and pass them to the InvokerCallbackHandler.
 
41
 *
 
42
 * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
 
43
 * @author <a href="mailto:ron.sigal@jboss.com">Ron Sigal</a>
 
44
 */
 
45
public class CallbackPoller extends TimerTask implements Runnable
 
46
{
 
47
   /*
 
48
    * Implementation note.
 
49
    *
 
50
    * CallbackPoller uses two, or possibly three, threads.  The first thread is the
 
51
    * Timer thread, which periodically pulls callbacks from the server and adds them
 
52
    * to toHandleList.  The second thread takes callbacks from toHandleList, passes
 
53
    * them to the CallbackHandler, and, if an acknowledgement is requested for a
 
54
    * callback, it adds the callback to toAcknowledgeList.  The third thread, which is
 
55
    * created in response to the first callback for which an acknowledgement is requested,
 
56
    * takes the contents of toAcknowledgeList and acknowledges them in a batch.
 
57
    *
 
58
    * CallbackPoller will not shut down until all received callbacks have been processed
 
59
    * by the CallbackHandler and acknowledgements have been sent for all callbacks for
 
60
    * which acknowledgements have been requested.
 
61
    */
 
62
 
 
63
   /**
 
64
    * Default polling period for getting callbacks from the server.
 
65
    * Default is 5000 milliseconds.
 
66
    */
 
67
   public static final long DEFAULT_POLL_PERIOD = 5000;
 
68
   
 
69
   /**
 
70
    * Default timeout for getting callbacks in blocking mode.
 
71
    * Default is 5000 milliseconds.
 
72
    */
 
73
   public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
 
74
   
 
75
   /**
 
76
    * Default number of exceptions before callback polling wil be shut down.
 
77
    * Default is 5.
 
78
    */
 
79
   public static final int DEFAULT_MAX_ERROR_COUNT = 5;
 
80
   
 
81
   /**
 
82
    * The key value to use to specify if stop() should wait for the call to
 
83
    * org.jboss.remoting.Client.getCallbacks() should return.  The default 
 
84
    * behavior is do a synchronized shutdown for nonblocking callbacks and
 
85
    * a nonsynchronized shutdown for blocking callbacks.
 
86
    */
 
87
   public static final String SYNCHRONIZED_SHUTDOWN = "doSynchronizedShutdown";
 
88
   
 
89
   /**
 
90
    * The key value to use to specify the desired poll period
 
91
    * within the metadata Map.
 
92
    */
 
93
   public static final String CALLBACK_POLL_PERIOD = "callbackPollPeriod";
 
94
 
 
95
   /** Use java.util.timer.schedule(). */
 
96
   public static final String SCHEDULE_FIXED_RATE = "scheduleFixedRate";
 
97
 
 
98
   /** Use java.util.timer.scheduleAtFixedRate(). */
 
99
   public static final String SCHEDULE_FIXED_DELAY = "scheduleFixedDelay";
 
100
   
 
101
   /**
 
102
    * The key to use to specify the number of errors before callback polling
 
103
    * will be shut down.
 
104
    */
 
105
   public static final String MAX_ERROR_COUNT = "maxErrorCount";
 
106
 
 
107
   /** The key to use in metadata Map to request statistics.  The associated
 
108
    *  is ignored. */
 
109
   public static final String REPORT_STATISTICS = "reportStatistics";
 
110
 
 
111
   private Client client = null;
 
112
   private InvokerCallbackHandler callbackHandler = null;
 
113
   private Map metadata = null;
 
114
   private Object callbackHandlerObject = null;
 
115
   private boolean blocking = false;
 
116
   private boolean synchronizedShutdown = false;
 
117
   private long pollPeriod = DEFAULT_POLL_PERIOD;
 
118
   private Timer timer;
 
119
   private String scheduleMode = SCHEDULE_FIXED_RATE;
 
120
   private boolean reportStatistics;
 
121
   private boolean running;
 
122
   private int maxErrorCount = -1;
 
123
   private int errorCount;
 
124
   private boolean useAllParams;
 
125
 
 
126
   private ArrayList toHandleList = new ArrayList();
 
127
   private ArrayList toAcknowledgeList = new ArrayList();
 
128
   private HandleThread handleThread;
 
129
   private AcknowledgeThread acknowledgeThread;
 
130
   private BlockingPollerThread blockingPollerThread;
 
131
 
 
132
   private static final Logger log = Logger.getLogger(CallbackPoller.class);
 
133
 
 
134
 
 
135
   public CallbackPoller(Client client, InvokerCallbackHandler callbackhandler, Map metadata, Object callbackHandlerObject)
 
136
   {
 
137
      this.client = client;
 
138
      this.callbackHandler = callbackhandler;
 
139
      this.metadata = new HashMap(metadata);
 
140
      this.callbackHandlerObject = callbackHandlerObject;
 
141
   }
 
142
 
 
143
   public void start() throws Exception
 
144
   {
 
145
      if (callbackHandler == null)
 
146
      {
 
147
         throw new NullPointerException("Can not poll for callbacks when InvokerCallbackHandler is null.");
 
148
      }
 
149
      if (client != null)
 
150
      {
 
151
         client.connect();
 
152
      }
 
153
      else
 
154
      {
 
155
         throw new NullPointerException("Can not poll for callbacks when Client is null.");
 
156
      }
 
157
 
 
158
      configureParameters();
 
159
 
 
160
      handleThread = new HandleThread("HandleThread");
 
161
      handleThread.start();
 
162
      if (log.isTraceEnabled()) log.trace("blocking: " + blocking);
 
163
      if (blocking)
 
164
      {
 
165
         if (maxErrorCount == -1)
 
166
            maxErrorCount = DEFAULT_MAX_ERROR_COUNT;
 
167
         
 
168
         running = true;
 
169
         metadata.put(Client.THROW_CALLBACK_EXCEPTION, "true");
 
170
         blockingPollerThread = new BlockingPollerThread();
 
171
         blockingPollerThread.start();
 
172
      }
 
173
      else
 
174
      {
 
175
         timer = new Timer(true);
 
176
         if (SCHEDULE_FIXED_DELAY.equals(scheduleMode))
 
177
            timer.schedule(this, pollPeriod, pollPeriod);
 
178
         else
 
179
            timer.scheduleAtFixedRate(this, pollPeriod, pollPeriod);
 
180
      }
 
181
   }
 
182
 
 
183
   public synchronized void run()
 
184
   {
 
185
      // need to pull callbacks from server and give them to callback handler
 
186
      try
 
187
      {
 
188
         if (log.isTraceEnabled()) log.trace(this + " getting callbacks for " + callbackHandler);
 
189
         List callbacks = client.getCallbacks(callbackHandler, metadata);
 
190
         if (log.isTraceEnabled()) log.trace(this + " callback count: " + (callbacks == null ? 0 : callbacks.size()));
 
191
 
 
192
         if (callbacks != null && callbacks.size() > 0)
 
193
         {
 
194
            synchronized (toHandleList)
 
195
            {
 
196
               toHandleList.addAll(callbacks);
 
197
               if (toHandleList.size() == callbacks.size())
 
198
                  toHandleList.notify();
 
199
            }
 
200
         }
 
201
 
 
202
         if (reportStatistics)
 
203
            reportStatistics(callbacks);
 
204
      }
 
205
      catch (Throwable throwable)
 
206
      {
 
207
         if (!running)
 
208
         {
 
209
            stop();
 
210
            return;
 
211
         }
 
212
         
 
213
         log.info(this + " Error getting callbacks from server.");
 
214
         log.debug(this + " Error getting callbacks from server.", throwable);
 
215
         String errorMessage = throwable.getMessage();
 
216
         if (errorMessage != null)
 
217
         {
 
218
            if (errorMessage.startsWith("Could not find listener id"))
 
219
            {
 
220
               log.error("Client no longer has InvokerCallbackHandler (" + 
 
221
                          callbackHandler +
 
222
                         ") registered.  Shutting down callback polling");
 
223
               stop();
 
224
               return;
 
225
            }
 
226
            if (errorMessage.startsWith("Can not make remoting client invocation " +
 
227
                                        "due to not being connected to server."))
 
228
            {
 
229
               log.error("Client no longer connected.  Shutting down callback polling");
 
230
               stop();
 
231
               return;
 
232
            }
 
233
         }
 
234
         if (maxErrorCount >= 0)
 
235
         {
 
236
            if (++errorCount > maxErrorCount)
 
237
            {
 
238
               log.error("Error limit of " + maxErrorCount + 
 
239
                          " exceeded.  Shutting down callback polling");
 
240
               stop();
 
241
               return;
 
242
            }
 
243
         }
 
244
      }
 
245
   }
 
246
   
 
247
   public void stop()
 
248
   {
 
249
      stop(-1);
 
250
   }
 
251
 
 
252
   /**
 
253
    * stop() will not return until all received callbacks have been processed
 
254
    * by the CallbackHandler and acknowledgements have been sent for all callbacks for
 
255
    * which acknowledgements have been requested.
 
256
    */
 
257
   public void stop(int timeout)
 
258
   {
 
259
      log.debug(this + " is shutting down");
 
260
      running = false;
 
261
      
 
262
      if (!blocking)
 
263
      {
 
264
         cancel();
 
265
         
 
266
         if (timer != null)
 
267
         {
 
268
            timer.cancel();
 
269
            timer = null;
 
270
         }
 
271
      }
 
272
      
 
273
      if (timeout == 0)
 
274
         return;
 
275
      
 
276
      if (synchronizedShutdown)
 
277
      {
 
278
         // run() and stop() are synchronized so that stop() will wait until run() has finished
 
279
         // adding any callbacks it has received to toHandleList.  Therefore, once cancel()
 
280
         // returns, no more callbacks will arrive from the server.
 
281
         synchronized (this)
 
282
         {
 
283
            shutdown();
 
284
         }
 
285
      }
 
286
      else
 
287
      {
 
288
         shutdown();
 
289
      }
 
290
 
 
291
      log.debug(this + " has shut down");
 
292
   }
 
293
 
 
294
   
 
295
   private void shutdown()
 
296
   {
 
297
      // HandleThread.shutdown() will not return until all received callbacks have been
 
298
      // processed and, if necessary, added to toAcknowledgeList.
 
299
      if (handleThread != null)
 
300
      {
 
301
         handleThread.shutdown();
 
302
         handleThread = null;
 
303
      }
 
304
 
 
305
      // AcknowledgeThread.shutdown() will not return until acknowledgements have been sent
 
306
      // for all callbacks for which acknowledgements have been requested.
 
307
      if (acknowledgeThread != null)
 
308
      {
 
309
         acknowledgeThread.shutdown();
 
310
         acknowledgeThread = null;
 
311
      }
 
312
   }
 
313
   
 
314
   
 
315
   class BlockingPollerThread extends Thread
 
316
   {
 
317
      public BlockingPollerThread()
 
318
      {
 
319
         String threadName = getName();
 
320
         int i = threadName.indexOf('-');
 
321
         String threadNumber = null;
 
322
         if (i >= 0)
 
323
            threadNumber = threadName.substring(i+1);
 
324
         else
 
325
            threadNumber = Long.toString(System.currentTimeMillis());
 
326
         String pollerString = CallbackPoller.this.toString();
 
327
         String address = pollerString.substring(pollerString.indexOf('@'));
 
328
         setName("CallbackPoller:" + threadNumber + "[" + address + "]");
 
329
         setDaemon(true);
 
330
      }
 
331
 
 
332
      public void run()
 
333
      {
 
334
         while (running)
 
335
         {
 
336
            CallbackPoller.this.run();
 
337
         }
 
338
      }
 
339
   }  
 
340
 
 
341
   
 
342
   class HandleThread extends Thread
 
343
   {
 
344
      boolean running = true;
 
345
      boolean done;
 
346
      ArrayList toHandleListCopy = new ArrayList();
 
347
      Callback callback;
 
348
 
 
349
      HandleThread(String name)
 
350
      {
 
351
         super(name);
 
352
      }
 
353
      public void run()
 
354
      {
 
355
         while (true)
 
356
         {
 
357
            synchronized (toHandleList)
 
358
            {
 
359
               if (toHandleList.isEmpty() && running)
 
360
               {
 
361
                  try
 
362
                  {
 
363
                     toHandleList.wait();
 
364
                  }
 
365
                  catch (InterruptedException e)
 
366
                  {
 
367
                     log.debug("unexpected interrupt");
 
368
                     continue;
 
369
                  }
 
370
               }
 
371
 
 
372
               // If toHandleList is empty, then running must be false.  We return
 
373
               // only when both conditions are true.
 
374
               if (toHandleList.isEmpty())
 
375
               {
 
376
                  done = true;
 
377
                  toHandleList.notify();
 
378
                  return;
 
379
               }
 
380
 
 
381
               toHandleListCopy.addAll(toHandleList);
 
382
               toHandleList.clear();
 
383
            }
 
384
 
 
385
            while (!toHandleListCopy.isEmpty())
 
386
            {
 
387
               try
 
388
               {
 
389
                  callback = (Callback) toHandleListCopy.remove(0);
 
390
                  callback.setCallbackHandleObject(callbackHandlerObject);
 
391
                  callbackHandler.handleCallback(callback);
 
392
               }
 
393
               catch (HandleCallbackException e)
 
394
               {
 
395
                  log.error("Error delivering callback to callback handler (" + callbackHandler + ").", e);
 
396
               }
 
397
 
 
398
               checkForAcknowledgeRequest(callback);
 
399
            }
 
400
         }
 
401
      }
 
402
 
 
403
      /**
 
404
       *  Once CallbackPoller.stop() has called HandleThread.shutdown(), CallbackPoller.run()
 
405
       *  has terminated and no additional callbacks will be received.  shutdown() will
 
406
       *  not return until HandleThread has processed all received callbacks.
 
407
       *
 
408
       *  Either run() or shutdown() will enter its own synchronized block first.
 
409
       *
 
410
       *  case 1): run() enters its synchronized block first:
 
411
       *     If toHandleList is empty, then run() will reach toHandleList.wait(), shutdown()
 
412
       *     will wake up run(), and run() will exit.  If toHandleList is not empty, then run()
 
413
       *     will process all outstanding callbacks and return to its synchronized block.  At
 
414
       *     this point, either case 1) (with toHandleList empty) or case 2) applies.
 
415
       *
 
416
       *  case 2): shutdown() enters its synchronized block first:
 
417
       *     run() will process all outstanding callbacks and return to its synchronized block.
 
418
       *     After shutdown() reaches toHandleList.wait(), run() will enter its synchronized
 
419
       *     block, find running == false and toHandleList empty, and it will exit.
 
420
       */
 
421
      protected void shutdown()
 
422
      {
 
423
         log.debug(this + " is shutting down");
 
424
         synchronized (toHandleList)
 
425
         {
 
426
            running = false;
 
427
            toHandleList.notify();
 
428
            while (!done)
 
429
            {
 
430
               try
 
431
               {
 
432
                  toHandleList.wait();
 
433
               }
 
434
               catch (InterruptedException ignored) {}
 
435
            }
 
436
         }
 
437
         log.debug(this + " has shut down");
 
438
         return;
 
439
      }
 
440
   }
 
441
 
 
442
 
 
443
   class AcknowledgeThread extends Thread
 
444
   {
 
445
      boolean running = true;
 
446
      boolean done;
 
447
      ArrayList toAcknowledgeListCopy = new ArrayList();
 
448
 
 
449
      AcknowledgeThread(String name)
 
450
      {
 
451
         super(name);
 
452
      }
 
453
      public void run()
 
454
      {
 
455
         while (true)
 
456
         {
 
457
            synchronized (toAcknowledgeList)
 
458
            {
 
459
               while (toAcknowledgeList.isEmpty() && running)
 
460
               {
 
461
                  try
 
462
                  {
 
463
                     toAcknowledgeList.wait();
 
464
                  }
 
465
                  catch (InterruptedException e)
 
466
                  {
 
467
                     log.debug("unexpected interrupt");
 
468
                     continue;
 
469
                  }
 
470
               }
 
471
 
 
472
               // If toAcknowledgeList is empty, then running must be false.  We return
 
473
               // only when both conditions are true.
 
474
               if (toAcknowledgeList.isEmpty())
 
475
               {
 
476
                  done = true;
 
477
                  toAcknowledgeList.notify();
 
478
                  return;
 
479
               }
 
480
 
 
481
               toAcknowledgeListCopy.addAll(toAcknowledgeList);
 
482
               toAcknowledgeList.clear();
 
483
            }
 
484
 
 
485
            try
 
486
            {
 
487
               if (log.isTraceEnabled())
 
488
               {
 
489
                  Iterator it = toAcknowledgeListCopy.iterator();
 
490
                  while (it.hasNext())
 
491
                  {
 
492
                     Callback cb = (Callback) it.next();
 
493
                     Map map = cb.getReturnPayload();
 
494
                     log.trace("acknowledging: " + map.get(ServerInvokerCallbackHandler.CALLBACK_ID));
 
495
                  }
 
496
               }
 
497
               client.acknowledgeCallbacks(callbackHandler, toAcknowledgeListCopy);
 
498
               toAcknowledgeListCopy.clear();
 
499
            }
 
500
            catch (Throwable t)
 
501
            {
 
502
               log.error("Error acknowledging callback for callback handler (" + callbackHandler + ").", t);
 
503
            }
 
504
         }
 
505
      }
 
506
 
 
507
      /**
 
508
       *  Once CallbackPoller.stop() has called AcknowledgeThread.shutdown(), HandleThread
 
509
       *  has terminated and no additional callbacks will be added to toAcknowledgeList.
 
510
       *  shutdown() will not return until AcknowledgeThread has acknowledged all callbacks
 
511
       *  in toAcknowledgeList.
 
512
       *
 
513
       *  Either run() or shutdown() will enter its own synchronized block first.
 
514
       *
 
515
       *  case 1): run() enters its synchronized block first:
 
516
       *     If toAcknowledgeList is empty, then run() will reach toAcknowledgeList.wait(),
 
517
       *     shutdown() will wake up run(), and run() will exit.  If toAcknowledgeList is not
 
518
       *     empty, then run() will process all callbacks in toAcknowledgeList and return to
 
519
       *     its synchronized block.  At this point, either case 1) (with toAcknowledgeList
 
520
       *     empty) or case 2) applies.
 
521
       *
 
522
       *  case 2): shutdown() enters its synchronized block first:
 
523
       *     run() will process all callbacks in toAcknowledgeList and return to its
 
524
       *     synchronized block.  After shutdown() reaches toAcknowledgeList.wait(), run()
 
525
       *     will enter its synchronized block, find running == false and toAcknowledgeList
 
526
       *     empty, and it will exit.
 
527
       */
 
528
      public void shutdown()
 
529
      {
 
530
         log.debug(this + " is shutting down");      
 
531
         synchronized (toAcknowledgeList)
 
532
         {
 
533
            running = false;
 
534
            toAcknowledgeList.notify();
 
535
            while (!done)
 
536
            {
 
537
               try
 
538
               {
 
539
                  toAcknowledgeList.wait();
 
540
               }
 
541
               catch (InterruptedException ignored) {}
 
542
            }
 
543
         }
 
544
         log.debug(this + " has shut down");
 
545
         return;
 
546
      }
 
547
   }
 
548
 
 
549
 
 
550
   private void checkForAcknowledgeRequest(Callback callback)
 
551
   {
 
552
      Map returnPayload = callback.getReturnPayload();
 
553
      if (returnPayload != null)
 
554
      {
 
555
         Object callbackId = returnPayload.get(ServerInvokerCallbackHandler.CALLBACK_ID);
 
556
         if (callbackId != null)
 
557
         {
 
558
            Object o = returnPayload.get(ServerInvokerCallbackHandler.REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS);
 
559
            if (o instanceof String  && Boolean.valueOf((String)o).booleanValue() ||
 
560
                o instanceof Boolean && ((Boolean)o).booleanValue())
 
561
            {
 
562
               synchronized (toAcknowledgeList)
 
563
               {
 
564
                  toAcknowledgeList.add(callback);
 
565
                  if (toAcknowledgeList.size() == 1)
 
566
                  {
 
567
                     if (acknowledgeThread == null)
 
568
                     {
 
569
                        acknowledgeThread = new AcknowledgeThread("AcknowledgeThread");
 
570
                        acknowledgeThread.start();
 
571
                     }
 
572
                     else
 
573
                     {
 
574
                        toAcknowledgeList.notify();
 
575
                     }
 
576
                  }
 
577
               }
 
578
            }
 
579
         }
 
580
      }
 
581
   }
 
582
   
 
583
   
 
584
   private void configureParameters()
 
585
   {
 
586
      Map config = new HashMap();
 
587
      ClientInvoker invoker = client.getInvoker();
 
588
      if (invoker != null)
 
589
      {
 
590
         config.putAll(invoker.getLocator().getParameters());
 
591
      }
 
592
      config.putAll(client.getConfiguration());
 
593
      config.putAll(metadata);
 
594
      
 
595
      Object val = config.get(Client.USE_ALL_PARAMS);
 
596
      if (val != null)
 
597
      {
 
598
         if (val instanceof String)
 
599
         {
 
600
            useAllParams = Boolean.valueOf((String) val).booleanValue();
 
601
         }
 
602
         else
 
603
         {
 
604
            log.warn("Value for " + Client.USE_ALL_PARAMS + " must be of type " +
 
605
                     String.class.getName() + " and is " + val.getClass().getName());
 
606
         }
 
607
      }
 
608
      log.debug(this + ": useAllParams: " + useAllParams);
 
609
      if (!useAllParams)
 
610
      {
 
611
         config = metadata;
 
612
      }
 
613
      
 
614
      val = config.get(ServerInvoker.BLOCKING_MODE);
 
615
      if (val != null)
 
616
      {
 
617
         if (val instanceof String)
 
618
         {
 
619
            if (ServerInvoker.BLOCKING.equals(val))
 
620
            {
 
621
               blocking = true;
 
622
               synchronizedShutdown = false;
 
623
            }
 
624
            else if (ServerInvoker.NONBLOCKING.equals(val))
 
625
            {
 
626
               blocking = false;
 
627
               synchronizedShutdown = true;
 
628
            }
 
629
            else
 
630
            {
 
631
               log.warn("Value for " + ServerInvoker.BLOCKING_MODE + 
 
632
                     " configuration is " + val + ". Must be either " +
 
633
                     ServerInvoker.BLOCKING + " or " + ServerInvoker.NONBLOCKING +
 
634
                     ". Using " + ServerInvoker.BLOCKING + ".");
 
635
            }
 
636
         }
 
637
         else
 
638
         {
 
639
            log.warn("Value for " + ServerInvoker.BLOCKING_MODE + 
 
640
                  " configuration must be of type " + String.class.getName() +
 
641
                  " and is of type " + val.getClass().getName());
 
642
         }
 
643
      }
 
644
 
 
645
      // Default blocking mode on server is nonblocking.
 
646
      if (blocking)
 
647
         metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
 
648
 
 
649
      val = config.get(ServerInvoker.BLOCKING_TIMEOUT);
 
650
      if (val != null)
 
651
      {
 
652
         if (val instanceof String)
 
653
         {
 
654
            try
 
655
            {
 
656
               int blockingTimeout = Integer.parseInt((String) val);
 
657
               metadata.put(ServerInvoker.TIMEOUT, Integer.toString(blockingTimeout));
 
658
            }
 
659
            catch (NumberFormatException e)
 
660
            {
 
661
               log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long.  " + e.getMessage());
 
662
            }
 
663
         }
 
664
         else
 
665
         {
 
666
            log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
 
667
                  " and is " + val.getClass().getName());
 
668
         }
 
669
      }
 
670
 
 
671
      val = config.get(SYNCHRONIZED_SHUTDOWN);
 
672
      if (val != null)
 
673
      {
 
674
         if (val instanceof String)
 
675
         {
 
676
            synchronizedShutdown = Boolean.valueOf((String) val).booleanValue();
 
677
         }
 
678
         else
 
679
         {
 
680
            log.warn("Value for " + SYNCHRONIZED_SHUTDOWN + " must be of type " + String.class.getName() +
 
681
                  " and is " + val.getClass().getName());
 
682
         }
 
683
      }
 
684
 
 
685
      val = config.get(CALLBACK_POLL_PERIOD);
 
686
      if (val != null)
 
687
      {
 
688
         if (val instanceof String)
 
689
         {
 
690
            try
 
691
            {
 
692
               pollPeriod = Long.parseLong((String) val);
 
693
            }
 
694
            catch (NumberFormatException e)
 
695
            {
 
696
               log.warn("Error converting " + CALLBACK_POLL_PERIOD + " to type long.  " + e.getMessage());
 
697
            }
 
698
         }
 
699
         else
 
700
         {
 
701
            log.warn("Value for " + CALLBACK_POLL_PERIOD + " configuration must be of type " + String.class.getName() +
 
702
                  " and is " + val.getClass().getName());
 
703
         }
 
704
      }
 
705
      val = config.get(CALLBACK_SCHEDULE_MODE);
 
706
      if (val != null)
 
707
      {
 
708
         if (val instanceof String)
 
709
         {
 
710
            if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
 
711
            {
 
712
               scheduleMode = (String) val;
 
713
            }
 
714
            else
 
715
            {
 
716
               log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
 
717
               log.warn("Using " + scheduleMode);
 
718
            }
 
719
         }
 
720
         else
 
721
         {
 
722
            log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
 
723
                  " and is " + val.getClass().getName());
 
724
         }
 
725
      }
 
726
      val = config.get(MAX_ERROR_COUNT);
 
727
      if (val != null)
 
728
      {
 
729
         if (val instanceof String)
 
730
         {
 
731
            try
 
732
            {
 
733
               maxErrorCount = Integer.parseInt((String) val);
 
734
            }
 
735
            catch (NumberFormatException e)
 
736
            {
 
737
               log.warn("Error converting " + MAX_ERROR_COUNT + " to type int.  " + e.getMessage());
 
738
            }
 
739
         }
 
740
         else
 
741
         {
 
742
            log.warn("Value for " + MAX_ERROR_COUNT + " configuration must be of type " + String.class.getName() +
 
743
                  " and is " + val.getClass().getName());
 
744
         }
 
745
      }
 
746
      if (config.get(REPORT_STATISTICS) != null)
 
747
      {
 
748
         reportStatistics = true;
 
749
      }
 
750
   }
 
751
 
 
752
 
 
753
   private void reportStatistics(List callbacks)
 
754
   {
 
755
      int toHandle;
 
756
      int toAcknowledge = 0;
 
757
 
 
758
      synchronized (toHandleList)
 
759
      {
 
760
         toHandle = toHandleList.size() + handleThread.toHandleListCopy.size();
 
761
      }
 
762
 
 
763
      synchronized (toAcknowledgeList)
 
764
      {
 
765
         if (acknowledgeThread != null)
 
766
            toAcknowledge = toAcknowledgeList.size() + acknowledgeThread.toAcknowledgeListCopy.size();
 
767
      }
 
768
 
 
769
      StringBuffer message = new StringBuffer("\n");
 
770
      message.append("================================\n")
 
771
             .append("  retrieved " + callbacks.size() + " callbacks\n")
 
772
             .append("  callbacks waiting to be processed: " + toHandle + "\n")
 
773
             .append("  callbacks waiting to be acknowledged: " + toAcknowledge + "\n")
 
774
             .append("================================");
 
775
      log.info(message);
 
776
   }
 
777
 
 
778
 
 
779
   /**
 
780
    * The key value to use in metadata Map to specify the desired scheduling mode. 
 
781
    */
 
782
   public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
 
783
}
 
 
b'\\ No newline at end of file'