~ubuntu-branches/ubuntu/saucy/resiprocate/saucy-proposed

« back to all changes in this revision

Viewing changes to rutil/AbstractFifo.hxx

  • Committer: Package Import Robot
  • Author(s): Daniel Pocock
  • Date: 2012-05-17 19:29:59 UTC
  • Revision ID: package-import@ubuntu.com-20120517192959-vv00m77isztdy64q
Tags: upstream-1.8.2
ImportĀ upstreamĀ versionĀ 1.8.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#ifndef RESIP_AbstractFifo_hxx
 
2
#define RESIP_AbstractFifo_hxx 
 
3
 
 
4
#include <cassert>
 
5
#include <deque>
 
6
 
 
7
#include "rutil/Mutex.hxx"
 
8
#include "rutil/Condition.hxx"
 
9
#include "rutil/Lock.hxx"
 
10
#include "rutil/CongestionManager.hxx"
 
11
 
 
12
#include "rutil/compat.hxx"
 
13
#include "rutil/Timer.hxx"
 
14
 
 
15
namespace resip
 
16
{
 
17
/**
 
18
  @brief Interface for providing metrics on FIFOs, primarily used by 
 
19
   CongestionManager.
 
20
   Provides four different types of metrics:
 
21
      - size : The number of elements in the queue
 
22
      - time-depth : The age of the oldest item in the queue (ie, the front)
 
23
      - expected wait-time : A heuristic estimating the amount of time a message
 
24
         would take to be serviced if it were added to the queue.
 
25
      - average service time : The average time it takes to service a single
 
26
         element from the queue (this is helpful in congestion control, but is
 
27
         mostly intended for logging).
 
28
*/
 
29
class FifoStatsInterface
 
30
{
 
31
   public:
 
32
         
 
33
     FifoStatsInterface();
 
34
     virtual ~FifoStatsInterface();
 
35
     
 
36
     /**
 
37
         Returns the expected time it will take to service all messages 
 
38
         currently in the queue (in milli-seconds)
 
39
     */
 
40
     virtual time_t expectedWaitTimeMilliSec() const =0;
 
41
 
 
42
     /**
 
43
         Returns the difference in time between the youngest and oldest item in 
 
44
         the FIFO in seconds
 
45
      */
 
46
      virtual time_t getTimeDepth() const = 0;
 
47
     
 
48
     /**
 
49
         Returns the number of elements in the FIFO
 
50
      */
 
51
      virtual size_t getCountDepth() const = 0;
 
52
 
 
53
     /**
 
54
         Returns the average time it takes for individual messages to be 
 
55
         serviced (in micro-seconds)
 
56
     */
 
57
     virtual time_t averageServiceTimeMicroSec() const = 0;
 
58
 
 
59
      /**
 
60
         @internal
 
61
         Return this fifo's role-number. The meaning of the return is defined on
 
62
         a per-application basis, and will have special meaning to the 
 
63
         CongestionManager implementation specific to that app. For instance, 
 
64
         1 might be understood to represent the main state machine fifo in 
 
65
         resip, 2 might indicate a transport fifo (of which there may be 
 
66
         several), 3 might indicate a particular TU's fifo, etc.
 
67
         These are intended for use by CongestionManager only.
 
68
      */
 
69
      inline UInt8 getRole() const {return mRole;}
 
70
 
 
71
      /**
 
72
         @internal
 
73
         Set this fifo's role-number.
 
74
         @see getRole()
 
75
      */
 
76
      inline void setRole(UInt8 role) {mRole=role;}
 
77
 
 
78
      /**
 
79
         Sets the description for this fifo. This is used in the logging for
 
80
         this fifo's statistics, and can also be used by the CongestionManager 
 
81
         to assign a role-number.
 
82
         @param description The description for this fifo.
 
83
      */
 
84
      inline void setDescription(const resip::Data& description)
 
85
      {
 
86
         mDescription=description;
 
87
      }
 
88
 
 
89
      /**
 
90
         Gets the description for this fifo.
 
91
         @see setDescription()
 
92
      */
 
93
      virtual const resip::Data& getDescription() const {return mDescription;}
 
94
 
 
95
   protected:
 
96
      Data mDescription;
 
97
      UInt8 mRole;
 
98
};
 
99
 
 
100
/**
 
101
  * The getNext() method takes an argument {ms} that normally
 
102
  * the number of milliseconds to wait. There are two special values:
 
103
  * NOWAIT
 
104
  *     Don't wait/block/sleep. If no message to retrieve, return NULL.
 
105
  * FOREVER
 
106
  *     Wait forever until a message is available.
 
107
  * Note that the encoding (0 vs -1) is the oppositive convention
 
108
  * of standard APIs such as epoll_wait(). This is for historical reasons.
 
109
  */
 
110
#define RESIP_FIFO_NOWAIT       -1
 
111
#define RESIP_FIFO_FOREVER      0
 
112
 
 
113
/**
 
114
   @brief The base class from which various templated Fifo classes are derived.
 
115
 
 
116
   (aka template hoist) 
 
117
   AbstractFifo's get operations are all threadsafe; AbstractFifo does not 
 
118
   define any put operations (these are defined in subclasses).
 
119
   @note Users of the resip stack will not need to interact with this class 
 
120
      directly in most cases. Look at Fifo and TimeLimitFifo instead.
 
121
 
 
122
   @ingroup message_passing
 
123
 */
 
124
template <typename T>
 
125
class AbstractFifo : public FifoStatsInterface
 
126
{
 
127
   public:
 
128
     /** 
 
129
      * @brief Constructor
 
130
      * @param maxSize max number of messages to keep
 
131
      **/
 
132
      AbstractFifo()
 
133
         : FifoStatsInterface(),
 
134
            mLastSampleTakenMicroSec(0),
 
135
            mCounter(0),
 
136
            mAverageServiceTimeMicroSec(0),
 
137
            mSize(0)
 
138
      {}
 
139
 
 
140
      virtual ~AbstractFifo()
 
141
      {
 
142
      }
 
143
 
 
144
      /** 
 
145
         @brief is the queue empty?
 
146
         @return true if the queue is empty and false otherwise
 
147
       **/
 
148
      bool empty() const
 
149
      {
 
150
         Lock lock(mMutex); (void)lock;
 
151
         return mFifo.empty();
 
152
      }
 
153
 
 
154
      /**
 
155
        @brief get the current size of the fifo.
 
156
        @note Note you should not use this function to determine
 
157
        whether a call to getNext() will block or not. Use
 
158
        messageAvailable() instead.
 
159
        @return the number of messages in the queue
 
160
       */
 
161
      virtual unsigned int size() const
 
162
      {
 
163
         Lock lock(mMutex); (void)lock;
 
164
         return (unsigned int)mFifo.size();
 
165
      }
 
166
 
 
167
      /**
 
168
      @brief is a message available?
 
169
      @retval true if a message is available and false otherwise
 
170
       */
 
171
       
 
172
      bool messageAvailable() const
 
173
      {
 
174
         Lock lock(mMutex); (void)lock;
 
175
         return !mFifo.empty();
 
176
      }
 
177
 
 
178
      /**
 
179
      @brief computes the time delta between the oldest and newest queue members
 
180
      @note defaults to zero, overridden by TimeLimitFifo<T>
 
181
      @return the time delta between the oldest and newest queue members
 
182
      */
 
183
      virtual time_t getTimeDepth() const
 
184
      {
 
185
         return 0;
 
186
      }
 
187
 
 
188
      virtual size_t getCountDepth() const
 
189
      {
 
190
         return mSize;
 
191
      }
 
192
 
 
193
      virtual time_t expectedWaitTimeMilliSec() const
 
194
      {
 
195
         return ((mAverageServiceTimeMicroSec*mSize)+500)/1000;
 
196
      }
 
197
 
 
198
      virtual time_t averageServiceTimeMicroSec() const
 
199
      {
 
200
         return mAverageServiceTimeMicroSec;
 
201
      }
 
202
 
 
203
      /// remove all elements in the queue (or not)
 
204
      virtual void clear() {};
 
205
 
 
206
   protected:
 
207
      /** 
 
208
          @brief Returns the first message available.
 
209
          @details Returns the first message available. It will wait if no
 
210
          messages are available. If a signal interrupts the wait,
 
211
          it will retry the wait. Signals can therefore not be caught
 
212
          via getNext. If you need to detect a signal, use block
 
213
          prior to calling getNext.
 
214
          @return the first message available
 
215
       */
 
216
      T getNext()
 
217
      {
 
218
         Lock lock(mMutex); (void)lock;
 
219
         onFifoPolled();
 
220
 
 
221
         // Wait util there are messages available.
 
222
         while (mFifo.empty())
 
223
         {
 
224
            mCondition.wait(mMutex);
 
225
         }
 
226
 
 
227
         // Return the first message on the fifo.
 
228
         //
 
229
         T firstMessage(mFifo.front());
 
230
         mFifo.pop_front();
 
231
         onMessagePopped();
 
232
         return firstMessage;
 
233
      }
 
234
 
 
235
 
 
236
      /**
 
237
        @brief Returns the next message available.
 
238
        @details Returns the next message available. Will wait up to
 
239
        ms milliseconds if no information is available. If
 
240
        the specified time passes or a signal interrupts the
 
241
        wait, this method returns 0. This interface provides
 
242
        no mechanism to distinguish between timeout and
 
243
        interrupt.
 
244
       */
 
245
      bool getNext(int ms, T& toReturn)
 
246
      {
 
247
         if(ms == 0) 
 
248
         {
 
249
            toReturn = getNext();
 
250
            return true;
 
251
         }
 
252
 
 
253
         if(ms < 0)
 
254
         {
 
255
            Lock lock(mMutex); (void)lock;
 
256
            onFifoPolled();
 
257
            if (mFifo.empty())  // WATCHOUT: Do not test mSize instead
 
258
              return false;
 
259
            toReturn = mFifo.front();
 
260
            mFifo.pop_front();
 
261
            return true;
 
262
         }
 
263
 
 
264
         const UInt64 begin(Timer::getTimeMs());
 
265
         const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :(
 
266
         Lock lock(mMutex); (void)lock;
 
267
         onFifoPolled();
 
268
 
 
269
         // Wait until there are messages available
 
270
         while (mFifo.empty())
 
271
         {
 
272
            if(ms==0)
 
273
            {
 
274
               return false;
 
275
            }
 
276
            const UInt64 now(Timer::getTimeMs());
 
277
            if(now >= end)
 
278
            {
 
279
                return false;
 
280
            }
 
281
      
 
282
            unsigned int timeout((unsigned int)(end - now));
 
283
                    
 
284
            // bail if total wait time exceeds limit
 
285
            bool signaled = mCondition.wait(mMutex, timeout);
 
286
            if (!signaled)
 
287
            {
 
288
               return false;
 
289
            }
 
290
         }
 
291
      
 
292
         // Return the first message on the fifo.
 
293
         //
 
294
         toReturn=mFifo.front();
 
295
         mFifo.pop_front();
 
296
         onMessagePopped();
 
297
         return true;
 
298
      }
 
299
 
 
300
      typedef std::deque<T> Messages;
 
301
 
 
302
      void getMultiple(Messages& other, unsigned int max)
 
303
      {
 
304
         Lock lock(mMutex); (void)lock;
 
305
         onFifoPolled();
 
306
         assert(other.empty());
 
307
         while (mFifo.empty())
 
308
         {
 
309
            mCondition.wait(mMutex);
 
310
         }
 
311
 
 
312
         if(mFifo.size() <= max)
 
313
         {
 
314
            std::swap(mFifo, other);
 
315
            onMessagePopped(mSize);
 
316
         }
 
317
         else
 
318
         {
 
319
            size_t num=max;
 
320
            while( 0 != max-- )
 
321
            {
 
322
               other.push_back(mFifo.front());
 
323
               mFifo.pop_front();
 
324
            }
 
325
            onMessagePopped((unsigned int)num);
 
326
         }
 
327
      }
 
328
 
 
329
      bool getMultiple(int ms, Messages& other, unsigned int max)
 
330
      {
 
331
         if(ms==0)
 
332
         {
 
333
            getMultiple(other,max);
 
334
            return true;
 
335
         }
 
336
 
 
337
         assert(other.empty());
 
338
         const UInt64 begin(Timer::getTimeMs());
 
339
         const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :(
 
340
         Lock lock(mMutex); (void)lock;
 
341
         onFifoPolled();
 
342
 
 
343
         // Wait until there are messages available
 
344
         while (mFifo.empty())
 
345
         {
 
346
            if(ms < 0)
 
347
            {
 
348
               return false;
 
349
            }
 
350
            const UInt64 now(Timer::getTimeMs());
 
351
            if(now >= end)
 
352
            {
 
353
                return false;
 
354
            }
 
355
 
 
356
            unsigned int timeout((unsigned int)(end - now));
 
357
                    
 
358
            // bail if total wait time exceeds limit
 
359
            bool signaled = mCondition.wait(mMutex, timeout);
 
360
            if (!signaled)
 
361
            {
 
362
               return false;
 
363
            }
 
364
         }
 
365
 
 
366
         if(mFifo.size() <= max)
 
367
         {
 
368
            std::swap(mFifo, other);
 
369
            onMessagePopped(mSize);
 
370
         }
 
371
         else
 
372
         {
 
373
            size_t num=max;
 
374
            while( 0 != max-- )
 
375
            {
 
376
               other.push_back(mFifo.front());
 
377
               mFifo.pop_front();
 
378
            }
 
379
            onMessagePopped((unsigned int)num);
 
380
         }
 
381
         return true;
 
382
      }
 
383
 
 
384
      size_t add(const T& item)
 
385
      {
 
386
         Lock lock(mMutex); (void)lock;
 
387
         mFifo.push_back(item);
 
388
         mCondition.signal();
 
389
         onMessagePushed(1);
 
390
         return mFifo.size();
 
391
      }
 
392
 
 
393
      size_t addMultiple(Messages& items)
 
394
      {
 
395
         Lock lock(mMutex); (void)lock;
 
396
         size_t size=items.size();
 
397
         if(mFifo.empty())
 
398
         {
 
399
            std::swap(mFifo, items);
 
400
         }
 
401
         else
 
402
         {
 
403
            // I suppose it is possible to optimize this as a push_front() from
 
404
            // mFifo to items, and then do a swap, if items is larger.
 
405
            while(!items.empty())
 
406
            {
 
407
               mFifo.push_back(items.front());
 
408
               items.pop_front();
 
409
            }
 
410
         }
 
411
         mCondition.signal();
 
412
         onMessagePushed((int)size);
 
413
         return mFifo.size();
 
414
      }
 
415
 
 
416
      /** @brief container for FIFO items */
 
417
      Messages mFifo;
 
418
      /** @brief access serialization lock */
 
419
      mutable Mutex mMutex;
 
420
      /** @brief condition for waiting on new queue items */
 
421
      Condition mCondition;
 
422
 
 
423
      mutable UInt64 mLastSampleTakenMicroSec;
 
424
      mutable UInt32 mCounter;
 
425
      mutable UInt32 mAverageServiceTimeMicroSec;
 
426
      // std::deque has to perform some amount of traversal to calculate its 
 
427
      // size; we maintain this count so that it can be queried without locking, 
 
428
      // in situations where it being off by a small amount is ok.
 
429
      UInt32 mSize;
 
430
 
 
431
      virtual void onFifoPolled()
 
432
      {
 
433
         // !bwc! TODO allow this sampling frequency to be tweaked
 
434
         if(mLastSampleTakenMicroSec &&
 
435
            mCounter &&
 
436
            (mCounter >= 64 || mFifo.empty()))
 
437
         {
 
438
            UInt64 now(Timer::getTimeMicroSec());
 
439
            UInt64 diff = now-mLastSampleTakenMicroSec;
 
440
 
 
441
            if(mCounter >= 4096)
 
442
            {
 
443
               mAverageServiceTimeMicroSec=(UInt32)resipIntDiv(diff, mCounter);
 
444
            }
 
445
            else // fifo got emptied; merge into a rolling average
 
446
            {
 
447
               // .bwc. This is a moving average with period 64, round to 
 
448
               // nearest int.
 
449
               mAverageServiceTimeMicroSec=(UInt32)resipIntDiv(
 
450
                     diff+((4096-mCounter)*mAverageServiceTimeMicroSec),
 
451
                     4096U);
 
452
            }
 
453
            mCounter=0;
 
454
            if(mFifo.empty())
 
455
            {
 
456
               mLastSampleTakenMicroSec=0;
 
457
            }
 
458
            else
 
459
            {
 
460
               mLastSampleTakenMicroSec=now;
 
461
            }
 
462
         }
 
463
      }
 
464
 
 
465
      /**
 
466
         Called when a message (or messages) are removed from this fifo. Used to
 
467
         drive service time calculations.
 
468
      */
 
469
      virtual void onMessagePopped(unsigned int num=1)
 
470
      {
 
471
         mCounter+=num;
 
472
         mSize-=num;
 
473
      }
 
474
 
 
475
      virtual void onMessagePushed(int num)
 
476
      {
 
477
         if(mSize==0)
 
478
         {
 
479
            // Fifo went from empty to non-empty. Take a timestamp, and record
 
480
            // how long it takes to process some messages.
 
481
            mLastSampleTakenMicroSec=Timer::getTimeMicroSec();
 
482
         }
 
483
         mSize+=num;
 
484
      }
 
485
   private:
 
486
      // no value semantics
 
487
      AbstractFifo(const AbstractFifo&);
 
488
      AbstractFifo& operator=(const AbstractFifo&);
 
489
};
 
490
 
 
491
} // namespace resip
 
492
 
 
493
#endif
 
494
 
 
495
/* ====================================================================
 
496
 * The Vovida Software License, Version 1.0 
 
497
 * 
 
498
 * Redistribution and use in source and binary forms, with or without
 
499
 * modification, are permitted provided that the following conditions
 
500
 * are met:
 
501
 * 
 
502
 * 1. Redistributions of source code must retain the above copyright
 
503
 *    notice, this list of conditions and the following disclaimer.
 
504
 * 
 
505
 * 2. Redistributions in binary form must reproduce the above copyright
 
506
 *    notice, this list of conditions and the following disclaimer in
 
507
 *    the documentation and/or other materials provided with the
 
508
 *    distribution.
 
509
 * 
 
510
 * 3. The names "VOCAL", "Vovida Open Communication Application Library",
 
511
 *    and "Vovida Open Communication Application Library (VOCAL)" must
 
512
 *    not be used to endorse or promote products derived from this
 
513
 *    software without prior written permission. For written
 
514
 *    permission, please contact vocal@vovida.org.
 
515
 *
 
516
 * 4. Products derived from this software may not be called "VOCAL", nor
 
517
 *    may "VOCAL" appear in their name, without prior written
 
518
 *    permission of Vovida Networks, Inc.
 
519
 * 
 
520
 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
 
521
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 
522
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
 
523
 * NON-INFRINGEMENT ARE DISCLAIMED.  IN NO EVENT SHALL VOVIDA
 
524
 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
 
525
 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
 
526
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 
527
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 
528
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 
529
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
530
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 
531
 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
 
532
 * DAMAGE.
 
533
 * 
 
534
 * ====================================================================
 
535
 * 
 
536
 * This software consists of voluntary contributions made by Vovida
 
537
 * Networks, Inc. and many individuals on behalf of Vovida Networks,
 
538
 * Inc.  For more information on Vovida Networks, Inc., please see
 
539
 * <http://www.vovida.org/>.
 
540
 *
 
541
 */