1
/***************************************************************************
3
copyright : (C) 2006 by mean
4
email : fixounet@free.fr
5
***************************************************************************/
7
/***************************************************************************
9
* This program is free software; you can redistribute it and/or modify *
10
* it under the terms of the GNU General Public License as published by *
11
* the Free Software Foundation; either version 2 of the License, or *
12
* (at your option) any later version. *
14
***************************************************************************/
20
#include <ADM_assert.h>
21
#include "ADM_library/default.h"
22
#include "ADM_toolkit/ADM_threads.h"
24
#include "ADM_packetQueue.h"
26
Constructor for packetQueue
27
\param name : name of the packetQueue, useful for debugging
28
\param nbSlot : How many packets can the packetQueue hold
29
\param buffSize : BufferSize in bytes
32
// Builds the queue: duplicates the name, allocates the byte buffer and the slot
// table, creates the mutex plus the two condition variables constructed from
// it, and resets the slot and buffer indices to 0.
// NOTE(review): _bufferSize and _nbSlots are used as array sizes below, but
// their assignment from buffSize / nbSlot is not visible in this excerpt —
// confirm they are set before the allocations.
PacketQueue::PacketQueue(const char *name,uint32_t nbSlot,uint32_t buffSize)
37
// Private copy of the name; it is released with ADM_dealloc in the destructor.
_name=ADM_strdup(name);
38
// Byte storage shared by all packets.
_buffer=new uint8_t[_bufferSize];
39
// Per-packet bookkeeping; Push fills in size, sample and startAt for a slot.
_slots=new Slots[_nbSlots];
40
_mutex=new admMutex(_name);
41
// Both condition variables are built on the same mutex.
_pusherCond=new admCond(_mutex);
42
_poperCond=new admCond(_mutex);
43
// Head == Queue is the state IsEmpty() tests for.
_slotQueue=_slotHead=0;
44
_bufferQueue=_bufferHead=0;
47
printf("PacketQueue %s created\n",_name);
53
// Releases everything the constructor allocated. Each pointer is tested before
// it is freed.
PacketQueue::~PacketQueue()
55
// The condition variables are deleted before the mutex they were constructed with.
if(_pusherCond) delete _pusherCond;
57
if(_poperCond) delete _poperCond;
59
if(_mutex) delete _mutex;
61
// _name was obtained from ADM_strdup, so it goes through ADM_dealloc rather than delete.
if(_name) ADM_dealloc(_name);
63
// Array forms of delete, matching the new[] allocations in the constructor.
if(_buffer) delete [] _buffer;
65
if(_slots) delete [] _slots;
68
Tell whether the queue is empty (_slotHead == _slotQueue).
69
Warning : Must be called with mutex held
72
// Emptiness test based on the two slot indices.
// NOTE(review): the return statements are not visible in this excerpt; callers
// in this file use the result as a boolean (see the wait loop in Pop).
uint8_t PacketQueue::IsEmpty(void)
76
// Equal indices mean no slot is pending.
if(_slotHead==_slotQueue)
84
Signal the pusher has finished, wake the poper.
88
// NOTE(review): only the signature is visible in this excerpt. Going by the
// description preceding it, this marks the pusher as finished and wakes the
// popper — confirm against the body.
uint8_t PacketQueue::Finished(void)
99
Put a packet in the buffer
100
Wait on _pusherCond if no slot available or buffer full
101
\param ptr : Ptr where to take the packet from
102
\param size : packet size in bytes
\param sample : value stored in the slot along with the packet
105
// Stores one packet: records size, sample and start offset in a slot, copies
// size bytes from ptr into the circular byte buffer (splitting the copy in two
// when it crosses the end of the buffer), then wakes the popper if it is
// waiting.
// NOTE(review): locking, the slot selection and the return statements are not
// visible in this excerpt.
uint8_t PacketQueue::Push(uint8_t *ptr, uint32_t size,uint32_t sample)
111
// First try to allocate a slot
112
// NOTE(review): the loop body is not visible. The tested expression is the
// distance from _slotQueue to _slotHead modulo _nbSlots; waiting while it
// equals 1 looks inverted for a "no free slot" test — confirm against the
// index conventions used by Pop.
while(((_nbSlots+_slotHead-_slotQueue)%_nbSlots)==1)
117
// Ok we have a slot,
118
// Now let's see if we have enough free space in the buffer (we are still under lock)
121
available=availableSpace();
128
// Describe the packet in its slot; Pop reads these three fields back.
_slots[slot].size=size;
129
_slots[slot].sample=sample;
130
_slots[slot].startAt=_bufferHead;
132
// printf("Pushing slot %d at %u\n",slot,_bufferHead,_slots[slot].startAt);
133
// Packet fits before the end of the buffer: single copy.
if(_bufferSize>=(_bufferHead+size))
135
memcpy(&_buffer[ _bufferHead],ptr,size);
140
// Packet crosses the end: part1 bytes go to the tail of the buffer, the
// remainder to its beginning, and the head ends up just after that remainder.
uint32_t part1=_bufferSize-_bufferHead;
141
memcpy(&_buffer[ _bufferHead],ptr,part1);
142
memcpy(&_buffer[ 0],ptr+part1,size-part1);
143
_bufferHead=size-part1;
145
// Look if someone was waiting ...
146
if(_poperCond->iswaiting())
148
_poperCond->wakeup();
157
// NOTE(review): the lines between the wakeup above and this message are
// missing from the excerpt; it is unclear which branch this printf belongs to.
printf("[PKTQ] %s is eof\n",_name);
161
Read a packet from the queue
162
Wait on the _poperCond if empty
163
If aborted, returns immediately
164
\param ptr : Ptr where to put the packet
165
\param size : receives the packet size in bytes
\param sample : receives the sample value stored with the packet
168
// Takes one packet out of the queue: reports its size and sample value through
// the out-parameters and copies its payload from the circular byte buffer to
// ptr (in two pieces when the packet wraps around the end of the buffer).
// Afterwards, if the pusher is waiting and more than two thirds of the buffer
// is free, the pusher is woken up.
// NOTE(review): locking, the wait-loop body, the slot selection and the return
// statements are not visible in this excerpt and are left untouched.
uint8_t PacketQueue::Pop(uint8_t *ptr, uint32_t *size,uint32_t *sample)
172
uint32_t available,sz,position;
174
// is there something ?
175
// Keep waiting while there is nothing to read and the pusher has not finished.
while(IsEmpty() && !_eof)
180
// Nothing left and the pusher is done: no packet to deliver.
if(IsEmpty() && _eof)
187
//printf("Pop : Head %u Tail %u\n",_slotHead,_slotQueue);
191
// Slot index wraps around the slot table.
_slotQueue%=_nbSlots;
192
sz=*size=_slots[slot].size;
193
*sample=_slots[slot].sample;
194
//printf("Poping slot %d at %u\n",slot,_bufferQueue,_slots[slot].startAt);
195
// Consistency check: the read offset must match where Push stored this packet.
// The previous form, !_bufferQueue==startAt, negated _bufferQueue first and
// then compared the resulting 0/1 with the offset, so most mismatches went
// unnoticed.
if(_bufferQueue!=_slots[slot].startAt)
197
printf("Buffer Q:%u\n",_bufferQueue);
198
printf("Slot :%u\n",_slots[slot].startAt);
202
// Payload is contiguous: single copy.
if(_bufferSize>=(_bufferQueue+sz))
204
memcpy(ptr,&(_buffer[_bufferQueue]),sz);
206
_bufferQueue%=_bufferSize;
210
// Payload wraps: part1 bytes come from the tail of the buffer, the rest from
// its beginning, and the read offset ends up just after that remainder.
uint32_t part1=_bufferSize-_bufferQueue;
211
memcpy(ptr,&_buffer[ _bufferQueue],part1);
212
memcpy(ptr+part1,&_buffer[ 0],sz-part1);
213
_bufferQueue=sz-part1;
215
// In case we made some space
216
if(_pusherCond->iswaiting())
218
// Threshold is two thirds of the buffer size.
uint32_t third=(2*_bufferSize)/3;
219
uint32_t available=availableSpace();
220
// Only wake the pusher once more than 2/3 of the buffer is free
221
// The bytes just consumed must at least be counted as free again.
ADM_assert(available>=sz);
222
if(available > third)
223
_pusherCond->wakeup();
229
Returns available space in bytes in the buffer
230
Warning: this method must be called with the mutex held
234
// Computes the number of free bytes in the circular buffer from the write
// offset (_bufferHead) and the read offset (_bufferQueue).
// NOTE(review): the return statement is not visible in this excerpt.
uint32_t PacketQueue::availableSpace(void)
237
// Adding _bufferSize before subtracting keeps the unsigned value from
// underflowing when the head has wrapped behind the read offset.
uint32_t space=_bufferSize+_bufferHead-_bufferQueue;
238
// Now "space" is the number of bytes currently in use.
space=space%_bufferSize;
239
// Free bytes = capacity - used. When head == queue this yields _bufferSize,
// i.e. equal offsets are treated as a completely free buffer.
space=_bufferSize-space;
243
// Resets the slot and buffer indices to 0 (the queue then tests as empty) and
// aborts the pusher-side condition variable.
// NOTE(review): locking, any flag update and the return statement are not
// visible in this excerpt; neither is a matching abort of _poperCond —
// confirm that the popper side is released as well.
uint8_t PacketQueue::Abort(void)
250
_slotHead=_slotQueue=0;
251
_bufferQueue=_bufferHead=0;
253
_pusherCond->abort();
257
// Sets r to 1 when the queue has been marked eof or aborted AND no slot is
// pending (_slotHead == _slotQueue).
// NOTE(review): the declaration of r, its initial value and the return
// statement are not visible in this excerpt.
uint8_t PacketQueue::isEof(void)
261
if((_eof||_aborted) && _slotHead==_slotQueue) r=1;