~ubuntu-branches/ubuntu/wily/sflphone/wily

« back to all changes in this revision

Viewing changes to daemon/src/audio/mainbuffer.cpp

  • Committer: Package Import Robot
  • Author(s): Jonathan Riddell
  • Date: 2015-01-07 14:51:16 UTC
  • mfrom: (4.3.5 sid)
  • Revision ID: package-import@ubuntu.com-20150107145116-yxnafinf4lrdvrmx
Tags: 1.4.1-0.1ubuntu1
* Merge with Debian, remaining changes:
 - Drop soprano, nepomuk build-dep
* Drop ubuntu patches, now upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/*
2
 
 *  Copyright (C) 2004-2013 Savoir-Faire Linux Inc.
 
2
 *  Copyright (C) 2004-2014 Savoir-Faire Linux Inc.
3
3
 *  Author : Alexandre Savard <alexandre.savard@savoirfairelinux.com>
4
4
 *
5
5
 *  This program is free software; you can redistribute it and/or modify
31
31
#include "mainbuffer.h"
32
32
#include "ringbuffer.h"
33
33
#include "sfl_types.h" // for SIZEBUF
 
34
#include "logger.h"
 
35
 
34
36
#include <limits>
 
37
#include <utility> // for std::pair
35
38
#include <cstring>
36
 
#include <utility> // for std::pair
37
 
#include "logger.h"
38
39
 
39
40
const char * const MainBuffer::DEFAULT_ID = "audiolayer_id";
40
41
 
41
 
MainBuffer::MainBuffer() : ringBufferMap_(), callIDMap_(), mutex_(), internalSamplingRate_(8000)
 
42
MainBuffer::MainBuffer()
42
43
{}
43
44
 
44
45
MainBuffer::~MainBuffer()
45
46
{
46
 
    // delete any ring buffers that didn't get removed
47
 
    for (auto &item : ringBufferMap_)
48
 
        delete item.second;
49
 
}
50
 
 
51
 
void MainBuffer::setInternalSamplingRate(int sr)
52
 
{
53
 
    if (sr != internalSamplingRate_) {
54
 
        flushAllBuffers();
55
 
        internalSamplingRate_ = sr;
56
 
    }
57
 
}
58
 
 
59
 
CallIDSet* MainBuffer::getCallIDSet(const std::string &call_id)
 
47
    // Delete any ring buffers that didn't get removed.
 
48
    // XXXX: With a good design this should never happen! :-P
 
49
    for (const auto& item : ringBufferMap_) {
 
50
        const auto& shared = item.second;
 
51
        if (shared.use_count() > 1)
 
52
            WARN("Leaking RingBuffer %s", shared->getBufferId().c_str());
 
53
    }
 
54
}
 
55
 
 
56
void MainBuffer::setInternalSamplingRate(unsigned sr)
 
57
{
 
58
    if (sr != internalAudioFormat_.sample_rate) {
 
59
        flushAllBuffers();
 
60
        internalAudioFormat_.sample_rate = sr;
 
61
    }
 
62
}
 
63
 
 
64
void MainBuffer::setInternalAudioFormat(AudioFormat format)
 
65
{
 
66
    if (format != internalAudioFormat_) {
 
67
        flushAllBuffers();
 
68
        internalAudioFormat_ = format;
 
69
    }
 
70
}
 
71
 
 
72
bool MainBuffer::hasCallIDSet(const std::string& call_id)
 
73
{
 
74
    return callIDMap_.find(call_id) != callIDMap_.end();
 
75
}
 
76
 
 
77
std::shared_ptr<CallIDSet> MainBuffer::getCallIDSet(const std::string& call_id)
60
78
{
61
79
    CallIDMap::iterator iter = callIDMap_.find(call_id);
62
 
    return (iter != callIDMap_.end()) ? iter->second : NULL;
63
 
}
64
 
 
65
 
void MainBuffer::createCallIDSet(const std::string &set_id)
66
 
{
67
 
    if (getCallIDSet(set_id) == NULL)
68
 
        callIDMap_[set_id] = new CallIDSet;
 
80
    return (iter != callIDMap_.end()) ? iter->second : nullptr;
 
81
}
 
82
 
 
83
std::shared_ptr<CallIDSet>
 
84
MainBuffer::getCallIDSet(const std::string& call_id) const
 
85
{
 
86
    CallIDMap::const_iterator iter = callIDMap_.find(call_id);
 
87
    return (iter != callIDMap_.end()) ? iter->second : nullptr;
 
88
}
 
89
 
 
90
void MainBuffer::createCallIDSet(const std::string& set_id)
 
91
{
 
92
    if (!hasCallIDSet(set_id))
 
93
        callIDMap_[set_id] = std::make_shared<CallIDSet>();
69
94
    else
70
 
        DEBUG("CallID set %s already exists, ignoring", set_id.c_str());
 
95
        DEBUG("CallID set %s already exists", set_id.c_str());
71
96
}
72
97
 
73
 
void MainBuffer::removeCallIDSet(const std::string &set_id)
 
98
void MainBuffer::removeCallIDSet(const std::string& set_id)
74
99
{
75
 
    CallIDSet* callid_set = getCallIDSet(set_id);
76
 
 
77
 
    if (callid_set) {
 
100
    if (hasCallIDSet(set_id)) {
78
101
        callIDMap_.erase(set_id);
79
 
        delete callid_set;
80
102
    } else
81
 
        ERROR("CallID set %s does not exist!", set_id.c_str());
82
 
}
83
 
 
84
 
void MainBuffer::addCallIDtoSet(const std::string &set_id, const std::string &call_id)
85
 
{
86
 
    CallIDSet* callid_set = getCallIDSet(set_id);
87
 
 
88
 
    if (callid_set)
89
 
        callid_set->insert(call_id);
90
 
    else
91
 
        ERROR("CallIDSet %s does not exist!", set_id.c_str());
92
 
}
93
 
 
94
 
void MainBuffer::removeCallIDfromSet(const std::string &set_id, const std::string &call_id)
95
 
{
96
 
    CallIDSet* callid_set = getCallIDSet(set_id);
97
 
 
98
 
    if (callid_set)
99
 
        callid_set->erase(call_id);
100
 
    else
101
 
        ERROR("CallIDSet %s does not exist!", set_id.c_str());
102
 
}
103
 
 
104
 
RingBuffer* MainBuffer::getRingBuffer(const std::string & call_id)
 
103
        WARN("CallID set %s does not exist!", set_id.c_str());
 
104
}
 
105
 
 
106
void MainBuffer::addCallIDtoSet(const std::string& set_id, const std::string& call_id)
 
107
{
 
108
    const auto callid_set_shared = getCallIDSet(set_id);
 
109
    if (callid_set_shared)
 
110
        callid_set_shared->insert(call_id);
 
111
    else
 
112
        WARN("CallIDSet %s does not exist!", set_id.c_str());
 
113
}
 
114
 
 
115
void MainBuffer::removeCallIDfromSet(const std::string& set_id, const std::string& call_id)
 
116
{
 
117
    const auto callid_set_shared = getCallIDSet(set_id);
 
118
    if (callid_set_shared)
 
119
        callid_set_shared->erase(call_id);
 
120
    else
 
121
        WARN("CallIDSet %s does not exist!", set_id.c_str());
 
122
}
 
123
 
 
124
bool MainBuffer::hasRingBuffer(const std::string& call_id)
 
125
{
 
126
    return ringBufferMap_.find(call_id) != ringBufferMap_.end();
 
127
}
 
128
 
 
129
std::shared_ptr<RingBuffer> MainBuffer::getRingBuffer(const std::string& call_id)
105
130
{
106
131
    RingBufferMap::iterator iter = ringBufferMap_.find(call_id);
107
 
    return (iter != ringBufferMap_.end()) ? iter->second : NULL;
 
132
    return (iter != ringBufferMap_.end()) ? iter->second : nullptr;
108
133
}
109
134
 
110
 
const RingBuffer* MainBuffer::getRingBuffer(const std::string & call_id) const
 
135
std::shared_ptr<RingBuffer> MainBuffer::getRingBuffer(const std::string& call_id) const
111
136
{
112
137
    RingBufferMap::const_iterator iter = ringBufferMap_.find(call_id);
113
 
    return (iter != ringBufferMap_.end()) ? iter->second : NULL;
 
138
    return (iter != ringBufferMap_.end()) ? iter->second : nullptr;
114
139
}
115
140
 
116
 
void MainBuffer::createRingBuffer(const std::string &call_id)
 
141
void MainBuffer::createRingBuffer(const std::string& call_id)
117
142
{
118
 
    if (!getRingBuffer(call_id))
119
 
        ringBufferMap_[call_id] = new RingBuffer(SIZEBUF, call_id);
 
143
    if (!hasRingBuffer(call_id))
 
144
        ringBufferMap_[call_id] = std::make_shared<RingBuffer>(SIZEBUF, call_id);
120
145
    else
121
146
        DEBUG("Ringbuffer already exists for call_id %s", call_id.c_str());
122
147
}
123
148
 
124
 
void MainBuffer::removeRingBuffer(const std::string &call_id)
 
149
void MainBuffer::removeRingBuffer(const std::string& call_id)
125
150
{
126
 
    RingBuffer* ring_buffer = getRingBuffer(call_id);
127
 
 
128
 
    if (ring_buffer) {
 
151
    if (hasRingBuffer(call_id)) {
129
152
        ringBufferMap_.erase(call_id);
130
 
        delete ring_buffer;
131
153
    } else
132
 
        DEBUG("Ringbuffer %s does not exist!", call_id.c_str());
 
154
        WARN("Ringbuffer %s does not exist!", call_id.c_str());
133
155
}
134
156
 
135
 
void MainBuffer::bindCallID(const std::string & call_id1, const std::string & call_id2)
 
157
void MainBuffer::bindCallID(const std::string& call_id1, const std::string& call_id2)
136
158
{
137
 
    std::lock_guard<std::mutex> guard(mutex_);
 
159
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
138
160
 
139
161
    createRingBuffer(call_id1);
140
162
    createCallIDSet(call_id1);
141
163
    createRingBuffer(call_id2);
142
164
    createCallIDSet(call_id2);
143
165
 
144
 
    getRingBuffer(call_id1)->createReadPointer(call_id2);
145
 
    getRingBuffer(call_id2)->createReadPointer(call_id1);
 
166
    getRingBuffer(call_id1)->createReadOffset(call_id2);
 
167
    getRingBuffer(call_id2)->createReadOffset(call_id1);
146
168
    addCallIDtoSet(call_id1, call_id2);
147
169
    addCallIDtoSet(call_id2, call_id1);
148
170
}
149
171
 
150
 
void MainBuffer::bindHalfDuplexOut(const std::string & process_id, const std::string & call_id)
 
172
void MainBuffer::bindHalfDuplexOut(const std::string& process_id, const std::string& call_id)
151
173
{
152
 
    std::lock_guard<std::mutex> guard(mutex_);
 
174
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
153
175
 
154
176
    // This method is used only for active calls, if this call does not exist, do nothing
155
 
    if (!getRingBuffer(call_id))
 
177
    if (!hasRingBuffer(call_id))
156
178
        return;
157
179
 
158
180
    createCallIDSet(process_id);
159
 
    getRingBuffer(call_id)->createReadPointer(process_id);
 
181
    getRingBuffer(call_id)->createReadOffset(process_id);
160
182
    addCallIDtoSet(process_id, call_id);
161
183
}
162
184
 
163
 
void MainBuffer::unBindCallID(const std::string & call_id1, const std::string & call_id2)
164
 
{
165
 
    std::lock_guard<std::mutex> guard(mutex_);
 
185
void MainBuffer::removeReadOffsetFromRingBuffer(const std::string& call_id1,
 
186
                                                 const std::string& call_id2)
 
187
{
 
188
    const auto ringbuffer_shared = getRingBuffer(call_id1);
 
189
    if (!ringbuffer_shared) {
 
190
        DEBUG("did not find ringbuffer %s", call_id1.c_str());
 
191
        return;
 
192
    }
 
193
 
 
194
    /* Don't remove read offset if still in use (i.e. in wait ) */
 
