~ubuntu-branches/ubuntu/oneiric/libjboss-remoting-java/oneiric

« back to all changes in this revision

Viewing changes to src/org/jboss/remoting/callback/CallbackPoller.java

  • Committer: Package Import Robot
  • Author(s): Torsten Werner
  • Date: 2011-09-09 14:01:03 UTC
  • mfrom: (1.1.6 upstream)
  • Revision ID: package-import@ubuntu.com-20110909140103-hqokx61534tas9rg
Tags: 2.5.3.SP1-1
* Newer but not newest upstream release. Do not build samples.
* Change debian/watch to upstream's svn repo.
* Add patch to fix compile error caused by tomcat update.
  (Closes: #628303)
* Switch to source format 3.0.
* Switch to debhelper level 7.
* Remove useless Depends.
* Update Standards-Version: 3.9.2.
* Update README.source.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
* JBoss, Home of Professional Open Source
3
 
* Copyright 2005, JBoss Inc., and individual contributors as indicated
4
 
* by the @authors tag. See the copyright.txt in the distribution for a
5
 
* full listing of individual contributors.
6
 
*
7
 
* This is free software; you can redistribute it and/or modify it
8
 
* under the terms of the GNU Lesser General Public License as
9
 
* published by the Free Software Foundation; either version 2.1 of
10
 
* the License, or (at your option) any later version.
11
 
*
12
 
* This software is distributed in the hope that it will be useful,
13
 
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
 
* Lesser General Public License for more details.
16
 
*
17
 
* You should have received a copy of the GNU Lesser General Public
18
 
* License along with this software; if not, write to the Free
19
 
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20
 
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21
 
*/
22
 
package org.jboss.remoting.callback;
23
 
 
24
 
import java.util.ArrayList;
25
 
import java.util.HashMap;
26
 
import java.util.Iterator;
27
 
import java.util.List;
28
 
import java.util.Map;
29
 
import java.util.Timer;
30
 
import java.util.TimerTask;
31
 
 
32
 
import org.jboss.logging.Logger;
33
 
import org.jboss.remoting.Client;
34
 
import org.jboss.remoting.ServerInvoker;
35
 
import org.jboss.remoting.transport.ClientInvoker;
36
 
 
37
 
/**
38
 
 * CallbackPoller is used to simulate push callbacks on transports that don't support
39
 
 * bidirectional connections.  It will periodically pull callbacks from the server
40
 
 * and pass them to the InvokerCallbackHandler.
41
 
 *
42
 
 * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
43
 
 * @author <a href="mailto:ron.sigal@jboss.com">Ron Sigal</a>
44
 
 */
45
 
public class CallbackPoller extends TimerTask implements Runnable
46
 
{
47
 
   /*
48
 
    * Implementation note.
49
 
    *
50
 
    * CallbackPoller uses two, or possibly three, threads.  The first thread is the
51
 
    * Timer thread, which periodically pulls callbacks from the server and adds them
52
 
    * to toHandleList.  The second thread takes callbacks from toHandleList, passes
53
 
    * them to the CallbackHandler, and, if an acknowledgement is requested for a
54
 
    * callback, it adds the callback to toAcknowledgeList.  The third thread, which is
55
 
    * created in response to the first callback for which an acknowledgement is requested,
56
 
    * takes the contents of toAcknowledgeList and acknowledges them in a batch.
57
 
    *
58
 
    * CallbackPoller will not shut down until all received callbacks have been processed
59
 
    * by the CallbackHandler and acknowledgements have been sent for all callbacks for
60
 
    * which acknowledgements have been requested.
61
 
    */
62
 
 
63
 
   /**
64
 
    * Default polling period for getting callbacks from the server.
65
 
    * Default is 5000 milliseconds.
66
 
    */
67
 
   public static final long DEFAULT_POLL_PERIOD = 5000;
68
 
   
69
 
   /**
70
 
    * Default timeout for getting callbacks in blocking mode.
71
 
    * Default is 5000 milliseconds.
72
 
    */
73
 
   public static final int DEFAULT_BLOCKING_TIMEOUT = 5000;
74
 
   
75
 
   /**
76
 
    * Default number of exceptions before callback polling wil be shut down.
77
 
    * Default is 5.
78
 
    */
79
 
   public static final int DEFAULT_MAX_ERROR_COUNT = 5;
80
 
   
81
 
   /**
82
 
    * The key value to use to specify if stop() should wait for the call to
83
 
    * org.jboss.remoting.Client.getCallbacks() should return.  The default 
84
 
    * behavior is do a synchronized shutdown for nonblocking callbacks and
85
 
    * a nonsynchronized shutdown for blocking callbacks.
86
 
    */
87
 
   public static final String SYNCHRONIZED_SHUTDOWN = "doSynchronizedShutdown";
88
 
   
89
 
   /**
90
 
    * The key value to use to specify the desired poll period
91
 
    * within the metadata Map.
92
 
    */
93
 
   public static final String CALLBACK_POLL_PERIOD = "callbackPollPeriod";
94
 
 
95
 
   /** Use java.util.timer.schedule(). */
96
 
   public static final String SCHEDULE_FIXED_RATE = "scheduleFixedRate";
97
 
 
98
 
   /** Use java.util.timer.scheduleAtFixedRate(). */
99
 
   public static final String SCHEDULE_FIXED_DELAY = "scheduleFixedDelay";
100
 
   
101
 
   /**
102
 
    * The key to use to specify the number of errors before callback polling
103
 
    * will be shut down.
104
 
    */
105
 
   public static final String MAX_ERROR_COUNT = "maxErrorCount";
106
 
 
107
 
   /** The key to use in metadata Map to request statistics.  The associated
108
 
    *  is ignored. */
109
 
   public static final String REPORT_STATISTICS = "reportStatistics";
110
 
 
111
 
   private Client client = null;
112
 
   private InvokerCallbackHandler callbackHandler = null;
113
 
   private Map metadata = null;
114
 
   private Object callbackHandlerObject = null;
115
 
   private boolean blocking = false;
116
 
   private boolean synchronizedShutdown = false;
117
 
   private long pollPeriod = DEFAULT_POLL_PERIOD;
118
 
   private Timer timer;
119
 
   private String scheduleMode = SCHEDULE_FIXED_RATE;
120
 
   private boolean reportStatistics;
121
 
   private boolean running;
122
 
   private int maxErrorCount = -1;
123
 
   private int errorCount;
124
 
   private boolean useAllParams;
125
 
 
126
 
   private ArrayList toHandleList = new ArrayList();
127
 
   private ArrayList toAcknowledgeList = new ArrayList();
128
 
   private HandleThread handleThread;
129
 
   private AcknowledgeThread acknowledgeThread;
130
 
   private BlockingPollerThread blockingPollerThread;
131
 
 
132
 
   private static final Logger log = Logger.getLogger(CallbackPoller.class);
133
 
 
134
 
 
135
 
   public CallbackPoller(Client client, InvokerCallbackHandler callbackhandler, Map metadata, Object callbackHandlerObject)
136
 
   {
137
 
      this.client = client;
138
 
      this.callbackHandler = callbackhandler;
139
 
      this.metadata = new HashMap(metadata);
140
 
      this.callbackHandlerObject = callbackHandlerObject;
141
 
   }
142
 
 
143
 
   public void start() throws Exception
144
 
   {
145
 
      if (callbackHandler == null)
146
 
      {
147
 
         throw new NullPointerException("Can not poll for callbacks when InvokerCallbackHandler is null.");
148
 
      }
149
 
      if (client != null)
150
 
      {
151
 
         client.connect();
152
 
      }
153
 
      else
154
 
      {
155
 
         throw new NullPointerException("Can not poll for callbacks when Client is null.");
156
 
      }
157
 
 
158
 
      configureParameters();
159
 
 
160
 
      handleThread = new HandleThread("HandleThread");
161
 
      handleThread.start();
162
 
      if (log.isTraceEnabled()) log.trace("blocking: " + blocking);
163
 
      if (blocking)
164
 
      {
165
 
         if (maxErrorCount == -1)
166
 
            maxErrorCount = DEFAULT_MAX_ERROR_COUNT;
167
 
         
168
 
         running = true;
169
 
         metadata.put(Client.THROW_CALLBACK_EXCEPTION, "true");
170
 
         blockingPollerThread = new BlockingPollerThread();
171
 
         blockingPollerThread.start();
172
 
      }
173
 
      else
174
 
      {
175
 
         timer = new Timer(true);
176
 
         if (SCHEDULE_FIXED_DELAY.equals(scheduleMode))
177
 
            timer.schedule(this, pollPeriod, pollPeriod);
178
 
         else
179
 
            timer.scheduleAtFixedRate(this, pollPeriod, pollPeriod);
180
 
      }
181
 
   }
182
 
 
183
 
   public synchronized void run()
184
 
   {
185
 
      // need to pull callbacks from server and give them to callback handler
186
 
      try
187
 
      {
188
 
         if (log.isTraceEnabled()) log.trace(this + " getting callbacks for " + callbackHandler);
189
 
         List callbacks = client.getCallbacks(callbackHandler, metadata);
190
 
         if (log.isTraceEnabled()) log.trace(this + " callback count: " + (callbacks == null ? 0 : callbacks.size()));
191
 
 
192
 
         if (callbacks != null && callbacks.size() > 0)
193
 
         {
194
 
            synchronized (toHandleList)
195
 
            {
196
 
               toHandleList.addAll(callbacks);
197
 
               if (toHandleList.size() == callbacks.size())
198
 
                  toHandleList.notify();
199
 
            }
200
 
         }
201
 
 
202
 
         if (reportStatistics)
203
 
            reportStatistics(callbacks);
204
 
      }
205
 
      catch (Throwable throwable)
206
 
      {
207
 
         if (!running)
208
 
         {
209
 
            stop();
210
 
            return;
211
 
         }
212
 
         
213
 
         log.info(this + " Error getting callbacks from server.");
214
 
         log.debug(this + " Error getting callbacks from server.", throwable);
215
 
         String errorMessage = throwable.getMessage();
216
 
         if (errorMessage != null)
217
 
         {
218
 
            if (errorMessage.startsWith("Could not find listener id"))
219
 
            {
220
 
               log.error("Client no longer has InvokerCallbackHandler (" + 
221
 
                          callbackHandler +
222
 
                         ") registered.  Shutting down callback polling");
223
 
               stop();
224
 
               return;
225
 
            }
226
 
            if (errorMessage.startsWith("Can not make remoting client invocation " +
227
 
                                        "due to not being connected to server."))
228
 
            {
229
 
               log.error("Client no longer connected.  Shutting down callback polling");
230
 
               stop();
231
 
               return;
232
 
            }
233
 
         }
234
 
         if (maxErrorCount >= 0)
235
 
         {
236
 
            if (++errorCount > maxErrorCount)
237
 
            {
238
 
               log.error("Error limit of " + maxErrorCount + 
239
 
                          " exceeded.  Shutting down callback polling");
240
 
               stop();
241
 
               return;
242
 
            }
243
 
         }
244
 
      }
245
 
   }
246
 
   
247
 
   public void stop()
248
 
   {
249
 
      stop(-1);
250
 
   }
251
 
 
252
 
   /**
253
 
    * stop() will not return until all received callbacks have been processed
254
 
    * by the CallbackHandler and acknowledgements have been sent for all callbacks for
255
 
    * which acknowledgements have been requested.
256
 
    */
257
 
   public void stop(int timeout)
258
 
   {
259
 
      log.debug(this + " is shutting down");
260
 
      running = false;
261
 
      
262
 
      if (!blocking)
263
 
      {
264
 
         cancel();
265
 
         
266
 
         if (timer != null)
267
 
         {
268
 
            timer.cancel();
269
 
            timer = null;
270
 
         }
271
 
      }
272
 
      
273
 
      if (timeout == 0)
274
 
         return;
275
 
      
276
 
      if (synchronizedShutdown)
277
 
      {
278
 
         // run() and stop() are synchronized so that stop() will wait until run() has finished
279
 
         // adding any callbacks it has received to toHandleList.  Therefore, once cancel()
280
 
         // returns, no more callbacks will arrive from the server.
281
 
         synchronized (this)
282
 
         {
283
 
            shutdown();
284
 
         }
285
 
      }
286
 
      else
287
 
      {
288
 
         shutdown();
289
 
      }
290
 
 
291
 
      log.debug(this + " has shut down");
292
 
   }
293
 
 
294
 
   
295
 
   private void shutdown()
296
 
   {
297
 
      // HandleThread.shutdown() will not return until all received callbacks have been
298
 
      // processed and, if necessary, added to toAcknowledgeList.
299
 
      if (handleThread != null)
300
 
      {
301
 
         handleThread.shutdown();
302
 
         handleThread = null;
303
 
      }
304
 
 
305
 
      // AcknowledgeThread.shutdown() will not return until acknowledgements have been sent
306
 
      // for all callbacks for which acknowledgements have been requested.
307
 
      if (acknowledgeThread != null)
308
 
      {
309
 
         acknowledgeThread.shutdown();
310
 
         acknowledgeThread = null;
311
 
      }
312
 
   }
313
 
   
314
 
   
315
 
   class BlockingPollerThread extends Thread
316
 
   {
317
 
      public BlockingPollerThread()
318
 
      {
319
 
         String threadName = getName();
320
 
         int i = threadName.indexOf('-');
321
 
         String threadNumber = null;
322
 
         if (i >= 0)
323
 
            threadNumber = threadName.substring(i+1);
324
 
         else
325
 
            threadNumber = Long.toString(System.currentTimeMillis());
326
 
         String pollerString = CallbackPoller.this.toString();
327
 
         String address = pollerString.substring(pollerString.indexOf('@'));
328
 
         setName("CallbackPoller:" + threadNumber + "[" + address + "]");
329
 
         setDaemon(true);
330
 
      }
331
 
 
332
 
      public void run()
333
 
      {
334
 
         while (running)
335
 
         {
336
 
            CallbackPoller.this.run();
337
 
         }
338
 
      }
339
 
   }  
340
 
 
341
 
   
342
 
   class HandleThread extends Thread
343
 
   {
344
 
      boolean running = true;
345
 
      boolean done;
346
 
      ArrayList toHandleListCopy = new ArrayList();
347
 
      Callback callback;
348
 
 
349
 
      HandleThread(String name)
350
 
      {
351
 
         super(name);
352
 
      }
353
 
      public void run()
354
 
      {
355
 
         while (true)
356
 
         {
357
 
            synchronized (toHandleList)
358
 
            {
359
 
               if (toHandleList.isEmpty() && running)
360
 
               {
361
 
                  try
362
 
                  {
363
 
                     toHandleList.wait();
364
 
                  }
365
 
                  catch (InterruptedException e)
366
 
                  {
367
 
                     log.debug("unexpected interrupt");
368
 
                     continue;
369
 
                  }
370
 
               }
371
 
 
372
 
               // If toHandleList is empty, then running must be false.  We return
373
 
               // only when both conditions are true.
374
 
               if (toHandleList.isEmpty())
375
 
               {
376
 
                  done = true;
377
 
                  toHandleList.notify();
378
 
                  return;
379
 
               }
380
 
 
381
 
               toHandleListCopy.addAll(toHandleList);
382
 
               toHandleList.clear();
383
 
            }
384
 
 
385
 
            while (!toHandleListCopy.isEmpty())
386
 
            {
387
 
               try
388
 
               {
389
 
                  callback = (Callback) toHandleListCopy.remove(0);
390
 
                  callback.setCallbackHandleObject(callbackHandlerObject);
391
 
                  callbackHandler.handleCallback(callback);
392
 
               }
393
 
               catch (HandleCallbackException e)
394
 
               {
395
 
                  log.error("Error delivering callback to callback handler (" + callbackHandler + ").", e);
396
 
               }
397
 
 
398
 
               checkForAcknowledgeRequest(callback);
399
 
            }
400
 
         }
401
 
      }
402
 
 
403
 
      /**
404
 
       *  Once CallbackPoller.stop() has called HandleThread.shutdown(), CallbackPoller.run()
405
 
       *  has terminated and no additional callbacks will be received.  shutdown() will
406
 
       *  not return until HandleThread has processed all received callbacks.
407
 
       *
408
 
       *  Either run() or shutdown() will enter its own synchronized block first.
409
 
       *
410
 
       *  case 1): run() enters its synchronized block first:
411
 
       *     If toHandleList is empty, then run() will reach toHandleList.wait(), shutdown()
412
 
       *     will wake up run(), and run() will exit.  If toHandleList is not empty, then run()
413
 
       *     will process all outstanding callbacks and return to its synchronized block.  At
414
 
       *     this point, either case 1) (with toHandleList empty) or case 2) applies.
415
 
       *
416
 
       *  case 2): shutdown() enters its synchronized block first:
417
 
       *     run() will process all outstanding callbacks and return to its synchronized block.
418
 
       *     After shutdown() reaches toHandleList.wait(), run() will enter its synchronized
419
 
       *     block, find running == false and toHandleList empty, and it will exit.
420
 
       */
421
 
      protected void shutdown()
422
 
      {
423
 
         log.debug(this + " is shutting down");
424
 
         synchronized (toHandleList)
425
 
         {
426
 
            running = false;
427
 
            toHandleList.notify();
428
 
            while (!done)
429
 
            {
430
 
               try
431
 
               {
432
 
                  toHandleList.wait();
433
 
               }
434
 
               catch (InterruptedException ignored) {}
435
 
            }
436
 
         }
437
 
         log.debug(this + " has shut down");
438
 
         return;
439
 
      }
440
 
   }
441
 
 
442
 
 
443
 
   class AcknowledgeThread extends Thread
444
 
   {
445
 
      boolean running = true;
446
 
      boolean done;
447
 
      ArrayList toAcknowledgeListCopy = new ArrayList();
448
 
 
449
 
      AcknowledgeThread(String name)
450
 
      {
451
 
         super(name);
452
 
      }
453
 
      public void run()
454
 
      {
455
 
         while (true)
456
 
         {
457
 
            synchronized (toAcknowledgeList)
458
 
            {
459
 
               while (toAcknowledgeList.isEmpty() && running)
460
 
               {
461
 
                  try
462
 
                  {
463
 
                     toAcknowledgeList.wait();
464
 
                  }
465
 
                  catch (InterruptedException e)
466
 
                  {
467
 
                     log.debug("unexpected interrupt");
468
 
                     continue;
469
 
                  }
470
 
               }
471
 
 
472
 
               // If toAcknowledgeList is empty, then running must be false.  We return
473
 
               // only when both conditions are true.
474
 
               if (toAcknowledgeList.isEmpty())
475
 
               {
476
 
                  done = true;
477
 
                  toAcknowledgeList.notify();
478
 
                  return;
479
 
               }
480
 
 
481
 
               toAcknowledgeListCopy.addAll(toAcknowledgeList);
482
 
               toAcknowledgeList.clear();
483
 
            }
484
 
 
485
 
            try
486
 
            {
487
 
               if (log.isTraceEnabled())
488
 
               {
489
 
                  Iterator it = toAcknowledgeListCopy.iterator();
490
 
                  while (it.hasNext())
491
 
                  {
492
 
                     Callback cb = (Callback) it.next();
493
 
                     Map map = cb.getReturnPayload();
494
 
                     log.trace("acknowledging: " + map.get(ServerInvokerCallbackHandler.CALLBACK_ID));
495
 
                  }
496
 
               }
497
 
               client.acknowledgeCallbacks(callbackHandler, toAcknowledgeListCopy);
498
 
               toAcknowledgeListCopy.clear();
499
 
            }
500
 
            catch (Throwable t)
501
 
            {
502
 
               log.error("Error acknowledging callback for callback handler (" + callbackHandler + ").", t);
503
 
            }
504
 
         }
505
 
      }
506
 
 
507
 
      /**
508
 
       *  Once CallbackPoller.stop() has called AcknowledgeThread.shutdown(), HandleThread
509
 
       *  has terminated and no additional callbacks will be added to toAcknowledgeList.
510
 
       *  shutdown() will not return until AcknowledgeThread has acknowledged all callbacks
511
 
       *  in toAcknowledgeList.
512
 
       *
513
 
       *  Either run() or shutdown() will enter its own synchronized block first.
514
 
       *
515
 
       *  case 1): run() enters its synchronized block first:
516
 
       *     If toAcknowledgeList is empty, then run() will reach toAcknowledgeList.wait(),
517
 
       *     shutdown() will wake up run(), and run() will exit.  If toAcknowledgeList is not
518
 
       *     empty, then run() will process all callbacks in toAcknowledgeList and return to
519
 
       *     its synchronized block.  At this point, either case 1) (with toAcknowledgeList
520
 
       *     empty) or case 2) applies.
521
 
       *
522
 
       *  case 2): shutdown() enters its synchronized block first:
523
 
       *     run() will process all callbacks in toAcknowledgeList and return to its
524
 
       *     synchronized block.  After shutdown() reaches toAcknowledgeList.wait(), run()
525
 
       *     will enter its synchronized block, find running == false and toAcknowledgeList
526
 
       *     empty, and it will exit.
527
 
       */
528
 
      public void shutdown()
529
 
      {
530
 
         log.debug(this + " is shutting down");      
531
 
         synchronized (toAcknowledgeList)
532
 
         {
533
 
            running = false;
534
 
            toAcknowledgeList.notify();
535
 
            while (!done)
536
 
            {
537
 
               try
538
 
               {
539
 
                  toAcknowledgeList.wait();
540
 
               }
541
 
               catch (InterruptedException ignored) {}
542
 
            }
543
 
         }
544
 
         log.debug(this + " has shut down");
545
 
         return;
546
 
      }
547
 
   }
548
 
 
549
 
 
550
 
   private void checkForAcknowledgeRequest(Callback callback)
551
 
   {
552
 
      Map returnPayload = callback.getReturnPayload();
553
 
      if (returnPayload != null)
554
 
      {
555
 
         Object callbackId = returnPayload.get(ServerInvokerCallbackHandler.CALLBACK_ID);
556
 
         if (callbackId != null)
557
 
         {
558
 
            Object o = returnPayload.get(ServerInvokerCallbackHandler.REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS);
559
 
            if (o instanceof String  && Boolean.valueOf((String)o).booleanValue() ||
560
 
                o instanceof Boolean && ((Boolean)o).booleanValue())
561
 
            {
562
 
               synchronized (toAcknowledgeList)
563
 
               {
564
 
                  toAcknowledgeList.add(callback);
565
 
                  if (toAcknowledgeList.size() == 1)
566
 
                  {
567
 
                     if (acknowledgeThread == null)
568
 
                     {
569
 
                        acknowledgeThread = new AcknowledgeThread("AcknowledgeThread");
570
 
                        acknowledgeThread.start();
571
 
                     }
572
 
                     else
573
 
                     {
574
 
                        toAcknowledgeList.notify();
575
 
                     }
576
 
                  }
577
 
               }
578
 
            }
579
 
         }
580
 
      }
581
 
   }
582
 
   
583
 
   
584
 
   private void configureParameters()
585
 
   {
586
 
      Map config = new HashMap();
587
 
      ClientInvoker invoker = client.getInvoker();
588
 
      if (invoker != null)
589
 
      {
590
 
         config.putAll(invoker.getLocator().getParameters());
591
 
      }
592
 
      config.putAll(client.getConfiguration());
593
 
      config.putAll(metadata);
594
 
      
595
 
      Object val = config.get(Client.USE_ALL_PARAMS);
596
 
      if (val != null)
597
 
      {
598
 
         if (val instanceof String)
599
 
         {
600
 
            useAllParams = Boolean.valueOf((String) val).booleanValue();
601
 
         }
602
 
         else
603
 
         {
604
 
            log.warn("Value for " + Client.USE_ALL_PARAMS + " must be of type " +
605
 
                     String.class.getName() + " and is " + val.getClass().getName());
606
 
         }
607
 
      }
608
 
      log.debug(this + ": useAllParams: " + useAllParams);
609
 
      if (!useAllParams)
610
 
      {
611
 
         config = metadata;
612
 
      }
613
 
      
614
 
      val = config.get(ServerInvoker.BLOCKING_MODE);
615
 
      if (val != null)
616
 
      {
617
 
         if (val instanceof String)
618
 
         {
619
 
            if (ServerInvoker.BLOCKING.equals(val))
620
 
            {
621
 
               blocking = true;
622
 
               synchronizedShutdown = false;
623
 
            }
624
 
            else if (ServerInvoker.NONBLOCKING.equals(val))
625
 
            {
626
 
               blocking = false;
627
 
               synchronizedShutdown = true;
628
 
            }
629
 
            else
630
 
            {
631
 
               log.warn("Value for " + ServerInvoker.BLOCKING_MODE + 
632
 
                     " configuration is " + val + ". Must be either " +
633
 
                     ServerInvoker.BLOCKING + " or " + ServerInvoker.NONBLOCKING +
634
 
                     ". Using " + ServerInvoker.BLOCKING + ".");
635
 
            }
636
 
         }
637
 
         else
638
 
         {
639
 
            log.warn("Value for " + ServerInvoker.BLOCKING_MODE + 
640
 
                  " configuration must be of type " + String.class.getName() +
641
 
                  " and is of type " + val.getClass().getName());
642
 
         }
643
 
      }
644
 
 
645
 
      // Default blocking mode on server is nonblocking.
646
 
      if (blocking)
647
 
         metadata.put(ServerInvoker.BLOCKING_MODE, ServerInvoker.BLOCKING);
648
 
 
649
 
      val = config.get(ServerInvoker.BLOCKING_TIMEOUT);
650
 
      if (val != null)
651
 
      {
652
 
         if (val instanceof String)
653
 
         {
654
 
            try
655
 
            {
656
 
               int blockingTimeout = Integer.parseInt((String) val);
657
 
               metadata.put(ServerInvoker.TIMEOUT, Integer.toString(blockingTimeout));
658
 
            }
659
 
            catch (NumberFormatException e)
660
 
            {
661
 
               log.warn("Error converting " + ServerInvoker.BLOCKING_TIMEOUT + " to type long.  " + e.getMessage());
662
 
            }
663
 
         }
664
 
         else
665
 
         {
666
 
            log.warn("Value for " + ServerInvoker.BLOCKING_TIMEOUT + " configuration must be of type " + String.class.getName() +
667
 
                  " and is " + val.getClass().getName());
668
 
         }
669
 
      }
670
 
 
671
 
      val = config.get(SYNCHRONIZED_SHUTDOWN);
672
 
      if (val != null)
673
 
      {
674
 
         if (val instanceof String)
675
 
         {
676
 
            synchronizedShutdown = Boolean.valueOf((String) val).booleanValue();
677
 
         }
678
 
         else
679
 
         {
680
 
            log.warn("Value for " + SYNCHRONIZED_SHUTDOWN + " must be of type " + String.class.getName() +
681
 
                  " and is " + val.getClass().getName());
682
 
         }
683
 
      }
684
 
 
685
 
      val = config.get(CALLBACK_POLL_PERIOD);
686
 
      if (val != null)
687
 
      {
688
 
         if (val instanceof String)
689
 
         {
690
 
            try
691
 
            {
692
 
               pollPeriod = Long.parseLong((String) val);
693
 
            }
694
 
            catch (NumberFormatException e)
695
 
            {
696
 
               log.warn("Error converting " + CALLBACK_POLL_PERIOD + " to type long.  " + e.getMessage());
697
 
            }
698
 
         }
699
 
         else
700
 
         {
701
 
            log.warn("Value for " + CALLBACK_POLL_PERIOD + " configuration must be of type " + String.class.getName() +
702
 
                  " and is " + val.getClass().getName());
703
 
         }
704
 
      }
705
 
      val = config.get(CALLBACK_SCHEDULE_MODE);
706
 
      if (val != null)
707
 
      {
708
 
         if (val instanceof String)
709
 
         {
710
 
            if (SCHEDULE_FIXED_DELAY.equals(val) || SCHEDULE_FIXED_RATE.equals(val))
711
 
            {
712
 
               scheduleMode = (String) val;
713
 
            }
714
 
            else
715
 
            {
716
 
               log.warn("Unrecognized value for " + CALLBACK_SCHEDULE_MODE + ": " + val);
717
 
               log.warn("Using " + scheduleMode);
718
 
            }
719
 
         }
720
 
         else
721
 
         {
722
 
            log.warn("Value for " + CALLBACK_SCHEDULE_MODE + " must be of type " + String.class.getName() +
723
 
                  " and is " + val.getClass().getName());
724
 
         }
725
 
      }
726
 
      val = config.get(MAX_ERROR_COUNT);
727
 
      if (val != null)
728
 
      {
729
 
         if (val instanceof String)
730
 
         {
731
 
            try
732
 
            {
733
 
               maxErrorCount = Integer.parseInt((String) val);
734
 
            }
735
 
            catch (NumberFormatException e)
736
 
            {
737
 
               log.warn("Error converting " + MAX_ERROR_COUNT + " to type int.  " + e.getMessage());
738
 
            }
739
 
         }
740
 
         else
741
 
         {
742
 
            log.warn("Value for " + MAX_ERROR_COUNT + " configuration must be of type " + String.class.getName() +
743
 
                  " and is " + val.getClass().getName());
744
 
         }
745
 
      }
746
 
      if (config.get(REPORT_STATISTICS) != null)
747
 
      {
748
 
         reportStatistics = true;
749
 
      }
750
 
   }
751
 
 
752
 
 
753
 
   private void reportStatistics(List callbacks)
754
 
   {
755
 
      int toHandle;
756
 
      int toAcknowledge = 0;
757
 
 
758
 
      synchronized (toHandleList)
759
 
      {
760
 
         toHandle = toHandleList.size() + handleThread.toHandleListCopy.size();
761
 
      }
762
 
 
763
 
      synchronized (toAcknowledgeList)
764
 
      {
765
 
         if (acknowledgeThread != null)
766
 
            toAcknowledge = toAcknowledgeList.size() + acknowledgeThread.toAcknowledgeListCopy.size();
767
 
      }
768
 
 
769
 
      StringBuffer message = new StringBuffer("\n");
770
 
      message.append("================================\n")
771
 
             .append("  retrieved " + callbacks.size() + " callbacks\n")
772
 
             .append("  callbacks waiting to be processed: " + toHandle + "\n")
773
 
             .append("  callbacks waiting to be acknowledged: " + toAcknowledge + "\n")
774
 
             .append("================================");
775
 
      log.info(message);
776
 
   }
777
 
 
778
 
 
779
 
   /**
780
 
    * The key value to use in metadata Map to specify the desired scheduling mode. 
781
 
    */
782
 
   public static final String CALLBACK_SCHEDULE_MODE = "scheduleMode";
783
 
}
 
 
b'\\ No newline at end of file'