1
#ifndef RESIP_AbstractFifo_hxx
2
#define RESIP_AbstractFifo_hxx
7
#include "rutil/Mutex.hxx"
8
#include "rutil/Condition.hxx"
9
#include "rutil/Lock.hxx"
10
#include "rutil/CongestionManager.hxx"
12
#include "rutil/compat.hxx"
13
#include "rutil/Timer.hxx"
18
@brief Interface for providing metrics on FIFOs, primarily used by
20
Provides four different types of metrics:
21
- size : The number of elements in the queue
22
- time-depth : The age of the oldest item in the queue (i.e., the front)
23
- expected wait-time : A heuristic estimating the amount of time a message
24
would take to be serviced if it were added to the queue.
25
- average service time : The average time it takes to service a single
26
element from the queue (this is helpful in congestion control, but is
27
mostly intended for logging).
29
class FifoStatsInterface
34
virtual ~FifoStatsInterface();
37
Returns the expected time it will take to service all messages
38
currently in the queue (in milliseconds)
40
virtual time_t expectedWaitTimeMilliSec() const =0;
43
Returns the difference in time between the youngest and oldest item in
46
virtual time_t getTimeDepth() const = 0;
49
Returns the number of elements in the FIFO
51
virtual size_t getCountDepth() const = 0;
54
Returns the average time it takes for individual messages to be
55
serviced (in microseconds)
57
virtual time_t averageServiceTimeMicroSec() const = 0;
61
Return this fifo's role-number. The meaning of the return is defined on
62
a per-application basis, and will have special meaning to the
63
CongestionManager implementation specific to that app. For instance,
64
1 might be understood to represent the main state machine fifo in
65
resip, 2 might indicate a transport fifo (of which there may be
66
several), 3 might indicate a particular TU's fifo, etc.
67
These are intended for use by CongestionManager only.
69
inline UInt8 getRole() const
{
   // Report the role number currently stored for this fifo.
   return mRole;
}
73
Set this fifo's role-number.
76
inline void setRole(UInt8 role)
{
   // Remember the caller-supplied role number; getRole() returns it.
   mRole = role;
}
79
Sets the description for this fifo. This is used in the logging for
80
this fifo's statistics, and can also be used by the CongestionManager
81
to assign a role-number.
82
@param description The description for this fifo.
84
inline void setDescription(const resip::Data& description)
86
mDescription=description;
90
Gets the description for this fifo.
93
virtual const resip::Data& getDescription() const
{
   // Hand back a reference to the stored description (no copy is made).
   return mDescription;
}
101
* The getNext() method takes an argument {ms} that is normally
102
* the number of milliseconds to wait. There are two special values:
104
* Don't wait/block/sleep. If no message to retrieve, return NULL.
106
* Wait forever until a message is available.
107
* Note that the encoding (0 vs -1) is the opposite convention
108
* of standard APIs such as epoll_wait(). This is for historical reasons.
110
// Timeout value meaning "do not wait/block/sleep": if there is no message to
// retrieve, the call returns immediately.
// The replacement list is parenthesized so the negative literal stays a single
// operand when the macro is expanded inside a larger expression.
#define RESIP_FIFO_NOWAIT (-1)
111
// Timeout value meaning "wait forever until a message is available".
// Note that this is 0, not -1 (the reverse of the epoll_wait() convention).
#define RESIP_FIFO_FOREVER 0
114
@brief The base class from which various templated Fifo classes are derived.
117
AbstractFifo's get operations are all threadsafe; AbstractFifo does not
118
define any put operations (these are defined in subclasses).
119
@note Users of the resip stack will not need to interact with this class
120
directly in most cases. Look at Fifo and TimeLimitFifo instead.
122
@ingroup message_passing
124
template <typename T>
125
class AbstractFifo : public FifoStatsInterface
130
* @param maxSize max number of messages to keep
133
: FifoStatsInterface(),
134
mLastSampleTakenMicroSec(0),
136
mAverageServiceTimeMicroSec(0),
140
virtual ~AbstractFifo()
145
@brief is the queue empty?
146
@return true if the queue is empty and false otherwise
150
Lock lock(mMutex); (void)lock;
151
return mFifo.empty();
155
@brief get the current size of the fifo.
156
@note You should not use this function to determine
157
whether a call to getNext() will block or not. Use
158
messageAvailable() instead.
159
@return the number of messages in the queue
161
virtual unsigned int size() const
163
Lock lock(mMutex); (void)lock;
164
return (unsigned int)mFifo.size();
168
@brief is a message available?
169
@retval true if a message is available and false otherwise
172
bool messageAvailable() const
174
Lock lock(mMutex); (void)lock;
175
return !mFifo.empty();
179
@brief computes the time delta between the oldest and newest queue members
180
@note defaults to zero, overridden by TimeLimitFifo<T>
181
@return the time delta between the oldest and newest queue members
183
virtual time_t getTimeDepth() const
188
virtual size_t getCountDepth() const
193
virtual time_t expectedWaitTimeMilliSec() const
195
return ((mAverageServiceTimeMicroSec*mSize)+500)/1000;
198
virtual time_t averageServiceTimeMicroSec() const
200
return mAverageServiceTimeMicroSec;
203
/// remove all elements in the queue (or not)
204
virtual void clear()
{
   // Default implementation removes nothing; being virtual, it can be
   // overridden by a derived class.
}
208
@brief Returns the first message available.
209
@details Returns the first message available. It will wait if no
210
messages are available. If a signal interrupts the wait,
211
it will retry the wait. Signals can therefore not be caught
212
via getNext. If you need to detect a signal, use block
213
prior to calling getNext.
214
@return the first message available
218
Lock lock(mMutex); (void)lock;
221
// Wait until there are messages available.
222
while (mFifo.empty())
224
mCondition.wait(mMutex);
227
// Return the first message on the fifo.
229
T firstMessage(mFifo.front());
237
@brief Returns the next message available.
238
@details Returns the next message available. Will wait up to
239
ms milliseconds if no information is available. If
240
the specified time passes or a signal interrupts the
241
wait, this method returns false. This interface provides
242
no mechanism to distinguish between timeout and
245
bool getNext(int ms, T& toReturn)
249
toReturn = getNext();
255
Lock lock(mMutex); (void)lock;
257
if (mFifo.empty()) // WATCHOUT: Do not test mSize instead
259
toReturn = mFifo.front();
264
const UInt64 begin(Timer::getTimeMs());
265
const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :(
266
Lock lock(mMutex); (void)lock;
269
// Wait until there are messages available
270
while (mFifo.empty())
276
const UInt64 now(Timer::getTimeMs());
282
unsigned int timeout((unsigned int)(end - now));
284
// bail if total wait time exceeds limit
285
bool signaled = mCondition.wait(mMutex, timeout);
292
// Return the first message on the fifo.
294
toReturn=mFifo.front();
300
// Container type used by getMultiple() and addMultiple() to pass batches of
// messages to and from the fifo.
typedef std::deque<T> Messages;
302
void getMultiple(Messages& other, unsigned int max)
304
Lock lock(mMutex); (void)lock;
306
assert(other.empty());
307
while (mFifo.empty())
309
mCondition.wait(mMutex);
312
if(mFifo.size() <= max)
314
std::swap(mFifo, other);
315
onMessagePopped(mSize);
322
other.push_back(mFifo.front());
325
onMessagePopped((unsigned int)num);
329
bool getMultiple(int ms, Messages& other, unsigned int max)
333
getMultiple(other,max);
337
assert(other.empty());
338
const UInt64 begin(Timer::getTimeMs());
339
const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :(
340
Lock lock(mMutex); (void)lock;
343
// Wait until there are messages available
344
while (mFifo.empty())
350
const UInt64 now(Timer::getTimeMs());
356
unsigned int timeout((unsigned int)(end - now));
358
// bail if total wait time exceeds limit
359
bool signaled = mCondition.wait(mMutex, timeout);
366
if(mFifo.size() <= max)
368
std::swap(mFifo, other);
369
onMessagePopped(mSize);
376
other.push_back(mFifo.front());
379
onMessagePopped((unsigned int)num);
384
size_t add(const T& item)
386
Lock lock(mMutex); (void)lock;
387
mFifo.push_back(item);
393
size_t addMultiple(Messages& items)
395
Lock lock(mMutex); (void)lock;
396
size_t size=items.size();
399
std::swap(mFifo, items);
403
// I suppose it is possible to optimize this as a push_front() from
404
// mFifo to items, and then do a swap, if items is larger.
405
while(!items.empty())
407
mFifo.push_back(items.front());
412
onMessagePushed((int)size);
416
/** @brief container for FIFO items */
418
/** @brief access serialization lock */
419
mutable Mutex mMutex;
420
/** @brief condition for waiting on new queue items */
421
Condition mCondition;
423
mutable UInt64 mLastSampleTakenMicroSec;
424
mutable UInt32 mCounter;
425
mutable UInt32 mAverageServiceTimeMicroSec;
426
// std::deque has to perform some amount of traversal to calculate its
427
// size; we maintain this count so that it can be queried without locking,
428
// in situations where it being off by a small amount is ok.
431
virtual void onFifoPolled()
433
// !bwc! TODO allow this sampling frequency to be tweaked
434
if(mLastSampleTakenMicroSec &&
436
(mCounter >= 64 || mFifo.empty()))
438
UInt64 now(Timer::getTimeMicroSec());
439
UInt64 diff = now-mLastSampleTakenMicroSec;
443
mAverageServiceTimeMicroSec=(UInt32)resipIntDiv(diff, mCounter);
445
else // fifo got emptied; merge into a rolling average
447
// .bwc. This is a moving average with period 64, round to
449
mAverageServiceTimeMicroSec=(UInt32)resipIntDiv(
450
diff+((4096-mCounter)*mAverageServiceTimeMicroSec),
456
mLastSampleTakenMicroSec=0;
460
mLastSampleTakenMicroSec=now;
466
Called when one or more messages are removed from this fifo. Used to
467
drive service time calculations.
469
virtual void onMessagePopped(unsigned int num=1)
475
virtual void onMessagePushed(int num)
479
// Fifo went from empty to non-empty. Take a timestamp, and record
480
// how long it takes to process some messages.
481
mLastSampleTakenMicroSec=Timer::getTimeMicroSec();
486
// no value semantics
487
AbstractFifo(const AbstractFifo&);
488
AbstractFifo& operator=(const AbstractFifo&);
495
/* ====================================================================
496
* The Vovida Software License, Version 1.0
498
* Redistribution and use in source and binary forms, with or without
499
* modification, are permitted provided that the following conditions
502
* 1. Redistributions of source code must retain the above copyright
503
* notice, this list of conditions and the following disclaimer.
505
* 2. Redistributions in binary form must reproduce the above copyright
506
* notice, this list of conditions and the following disclaimer in
507
* the documentation and/or other materials provided with the
510
* 3. The names "VOCAL", "Vovida Open Communication Application Library",
511
* and "Vovida Open Communication Application Library (VOCAL)" must
512
* not be used to endorse or promote products derived from this
513
* software without prior written permission. For written
514
* permission, please contact vocal@vovida.org.
516
* 4. Products derived from this software may not be called "VOCAL", nor
517
* may "VOCAL" appear in their name, without prior written
518
* permission of Vovida Networks, Inc.
520
* THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
521
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
522
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
523
* NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA
524
* NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
525
* IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
526
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
527
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
528
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
529
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
530
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
531
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
534
* ====================================================================
536
* This software consists of voluntary contributions made by Vovida
537
* Networks, Inc. and many individuals on behalf of Vovida Networks,
538
* Inc. For more information on Vovida Networks, Inc., please see
539
* <http://www.vovida.org/>.