195
    if (ringbuffer_shared.use_count() >= 2) {
 
196
 
 
197
        /* remove them from the maps, but owners will still have
 
198
         * references to them */
 
199
        if (ringbuffer_shared->readOffsetCount() <= 1) {
 
200
            removeCallIDSet(call_id1);
 
201
            removeRingBuffer(call_id1);
 
202
        }
 
203
    } else {
 
204
 
 
205
        ringbuffer_shared->removeReadOffset(call_id2);
 
206
 
 
207
        // Remove empty RingBuffer/CallIDSet
 
208
        if (ringbuffer_shared->hasNoReadOffsets()) {
 
209
            removeCallIDSet(call_id1);
 
210
            removeRingBuffer(call_id1);
 
211
        }
 
212
    }
 
213
}
 
214
 
 
215
void MainBuffer::unBindCallID(const std::string& call_id1, const std::string& call_id2)
 
216
{
 
217
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
166
218
 
167
219
    removeCallIDfromSet(call_id1, call_id2);
168
220
    removeCallIDfromSet(call_id2, call_id1);
169
221
 
170
 
    RingBuffer* ringbuffer = getRingBuffer(call_id2);
171
 
 
172
 
    if (ringbuffer) {
173
 
        ringbuffer->removeReadPointer(call_id1);
174
 
 
175
 
        if (ringbuffer->hasNoReadPointers()) {
176
 
            removeCallIDSet(call_id2);
177
 
            removeRingBuffer(call_id2);
178
 
        }
179
 
    }
180
 
 
181
 
    ringbuffer = getRingBuffer(call_id1);
182
 
 
183
 
    if (ringbuffer) {
184
 
        ringbuffer->removeReadPointer(call_id2);
185
 
 
186
 
        if (ringbuffer->hasNoReadPointers()) {
187
 
            removeCallIDSet(call_id1);
188
 
            removeRingBuffer(call_id1);
189
 
        }
190
 
    }
 
222
    removeReadOffsetFromRingBuffer(call_id1, call_id2);
 
223
    removeReadOffsetFromRingBuffer(call_id2, call_id1);
191
224
}
192
225
 
