31
31
#include "mainbuffer.h"
32
32
#include "ringbuffer.h"
33
33
#include "sfl_types.h" // for SIZEBUF
37
#include <utility> // for std::pair
36
#include <utility> // for std::pair
39
40
const char * const MainBuffer::DEFAULT_ID = "audiolayer_id";
41
MainBuffer::MainBuffer() : ringBufferMap_(), callIDMap_(), mutex_(), internalSamplingRate_(8000)
42
MainBuffer::MainBuffer()
44
45
MainBuffer::~MainBuffer()
46
// delete any ring buffers that didn't get removed
47
for (auto &item : ringBufferMap_)
51
void MainBuffer::setInternalSamplingRate(int sr)
53
if (sr != internalSamplingRate_) {
55
internalSamplingRate_ = sr;
59
CallIDSet* MainBuffer::getCallIDSet(const std::string &call_id)
47
// Delete any ring buffers that didn't get removed.
48
// XXXX: With a good design this should never happen! :-P
49
for (const auto& item : ringBufferMap_) {
50
const auto& shared = item.second;
51
if (shared.use_count() > 1)
52
WARN("Leaking RingBuffer %s", shared->getBufferId().c_str());
56
void MainBuffer::setInternalSamplingRate(unsigned sr)
58
if (sr != internalAudioFormat_.sample_rate) {
60
internalAudioFormat_.sample_rate = sr;
64
void MainBuffer::setInternalAudioFormat(AudioFormat format)
66
if (format != internalAudioFormat_) {
68
internalAudioFormat_ = format;
72
bool MainBuffer::hasCallIDSet(const std::string& call_id)
74
return callIDMap_.find(call_id) != callIDMap_.end();
77
std::shared_ptr<CallIDSet> MainBuffer::getCallIDSet(const std::string& call_id)
61
79
CallIDMap::iterator iter = callIDMap_.find(call_id);
62
return (iter != callIDMap_.end()) ? iter->second : NULL;
65
void MainBuffer::createCallIDSet(const std::string &set_id)
67
if (getCallIDSet(set_id) == NULL)
68
callIDMap_[set_id] = new CallIDSet;
80
return (iter != callIDMap_.end()) ? iter->second : nullptr;
83
std::shared_ptr<CallIDSet>
84
MainBuffer::getCallIDSet(const std::string& call_id) const
86
CallIDMap::const_iterator iter = callIDMap_.find(call_id);
87
return (iter != callIDMap_.end()) ? iter->second : nullptr;
90
void MainBuffer::createCallIDSet(const std::string& set_id)
92
if (!hasCallIDSet(set_id))
93
callIDMap_[set_id] = std::make_shared<CallIDSet>();
70
DEBUG("CallID set %s already exists, ignoring", set_id.c_str());
95
DEBUG("CallID set %s already exists", set_id.c_str());
73
void MainBuffer::removeCallIDSet(const std::string &set_id)
98
void MainBuffer::removeCallIDSet(const std::string& set_id)
75
CallIDSet* callid_set = getCallIDSet(set_id);
100
if (hasCallIDSet(set_id)) {
78
101
callIDMap_.erase(set_id);
81
ERROR("CallID set %s does not exist!", set_id.c_str());
84
void MainBuffer::addCallIDtoSet(const std::string &set_id, const std::string &call_id)
86
CallIDSet* callid_set = getCallIDSet(set_id);
89
callid_set->insert(call_id);
91
ERROR("CallIDSet %s does not exist!", set_id.c_str());
94
void MainBuffer::removeCallIDfromSet(const std::string &set_id, const std::string &call_id)
96
CallIDSet* callid_set = getCallIDSet(set_id);
99
callid_set->erase(call_id);
101
ERROR("CallIDSet %s does not exist!", set_id.c_str());
104
RingBuffer* MainBuffer::getRingBuffer(const std::string & call_id)
103
WARN("CallID set %s does not exist!", set_id.c_str());
106
void MainBuffer::addCallIDtoSet(const std::string& set_id, const std::string& call_id)
108
const auto callid_set_shared = getCallIDSet(set_id);
109
if (callid_set_shared)
110
callid_set_shared->insert(call_id);
112
WARN("CallIDSet %s does not exist!", set_id.c_str());
115
void MainBuffer::removeCallIDfromSet(const std::string& set_id, const std::string& call_id)
117
const auto callid_set_shared = getCallIDSet(set_id);
118
if (callid_set_shared)
119
callid_set_shared->erase(call_id);
121
WARN("CallIDSet %s does not exist!", set_id.c_str());
124
bool MainBuffer::hasRingBuffer(const std::string& call_id)
126
return ringBufferMap_.find(call_id) != ringBufferMap_.end();
129
std::shared_ptr<RingBuffer> MainBuffer::getRingBuffer(const std::string& call_id)
106
131
RingBufferMap::iterator iter = ringBufferMap_.find(call_id);
107
return (iter != ringBufferMap_.end()) ? iter->second : NULL;
132
return (iter != ringBufferMap_.end()) ? iter->second : nullptr;
110
const RingBuffer* MainBuffer::getRingBuffer(const std::string & call_id) const
135
std::shared_ptr<RingBuffer> MainBuffer::getRingBuffer(const std::string& call_id) const
112
137
RingBufferMap::const_iterator iter = ringBufferMap_.find(call_id);
113
return (iter != ringBufferMap_.end()) ? iter->second : NULL;
138
return (iter != ringBufferMap_.end()) ? iter->second : nullptr;
116
void MainBuffer::createRingBuffer(const std::string &call_id)
141
void MainBuffer::createRingBuffer(const std::string& call_id)
118
if (!getRingBuffer(call_id))
119
ringBufferMap_[call_id] = new RingBuffer(SIZEBUF, call_id);
143
if (!hasRingBuffer(call_id))
144
ringBufferMap_[call_id] = std::make_shared<RingBuffer>(SIZEBUF, call_id);
121
146
DEBUG("Ringbuffer already exists for call_id %s", call_id.c_str());
124
void MainBuffer::removeRingBuffer(const std::string &call_id)
149
void MainBuffer::removeRingBuffer(const std::string& call_id)
126
RingBuffer* ring_buffer = getRingBuffer(call_id);
151
if (hasRingBuffer(call_id)) {
129
152
ringBufferMap_.erase(call_id);
132
DEBUG("Ringbuffer %s does not exist!", call_id.c_str());
154
WARN("Ringbuffer %s does not exist!", call_id.c_str());
135
void MainBuffer::bindCallID(const std::string & call_id1, const std::string & call_id2)
157
void MainBuffer::bindCallID(const std::string& call_id1, const std::string& call_id2)
137
std::lock_guard<std::mutex> guard(mutex_);
159
std::lock_guard<std::recursive_mutex> lk(stateLock_);
139
161
createRingBuffer(call_id1);
140
162
createCallIDSet(call_id1);
141
163
createRingBuffer(call_id2);
142
164
createCallIDSet(call_id2);
144
getRingBuffer(call_id1)->createReadPointer(call_id2);
145
getRingBuffer(call_id2)->createReadPointer(call_id1);
166
getRingBuffer(call_id1)->createReadOffset(call_id2);
167
getRingBuffer(call_id2)->createReadOffset(call_id1);
146
168
addCallIDtoSet(call_id1, call_id2);
147
169
addCallIDtoSet(call_id2, call_id1);
150
void MainBuffer::bindHalfDuplexOut(const std::string & process_id, const std::string & call_id)
172
void MainBuffer::bindHalfDuplexOut(const std::string& process_id, const std::string& call_id)
152
std::lock_guard<std::mutex> guard(mutex_);
174
std::lock_guard<std::recursive_mutex> lk(stateLock_);
154
176
// This method is used only for active calls, if this call does not exist, do nothing
155
if (!getRingBuffer(call_id))
177
if (!hasRingBuffer(call_id))
158
180
createCallIDSet(process_id);
159
getRingBuffer(call_id)->createReadPointer(process_id);
181
getRingBuffer(call_id)->createReadOffset(process_id);
160
182
addCallIDtoSet(process_id, call_id);
163
void MainBuffer::unBindCallID(const std::string & call_id1, const std::string & call_id2)
165
std::lock_guard<std::mutex> guard(mutex_);
185
void MainBuffer::removeReadOffsetFromRingBuffer(const std::string& call_id1,
186
const std::string& call_id2)
188
const auto ringbuffer_shared = getRingBuffer(call_id1);
189
if (!ringbuffer_shared) {
190
DEBUG("did not find ringbuffer %s", call_id1.c_str());
194
/* Don't remove read offset if still in use (i.e. in wait ) */
195
if (ringbuffer_shared.use_count() >= 2) {
197
/* remove them from the maps, but owners will still have
198
* references to them */
199
if (ringbuffer_shared->readOffsetCount() <= 1) {
200
removeCallIDSet(call_id1);
201
removeRingBuffer(call_id1);
205
ringbuffer_shared->removeReadOffset(call_id2);
207
// Remove empty RingBuffer/CallIDSet
208
if (ringbuffer_shared->hasNoReadOffsets()) {
209
removeCallIDSet(call_id1);
210
removeRingBuffer(call_id1);
215
void MainBuffer::unBindCallID(const std::string& call_id1, const std::string& call_id2)
217
std::lock_guard<std::recursive_mutex> lk(stateLock_);
167
219
removeCallIDfromSet(call_id1, call_id2);
168
220
removeCallIDfromSet(call_id2, call_id1);
170
RingBuffer* ringbuffer = getRingBuffer(call_id2);
173
ringbuffer->removeReadPointer(call_id1);
175
if (ringbuffer->hasNoReadPointers()) {
176
removeCallIDSet(call_id2);
177
removeRingBuffer(call_id2);
181
ringbuffer = getRingBuffer(call_id1);
184
ringbuffer->removeReadPointer(call_id2);
186
if (ringbuffer->hasNoReadPointers()) {
187
removeCallIDSet(call_id1);
188
removeRingBuffer(call_id1);
222
removeReadOffsetFromRingBuffer(call_id1, call_id2);
223
removeReadOffsetFromRingBuffer(call_id2, call_id1);
193
void MainBuffer::unBindHalfDuplexOut(const std::string & process_id, const std::string & call_id)
226
void MainBuffer::unBindHalfDuplexOut(const std::string& process_id, const std::string& call_id)
195
std::lock_guard<std::mutex> guard(mutex_);
228
std::lock_guard<std::recursive_mutex> lk(stateLock_);
197
230
removeCallIDfromSet(process_id, call_id);
199
RingBuffer* ringbuffer = getRingBuffer(call_id);
202
ringbuffer->removeReadPointer(process_id);
204
if (ringbuffer->hasNoReadPointers()) {
205
removeCallIDSet(call_id);
206
removeRingBuffer(call_id);
209
DEBUG("did not found ringbuffer %s", process_id.c_str());
210
removeCallIDSet(process_id);
213
CallIDSet* callid_set = getCallIDSet(process_id);
215
if (callid_set and callid_set->empty())
231
removeReadOffsetFromRingBuffer(call_id, process_id);
233
const auto callid_set_shared = getCallIDSet(process_id);
234
if (callid_set_shared and callid_set_shared->empty())
216
235
removeCallIDSet(process_id);
219
void MainBuffer::unBindAll(const std::string & call_id)
238
void MainBuffer::unBindAll(const std::string& call_id)
221
CallIDSet* callid_set = getCallIDSet(call_id);
240
std::lock_guard<std::recursive_mutex> lk(stateLock_);
223
if (callid_set == NULL or callid_set->empty())
242
const auto callid_set_shared = getCallIDSet(call_id);
243
if (!callid_set_shared or callid_set_shared->empty())
226
CallIDSet temp_set(*callid_set);
228
for (const auto &item_set : temp_set)
246
const auto callid_set_tmp = *callid_set_shared; // temporary copy of callid_set
247
for (const auto& item_set : callid_set_tmp)
229
248
unBindCallID(call_id, item_set);
232
void MainBuffer::putData(AudioBuffer& buffer, const std::string &call_id)
251
void MainBuffer::putData(AudioBuffer& buffer, const std::string& call_id)
234
std::lock_guard<std::mutex> guard(mutex_);
236
RingBuffer* ring_buffer = getRingBuffer(call_id);
239
ring_buffer->put(buffer);
253
std::lock_guard<std::recursive_mutex> lk(stateLock_);
255
const auto ringbuffer_shared = getRingBuffer(call_id);
256
if (ringbuffer_shared)
257
ringbuffer_shared->put(buffer);
242
size_t MainBuffer::getData(AudioBuffer& buffer, const std::string &call_id)
260
size_t MainBuffer::getData(AudioBuffer& buffer, const std::string& call_id)
244
std::lock_guard<std::mutex> guard(mutex_);
246
CallIDSet* callid_set = getCallIDSet(call_id);
248
if (callid_set == NULL or callid_set->empty())
262
std::lock_guard<std::recursive_mutex> lk(stateLock_);
264
const auto callid_set_shared = getCallIDSet(call_id);
265
if (!callid_set_shared or callid_set_shared->empty())
251
if (callid_set->size() == 1) {
253
CallIDSet::iterator iter_id = callid_set->begin();
255
if (iter_id != callid_set->end())
268
if (callid_set_shared->size() == 1) {
269
CallIDSet::iterator iter_id = callid_set_shared->begin();
271
if (iter_id != callid_set_shared->end())
256
272
return getDataByID(buffer, *iter_id, call_id);
261
buffer.setSampleRate(internalSamplingRate_);
264
AudioBuffer mixBuffer(buffer);
266
for (const auto &item_id : *callid_set) {
267
size = getDataByID(mixBuffer, item_id, call_id);
277
buffer.setFormat(internalAudioFormat_);
280
AudioBuffer mixBuffer(buffer);
282
for (const auto &item_id : *callid_set_shared) {
283
// FIXME: size is not cumulated
284
size = getDataByID(mixBuffer, item_id, call_id);
286
buffer.mix(mixBuffer);
293
bool MainBuffer::waitForDataAvailable(const std::string& call_id, size_t min_frames, const std::chrono::microseconds& max_wait) const
295
std::unique_lock<std::recursive_mutex> lk(stateLock_);
297
// convert to absolute time
298
const auto deadline = std::chrono::high_resolution_clock::now() + max_wait;
299
std::shared_ptr<CallIDSet> callid_set_shared;
301
callid_set_shared = getCallIDSet(call_id);
302
if (!callid_set_shared or callid_set_shared->empty())
305
const auto callid_set_tmp = *callid_set_shared; // temporary copy of callid_set
306
for (const auto &i : callid_set_tmp) {
307
const auto ringbuffer_shared = getRingBuffer(i);
308
if (!ringbuffer_shared)
311
if (ringbuffer_shared->waitForDataAvailable(call_id, min_frames, deadline) < min_frames)
318
size_t MainBuffer::getAvailableData(AudioBuffer& buffer, const std::string& call_id)
320
std::lock_guard<std::recursive_mutex> lk(stateLock_);
322
const auto callid_set_shared = getCallIDSet(call_id);
323
if (!callid_set_shared or callid_set_shared->empty())
326
if (callid_set_shared->size() == 1) {
327
CallIDSet::iterator iter_id = callid_set_shared->begin();
328
const auto ringbuffer_shared = getRingBuffer(*iter_id);
329
if (!ringbuffer_shared)
331
return ringbuffer_shared->get(buffer, call_id);
333
size_t availableSamples = std::numeric_limits<size_t>::max();
335
for (const auto &i : *callid_set_shared) {
336
const auto ringbuffer_shared = getRingBuffer(i);
337
if (!ringbuffer_shared) continue;
338
availableSamples = std::min(availableSamples, ringbuffer_shared->availableForGet(i));
341
if (availableSamples == std::numeric_limits<size_t>::max())
344
availableSamples = std::min(availableSamples, buffer.frames());
345
buffer.resize(availableSamples);
347
buffer.setFormat(internalAudioFormat_);
350
AudioBuffer mixBuffer(buffer);
352
for (const auto &item_id : *callid_set_shared) {
353
// FIXME: size is not cumulated
354
size = getDataByID(mixBuffer, item_id, call_id);
270
356
buffer.mix(mixBuffer);
360
return availableSamples;
278
size_t MainBuffer::getDataByID(AudioBuffer& buffer, const std::string & call_id, const std::string & reader_id)
364
size_t MainBuffer::getDataByID(AudioBuffer& buffer, const std::string& call_id, const std::string& reader_id)
280
RingBuffer* ring_buffer = getRingBuffer(call_id);
281
return ring_buffer ? ring_buffer->get(buffer, reader_id) : 0;
366
const auto ringbuffer_shared = getRingBuffer(call_id);
367
return ringbuffer_shared ? ringbuffer_shared->get(buffer, reader_id) : 0;
284
size_t MainBuffer::availableForGet(const std::string &call_id)
370
size_t MainBuffer::availableForGet(const std::string& call_id) const
286
std::lock_guard<std::mutex> guard(mutex_);
288
CallIDSet* callid_set = getCallIDSet(call_id);
290
if (callid_set == NULL or callid_set->empty())
372
std::lock_guard<std::recursive_mutex> lk(stateLock_);
374
const auto callid_set_shared = getCallIDSet(call_id);
375
if (!callid_set_shared or callid_set_shared->empty())
293
if (callid_set->size() == 1) {
294
CallIDSet::iterator iter_id = callid_set->begin();
378
if (callid_set_shared->size() == 1) {
379
CallIDSet::iterator iter_id = callid_set_shared->begin();
296
381
if ((call_id != DEFAULT_ID) && (*iter_id == call_id))
297
DEBUG("This problem should not occur since we have %ld elements", callid_set->size());
382
DEBUG("This problem should not occur since we have %ld elements", callid_set_shared->size());
299
384
return availableForGetByID(*iter_id, call_id);
303
386
size_t availableSamples = std::numeric_limits<size_t>::max();
305
for (auto &i : *callid_set) {
388
for (const auto &i : *callid_set_shared) {
306
389
const size_t nbSamples = availableForGetByID(i, call_id);
308
390
if (nbSamples != 0)
309
391
availableSamples = std::min(availableSamples, nbSamples);