33
33
* as that of the covered work.
36
#include "ringbuffer.h"
40
#include <utility> // for std::pair
38
#include <utility> // for std::pair
41
#include "ringbuffer.h"
44
44
// corresponds to 160 ms (about 5 rtp packets)
45
const size_t MIN_BUFFER_SIZE = 1280;
45
static const size_t MIN_BUFFER_SIZE = 1024;
48
47
// Create a ring buffer with 'size' bytes
49
RingBuffer::RingBuffer(size_t size, const std::string &call_id) :
48
RingBuffer::RingBuffer(size_t size, const std::string &call_id, AudioFormat format /* = MONO */) :
51
, buffer_(std::max(size, MIN_BUFFER_SIZE), 1, 8000)
50
, buffer_(std::max(size, MIN_BUFFER_SIZE), format)
53
54
, buffer_id_(call_id)
85
86
const size_t buffer_size = buffer_.frames();
86
87
if (buffer_size == 0)
88
return (endPos_ + buffer_size - getReadPointer(call_id)) % buffer_size;
89
return (endPos_ + buffer_size - getReadOffset(call_id)) % buffer_size;
92
93
RingBuffer::debug()
94
DEBUG("Start=%d; End=%d; BufferSize=%d", getSmallestReadPointer(), endPos_, buffer_.frames());
95
DEBUG("Start=%d; End=%d; BufferSize=%d", getSmallestReadOffset(), endPos_, buffer_.frames());
97
size_t RingBuffer::getReadPointer(const std::string &call_id) const
98
size_t RingBuffer::getReadOffset(const std::string &call_id) const
99
if (hasNoReadPointers())
100
if (hasNoReadOffsets())
102
ReadPointer::const_iterator iter = readpointers_.find(call_id);
103
return (iter != readpointers_.end()) ? iter->second : 0;
102
ReadOffset::const_iterator iter = readoffsets_.find(call_id);
103
return (iter != readoffsets_.end()) ? iter->second : 0;
107
RingBuffer::getSmallestReadPointer() const
107
RingBuffer::getSmallestReadOffset() const
109
if (hasNoReadPointers())
109
if (hasNoReadOffsets())
112
111
size_t smallest = buffer_.frames();
114
ReadPointer::const_iterator iter;
116
for (iter = readpointers_.begin(); iter != readpointers_.end(); ++iter)
117
if (iter->second < smallest)
118
smallest = iter->second;
112
for(auto const& iter : readoffsets_)
113
smallest = std::min(smallest, iter.second);
124
RingBuffer::storeReadPointer(size_t pointer_value, const std::string &call_id)
118
RingBuffer::storeReadOffset(size_t offset, const std::string &call_id)
126
ReadPointer::iterator iter = readpointers_.find(call_id);
120
ReadOffset::iterator iter = readoffsets_.find(call_id);
128
if (iter != readpointers_.end())
129
iter->second = pointer_value;
122
if (iter != readoffsets_.end())
123
iter->second = offset;
131
DEBUG("Cannot find \"%s\" readPointer in \"%s\" ringbuffer", call_id.c_str(), buffer_id_.c_str());
136
RingBuffer::createReadPointer(const std::string &call_id)
138
if (!hasThisReadPointer(call_id))
139
readpointers_.insert(std::pair<std::string, int> (call_id, endPos_));
144
RingBuffer::removeReadPointer(const std::string &call_id)
146
ReadPointer::iterator iter = readpointers_.find(call_id);
148
if (iter != readpointers_.end())
149
readpointers_.erase(iter);
125
DEBUG("Cannot find \"%s\" readOffset in \"%s\" ringbuffer", call_id.c_str(), buffer_id_.c_str());
130
RingBuffer::createReadOffset(const std::string &call_id)
132
std::lock_guard<std::mutex> l(lock_);
133
if (!hasThisReadOffset(call_id))
134
readoffsets_.insert(std::pair<std::string, int> (call_id, endPos_));
139
RingBuffer::removeReadOffset(const std::string &call_id)
141
std::lock_guard<std::mutex> l(lock_);
142
ReadOffset::iterator iter = readoffsets_.find(call_id);
144
if (iter != readoffsets_.end())
145
readoffsets_.erase(iter);
154
RingBuffer::hasThisReadPointer(const std::string &call_id) const
150
RingBuffer::hasThisReadOffset(const std::string &call_id) const
156
return readpointers_.find(call_id) != readpointers_.end();
152
return readoffsets_.find(call_id) != readoffsets_.end();
160
bool RingBuffer::hasNoReadPointers() const
156
bool RingBuffer::hasNoReadOffsets() const
162
return readpointers_.empty();
158
return readoffsets_.empty();
169
165
// This one puts some data inside the ring buffer.
170
166
void RingBuffer::put(AudioBuffer& buf)
172
const size_t len = putLength();
168
std::lock_guard<std::mutex> l(lock_);
173
169
const size_t sample_num = buf.frames();
174
170
const size_t buffer_size = buffer_.frames();
175
171
if (buffer_size == 0)
174
size_t len = putLength();
175
if (buffer_size - len < sample_num)
178
177
size_t toCopy = sample_num;
180
179
// Add more channels if the input buffer holds more channels than the ring.
181
180
if (buffer_.channels() < buf.channels())
182
181
buffer_.setChannelNum(buf.channels());
184
if (toCopy > buffer_size - len)
185
toCopy = buffer_size - len;
187
183
size_t in_pos = 0;
188
184
size_t pos = endPos_;
213
210
return getLength(call_id);
216
// Get will move 'toCopy' bytes from the internal FIFO to 'buffer'
217
213
size_t RingBuffer::get(AudioBuffer& buf, const std::string &call_id)
219
if (hasNoReadPointers())
222
if (not hasThisReadPointer(call_id))
225
const size_t len = getLength(call_id);
226
const size_t sample_num = buf.frames();
215
std::lock_guard<std::mutex> l(lock_);
217
if (hasNoReadOffsets())
220
if (not hasThisReadOffset(call_id))
227
223
const size_t buffer_size = buffer_.frames();
228
224
if (buffer_size == 0)
227
size_t len = getLength(call_id);
228
const size_t sample_num = buf.frames();
230
229
size_t toCopy = std::min(sample_num, len);
230
if (toCopy and toCopy != sample_num) {
231
DEBUG("Partial get: %d/%d", toCopy, sample_num);
232
234
const size_t copied = toCopy;
235
size_t startPos = getReadPointer(call_id);
237
size_t startPos = getReadOffset(call_id);
237
239
while (toCopy > 0) {
238
240
size_t block = toCopy;
250
storeReadPointer(startPos, call_id);
252
storeReadOffset(startPos, call_id);
257
size_t RingBuffer::waitForDataAvailable(const std::string &call_id, const size_t min_data_length, const std::chrono::high_resolution_clock::time_point& deadline) const
259
std::unique_lock<std::mutex> l(lock_);
260
const size_t buffer_size = buffer_.frames();
261
if (buffer_size < min_data_length) return 0;
262
ReadOffset::const_iterator read_ptr = readoffsets_.find(call_id);
263
if (read_ptr == readoffsets_.end()) return 0;
265
if (deadline == std::chrono::high_resolution_clock::time_point()) {
266
not_empty_.wait(l, [=, &getl] {
267
getl = (endPos_ + buffer_size - read_ptr->second) % buffer_size;
268
return getl >= min_data_length;
271
not_empty_.wait_until(l, deadline, [=, &getl]{
272
getl = (endPos_ + buffer_size - read_ptr->second) % buffer_size;
273
return getl >= min_data_length;
255
280
RingBuffer::discard(size_t toDiscard, const std::string &call_id)
282
std::lock_guard<std::mutex> l(lock_);
284
const size_t buffer_size = buffer_.frames();
285
if (buffer_size == 0)
257
288
size_t len = getLength(call_id);
259
289
if (toDiscard > len)
262
size_t buffer_size = buffer_.frames();
292
size_t startPos = (getReadOffset(call_id) + toDiscard) % buffer_size;
293
storeReadOffset(startPos, call_id);
298
RingBuffer::discard(size_t toDiscard)
300
const size_t buffer_size = buffer_.frames();
263
301
if (buffer_size == 0)
265
size_t startPos = (getReadPointer(call_id) + toDiscard) % buffer_size;
267
storeReadPointer(startPos, call_id);
304
for (auto & r : readoffsets_) {
305
size_t dst = (r.second + buffer_size - endPos_) % buffer_size;
306
if (dst < toDiscard) {
307
DEBUG("%s : discarding: %d frames", r.first.c_str(), toDiscard - dst);
308
r.second = (r.second + toDiscard - dst) % buffer_size;
269
311
return toDiscard;