193
 
void MainBuffer::unBindHalfDuplexOut(const std::string & process_id, const std::string & call_id)
 
226
void MainBuffer::unBindHalfDuplexOut(const std::string& process_id, const std::string& call_id)
194
227
{
195
 
    std::lock_guard<std::mutex> guard(mutex_);
 
228
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
196
229
 
197
230
    removeCallIDfromSet(process_id, call_id);
198
 
 
199
 
    RingBuffer* ringbuffer = getRingBuffer(call_id);
200
 
 
201
 
    if (ringbuffer) {
202
 
        ringbuffer->removeReadPointer(process_id);
203
 
 
204
 
        if (ringbuffer->hasNoReadPointers()) {
205
 
            removeCallIDSet(call_id);
206
 
            removeRingBuffer(call_id);
207
 
        }
208
 
    } else {
209
 
        DEBUG("did not found ringbuffer %s", process_id.c_str());
210
 
        removeCallIDSet(process_id);
211
 
    }
212
 
 
213
 
    CallIDSet* callid_set = getCallIDSet(process_id);
214
 
 
215
 
    if (callid_set and callid_set->empty())
 
231
    removeReadOffsetFromRingBuffer(call_id, process_id);
 
232
 
 
233
    const auto callid_set_shared = getCallIDSet(process_id);
 
234
    if (callid_set_shared and callid_set_shared->empty())
216
235
        removeCallIDSet(process_id);
217
236
}
218
237
 
219
 
void MainBuffer::unBindAll(const std::string & call_id)
 
238
void MainBuffer::unBindAll(const std::string& call_id)
220
239
{
221
 
    CallIDSet* callid_set = getCallIDSet(call_id);
 
240
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
222
241
 
223
 
    if (callid_set == NULL or callid_set->empty())
 
242
    const auto callid_set_shared = getCallIDSet(call_id);
 
243
    if (!callid_set_shared or callid_set_shared->empty())
224
244
        return;
225
245
 
226
 
    CallIDSet temp_set(*callid_set);
227
 
 
228
 
    for (const auto &item_set : temp_set)
 
246
    const auto callid_set_tmp = *callid_set_shared; // temporary copy of callid_set
 
247
    for (const auto& item_set : callid_set_tmp)
229
248
        unBindCallID(call_id, item_set);
230
249
}
231
250
 
232
 
void MainBuffer::putData(AudioBuffer& buffer, const std::string &call_id)
 
251
void MainBuffer::putData(AudioBuffer& buffer, const std::string& call_id)
233
252
{
234
 
    std::lock_guard<std::mutex> guard(mutex_);
235
 
 
236
 
    RingBuffer* ring_buffer = getRingBuffer(call_id);
237
 
 
238
 
    if (ring_buffer)
239
 
        ring_buffer->put(buffer);
 
253
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
 
254
 
 
255
    const auto ringbuffer_shared = getRingBuffer(call_id);
 
256
    if (ringbuffer_shared)
 
257
        ringbuffer_shared->put(buffer);
240
258
}
241
259
 
242
 
size_t MainBuffer::getData(AudioBuffer& buffer, const std::string &call_id)
 
260
size_t MainBuffer::getData(AudioBuffer& buffer, const std::string& call_id)
243
261
{
244
 
    std::lock_guard<std::mutex> guard(mutex_);
245
 
 
246
 
    CallIDSet* callid_set = getCallIDSet(call_id);
247
 
 
248
 
    if (callid_set == NULL or callid_set->empty())
 
262
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
 
263
 
 
264
    const auto callid_set_shared = getCallIDSet(call_id);
 
265
    if (!callid_set_shared or callid_set_shared->empty())
249
266
        return 0;
250
267
 
251
 
    if (callid_set->size() == 1) {
252
 
 
253
 
        CallIDSet::iterator iter_id = callid_set->begin();
254
 
 
255
 
        if (iter_id != callid_set->end())
 
268
    if (callid_set_shared->size() == 1) {
 
269
        CallIDSet::iterator iter_id = callid_set_shared->begin();
 
270
 
 
271
        if (iter_id != callid_set_shared->end())
256
272
            return getDataByID(buffer, *iter_id, call_id);
257
273
        else
258
274
            return 0;
259
275
    } else {
260
276
        buffer.reset();
261
 
        buffer.setSampleRate(internalSamplingRate_);
262
 
 
263
 
        size_t size = 0;
264
 
        AudioBuffer mixBuffer(buffer);
265
 
 
266
 
        for (const auto &item_id : *callid_set) {
267
 
            size = getDataByID(mixBuffer, item_id, call_id);
268
 
 
 
277
        buffer.setFormat(internalAudioFormat_);
 
278
 
 
279
        size_t size = 0;
 
280
        AudioBuffer mixBuffer(buffer);
 
281
 
 
282
        for (const auto &item_id : *callid_set_shared) {
 
283
            // FIXME: size is not cumulated
 
284
            size = getDataByID(mixBuffer, item_id, call_id);
 
285
            if (size > 0)
 
286
                buffer.mix(mixBuffer);
 
287
        }
 
288
 
 
289
        return size;
 
290
    }
 
291
}
 
292
 
 
293
bool MainBuffer::waitForDataAvailable(const std::string& call_id, size_t min_frames, const std::chrono::microseconds& max_wait) const
 
294
{
 
295
    std::unique_lock<std::recursive_mutex> lk(stateLock_);
 
296
 
 
297
    // convert to absolute time
 
298
    const auto deadline = std::chrono::high_resolution_clock::now() + max_wait;
 
299
    std::shared_ptr<CallIDSet> callid_set_shared;
 
300
 
 
301
    callid_set_shared = getCallIDSet(call_id);
 
302
    if (!callid_set_shared or callid_set_shared->empty())
 
303
        return false;
 
304
 
 
305
    const auto callid_set_tmp = *callid_set_shared; // temporary copy of callid_set
 
306
    for (const auto &i : callid_set_tmp) {
 
307
        const auto ringbuffer_shared = getRingBuffer(i);
 
308
        if (!ringbuffer_shared)
 
309
            continue;
 
310
        lk.unlock();
 
311
        if (ringbuffer_shared->waitForDataAvailable(call_id, min_frames, deadline) < min_frames)
 
312
            return false;
 
313
        lk.lock();
 
314
    }
 
315
    return true;
 
316
}
 
317
 
 
318
size_t MainBuffer::getAvailableData(AudioBuffer& buffer, const std::string& call_id)
 
319
{
 
320
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
 
321
 
 
322
    const auto callid_set_shared = getCallIDSet(call_id);
 
323
    if (!callid_set_shared or callid_set_shared->empty())
 
324
        return 0;
 
325
 
 
326
    if (callid_set_shared->size() == 1) {
 
327
        CallIDSet::iterator iter_id = callid_set_shared->begin();
 
328
        const auto ringbuffer_shared = getRingBuffer(*iter_id);
 
329
        if (!ringbuffer_shared)
 
330
            return 0;
 
331
        return ringbuffer_shared->get(buffer, call_id);
 
332
    } else {
 
333
        size_t availableSamples = std::numeric_limits<size_t>::max();
 
334
 
 
335
        for (const auto &i : *callid_set_shared) {
 
336
            const auto ringbuffer_shared = getRingBuffer(i);
 
337
            if (!ringbuffer_shared) continue;
 
338
            availableSamples = std::min(availableSamples, ringbuffer_shared->availableForGet(i));
 
339
        }
 
340
 
 
341
        if (availableSamples == std::numeric_limits<size_t>::max())
 
342
            return 0;
 
343
 
 
344
        availableSamples = std::min(availableSamples, buffer.frames());
 
345
        buffer.resize(availableSamples);
 
346
        buffer.reset();
 
347
        buffer.setFormat(internalAudioFormat_);
 
348
 
 
349
        size_t size = 0;
 
350
        AudioBuffer mixBuffer(buffer);
 
351
 
 
352
        for (const auto &item_id : *callid_set_shared) {
 
353
            // FIXME: size is not cumulated
 
354
            size = getDataByID(mixBuffer, item_id, call_id);
269
355
            if (size > 0) {
270
356
                buffer.mix(mixBuffer);
271
357
            }
272
358
        }
273
359
 
274
 
        return size;
 
360
        return availableSamples;
275
361
    }
276
362
}
277
363
 
278
 
size_t MainBuffer::getDataByID(AudioBuffer& buffer, const std::string & call_id, const std::string & reader_id)
 
364
size_t MainBuffer::getDataByID(AudioBuffer& buffer, const std::string& call_id, const std::string& reader_id)
279
365
{
280
 
    RingBuffer* ring_buffer = getRingBuffer(call_id);
281
 
    return ring_buffer ? ring_buffer->get(buffer, reader_id) : 0;
 
366
    const auto ringbuffer_shared = getRingBuffer(call_id);
 
367
    return ringbuffer_shared ? ringbuffer_shared->get(buffer, reader_id) : 0;
282
368
}
283
369
 
284
 
size_t MainBuffer::availableForGet(const std::string &call_id)
 
370
size_t MainBuffer::availableForGet(const std::string& call_id) const
285
371
{
286
 
    std::lock_guard<std::mutex> guard(mutex_);
287
 
 
288
 
    CallIDSet* callid_set = getCallIDSet(call_id);
289
 
 
290
 
    if (callid_set == NULL or callid_set->empty())
 
372
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
 
373
 
 
374
    const auto callid_set_shared = getCallIDSet(call_id);
 
375
    if (!callid_set_shared or callid_set_shared->empty())
291
376
        return 0;
292
377
 
293
 
    if (callid_set->size() == 1) {
294
 
        CallIDSet::iterator iter_id = callid_set->begin();
 
378
    if (callid_set_shared->size() == 1) {
 
379
        CallIDSet::iterator iter_id = callid_set_shared->begin();
295
380
 
296
381
        if ((call_id != DEFAULT_ID) && (*iter_id == call_id))
297
 
            DEBUG("This problem should not occur since we have %ld elements", callid_set->size());
 
382
            DEBUG("This problem should not occur since we have %ld elements", callid_set_shared->size());
298
383
 
299
384
        return availableForGetByID(*iter_id, call_id);
300
 
 
301
385
    } else {
302
 
 
303
386
        size_t availableSamples = std::numeric_limits<size_t>::max();
304
387
 
305
 
        for (auto &i : *callid_set) {
 
388
        for (const auto &i : *callid_set_shared) {
306
389
            const size_t nbSamples = availableForGetByID(i, call_id);
307
 
 
308
390
            if (nbSamples != 0)
309
391
                availableSamples = std::min(availableSamples, nbSamples);
310
392
        }
313
395
    }
314
396
}
315
397
 
316
 
size_t MainBuffer::availableForGetByID(const std::string &call_id,
317
 
                                       const std::string &reader_id) const
 
398
size_t MainBuffer::availableForGetByID(const std::string& call_id,
 
399
                                       const std::string& reader_id) const
318
400
{
319
401
    if (call_id != DEFAULT_ID and reader_id == call_id)
320
 
        ERROR("RingBuffer has a readpointer on itself");
321
 
 
322
 
    const RingBuffer* ringbuffer = getRingBuffer(call_id);
323
 
 
324
 
    if (ringbuffer == NULL) {
325
 
        ERROR("RingBuffer does not exist");
326
 
        return 0;
327
 
    } else
328
 
        return ringbuffer->availableForGet(reader_id);
329
 
 
 
402
        WARN("RingBuffer has a readoffset on itself");
 
403
 
 
404
    const auto ringbuffer_shared = getRingBuffer(call_id);
 
405
    if (ringbuffer_shared)
 
406
        return ringbuffer_shared->availableForGet(reader_id);
 
407
 
 
408
    WARN("RingBuffer does not exist");
 
409
    return 0;
330
410
}
331
411
 
332
 
size_t MainBuffer::discard(size_t toDiscard, const std::string &call_id)
 
412
size_t MainBuffer::discard(size_t toDiscard, const std::string& call_id)
333
413
{
334
 
    std::lock_guard<std::mutex> guard(mutex_);
335
 
 
336
 
    CallIDSet* callid_set = getCallIDSet(call_id);
337
 
 
338
 
    if (!callid_set or callid_set->empty())
 
414
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
 
415
 
 
416
    const auto callid_set_shared = getCallIDSet(call_id);
 
417
    if (!callid_set_shared or callid_set_shared->empty())
339
418
        return 0;
340
419
 
341
 
    for (auto &item : *callid_set)
 
420
    for (const auto &item : *callid_set_shared)
342
421
        discardByID(toDiscard, item, call_id);
343
422
 
344
423
    return toDiscard;
345
424
}
346
425
 
347
 
void MainBuffer::discardByID(size_t toDiscard, const std::string & call_id, const std::string & reader_id)
 
426
void MainBuffer::discardByID(size_t toDiscard, const std::string& call_id, const std::string& reader_id)
348
427
{
349
 
    RingBuffer* ringbuffer = getRingBuffer(call_id);
350
 
 
351
 
    if (ringbuffer)
352
 
        ringbuffer->discard(toDiscard, reader_id);
 
428
    const auto ringbuffer_shared = getRingBuffer(call_id);
 
429
    if (ringbuffer_shared)
 
430
        ringbuffer_shared->discard(toDiscard, reader_id);
353
431
}
354
432
 
355
 
void MainBuffer::flush(const std::string & call_id)
 
433
void MainBuffer::flush(const std::string& call_id)
356
434
{
357
 
    std::lock_guard<std::mutex> guard(mutex_);
358
 
 
359
 
    CallIDSet* callid_set = getCallIDSet(call_id);
360
 
 
361
 
    if (callid_set == NULL)
 
435
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
 
436
 
 
437
    const auto callid_set_shared = getCallIDSet(call_id);
 
438
    if (!callid_set_shared)
362
439
        return;
363
440
 
364
 
    for (auto &item : *callid_set)
 
441
    for (const auto &item : *callid_set_shared)
365
442
        flushByID(item, call_id);
366
443
}
367
444
 
368
 
void MainBuffer::flushByID(const std::string & call_id, const std::string & reader_id)
 
445
void MainBuffer::flushByID(const std::string& call_id, const std::string& reader_id)
369
446
{
370
 
    RingBuffer* ringbuffer = getRingBuffer(call_id);
371
 
 
372
 
    if (ringbuffer)
373
 
        ringbuffer->flush(reader_id);
 
447
    const auto ringbuffer_shared = getRingBuffer(call_id);
 
448
    if (ringbuffer_shared)
 
449
        ringbuffer_shared->flush(reader_id);
374
450
}
375
451
 
376
 
 
377
452
void MainBuffer::flushAllBuffers()
378
453
{
379
 
    for (auto &item : ringBufferMap_)
 
454
    std::lock_guard<std::recursive_mutex> lk(stateLock_);
 
455
 
 
456
    for (const auto& item : ringBufferMap_)
380
457
        item.second->flushAll();
381
458
}