1
// ------------------------------------------------------------------------
2
// audioio-db-server.cpp: Audio i/o engine serving db clients.
3
// Copyright (C) 2000-2004 Kai Vehmanen
6
// eca-style-version: 3
8
// This program is free software; you can redistribute it and/or modify
9
// it under the terms of the GNU General Public License as published by
10
// the Free Software Foundation; either version 2 of the License, or
11
// (at your option) any later version.
13
// This program is distributed in the hope that it will be useful,
14
// but WITHOUT ANY WARRANTY; without even the implied warranty of
15
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
// GNU General Public License for more details.
18
// You should have received a copy of the GNU General Public License
19
// along with this program; if not, write to the Free Software
20
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
// ------------------------------------------------------------------------
30
#include <errno.h> /* ETIMEDOUT */
34
#include <sys/time.h> /* gettimeofday() */
37
#include <kvu_utils.h>
38
#include <kvu_numtostr.h>
40
#include "sample-specs.h"
41
#include "samplebuffer.h"
42
#include "eca-logger.h"
43
#include "audioio-db-server.h"
44
#include "audioio-db-server_impl.h"
49
// #define DB_PROFILING
55
#define DB_PROFILING_INC(x) ++(x)
56
#define DB_PROFILING_STATEMENT(x) x
58
#define DB_PROFILING_INC(x) (void)0
59
#define DB_PROFILING_STATEMENT(x) (void)0
63
// Initialization of static member functions
65
const int AUDIO_IO_DB_SERVER::buffercount_default = 32;
66
const long int AUDIO_IO_DB_SERVER::buffersize_default = 1024;
69
// Initialization of static, global functions
71
static int timed_wait(pthread_mutex_t* mutex, pthread_cond_t* cond, long int usecs);
72
static void timed_wait_print_result(int result, const char* tag, bool verbose);
75
* Helper function for starting the slave thread.
77
void* start_db_server_io_thread(void *ptr)
81
sigaddset(&sigset, SIGINT);
82
sigprocmask(SIG_BLOCK, &sigset, 0);
84
AUDIO_IO_DB_SERVER* pserver =
85
static_cast<AUDIO_IO_DB_SERVER*>(ptr);
94
AUDIO_IO_DB_SERVER::AUDIO_IO_DB_SERVER (void)
96
ECA_LOG_MSG(ECA_LOGGER::system_objects, "constructor");
97
buffercount_rep = buffercount_default;
98
buffersize_rep = buffersize_default;
100
impl_repp = new AUDIO_IO_DB_SERVER_impl;
102
thread_running_rep = false;
104
pthread_cond_init(&impl_repp->client_cond_rep, NULL);
105
pthread_mutex_init(&impl_repp->client_mutex_rep, NULL);
106
pthread_cond_init(&impl_repp->data_cond_rep, NULL);
107
pthread_mutex_init(&impl_repp->data_mutex_rep, NULL);
108
pthread_cond_init(&impl_repp->full_cond_rep, NULL);
109
pthread_mutex_init(&impl_repp->full_mutex_rep, NULL);
110
pthread_cond_init(&impl_repp->stop_cond_rep, NULL);
111
pthread_mutex_init(&impl_repp->stop_mutex_rep, NULL);
112
pthread_cond_init(&impl_repp->flush_cond_rep, NULL);
113
pthread_mutex_init(&impl_repp->flush_mutex_rep, NULL);
117
stop_request_rep.set(0);
118
exit_request_rep.set(0);
121
impl_repp->profile_full_rep = 0;
122
impl_repp->profile_no_processing_rep = 0;
123
impl_repp->profile_not_full_anymore_rep = 0;
124
impl_repp->profile_processing_rep = 0;
125
impl_repp->profile_read_xrun_danger_rep = 0;
126
impl_repp->profile_write_xrun_danger_rep = 0;
127
impl_repp->profile_rounds_total_rep = 0;
131
* Destructor. Doesn't delete any client objects.
133
AUDIO_IO_DB_SERVER::~AUDIO_IO_DB_SERVER(void)
135
ECA_LOG_MSG(ECA_LOGGER::system_objects, "destructor");
136
stop_request_rep.set(1);
137
exit_request_rep.set(1);
139
if (thread_running_rep == true) {
140
pthread_join(impl_repp->io_thread_rep, 0);
142
for(unsigned int p = 0; p < buffers_rep.size(); p++) {
143
delete buffers_rep[p];
148
DB_PROFILING_STATEMENT(dump_profile_counters());
152
* Starts the db server.
154
* @pre is_running() != true
156
void AUDIO_IO_DB_SERVER::start(void)
159
DBC_REQUIRE(is_running() != true);
162
ECA_LOG_MSG(ECA_LOGGER::system_objects, "start");
163
if (thread_running_rep != true) {
164
int ret = pthread_create(&impl_repp->io_thread_rep,
166
start_db_server_io_thread,
167
static_cast<void *>(this));
169
ECA_LOG_MSG(ECA_LOGGER::info, "(audio_io_db_server) pthread_create failed, exiting");
173
thread_running_rep = true;
176
stop_request_rep.set(0);
178
ECA_LOG_MSG(ECA_LOGGER::system_objects, "(audio_io_db_server) starting processing");
182
* Stops the db server.
184
void AUDIO_IO_DB_SERVER::stop(void)
186
ECA_LOG_MSG(ECA_LOGGER::system_objects, "stop");
187
stop_request_rep.set(1);
191
* Whether the db server has been started?
193
bool AUDIO_IO_DB_SERVER::is_running(void) const
195
if (running_rep.get() == 0) return false;
200
* Whether the db server buffers are full?
202
bool AUDIO_IO_DB_SERVER::is_full(void) const
204
if (full_rep.get() == 0) return false;
209
* Waits for condition to occur.
211
* @return 0 on success,
212
* -ETIMEDOUT if timeout occured,
213
* other nonzero value on other errors
215
static int timed_wait(pthread_mutex_t* mutex,
216
pthread_cond_t* cond,
220
gettimeofday(&now, 0);
222
struct timespec sleepcount;
223
sleepcount.tv_nsec = now.tv_usec * 1000 + (msecs % 1000) * 1000000;
224
sleepcount.tv_sec = now.tv_sec + msecs / 1000;
225
if (sleepcount.tv_nsec > 1000000000) {
227
sleepcount.tv_nsec -= 1000000000;
232
pthread_mutex_lock(mutex);
233
ret = pthread_cond_timedwait(cond,
236
pthread_mutex_unlock(mutex);
242
* Prints debug information based on the result
243
* of timed_wait() call.
245
static void timed_wait_print_result(int result, const char* tag, bool verbose)
247
ECA_LOGGER::Msg_level_t level = ECA_LOGGER::info;
249
level = ECA_LOGGER::continuous;
252
if (result == -ETIMEDOUT)
253
ECA_LOG_MSG(level, string("(audioio-db-server) ") + tag + " failed; timeout");
255
ECA_LOG_MSG(level, string("(audioio-db-server) ") + tag + " failed");
260
* Signals the server that one of its client has
261
* processed data from the db buffers. This
262
* function helps server to keep its buffers
263
* full without resorting to polling.
265
* Called by both db clients and the db server.
267
void AUDIO_IO_DB_SERVER::signal_client_activity(void)
269
pthread_mutex_lock(&impl_repp->client_mutex_rep);
270
pthread_cond_broadcast(&impl_repp->client_cond_rep);
271
pthread_mutex_unlock(&impl_repp->client_mutex_rep);
275
* Function that blocks until some client
278
* Only called by db server.
280
* @see signal_client_activity()
282
* @pre is_running() != true
284
void AUDIO_IO_DB_SERVER::wait_for_client_activity(void)
287
DBC_REQUIRE(is_running() == true);
290
/* note! we only wait for 100msec in case no clients
291
* clients signal activity but there's still
294
int res = timed_wait(&impl_repp->client_mutex_rep, &impl_repp->client_cond_rep, 100);
295
timed_wait_print_result(res, "wait_for_client_activity", false);
299
* Function that blocks until the server signals
300
* that all its buffers are full.
302
void AUDIO_IO_DB_SERVER::wait_for_full(void)
304
if (is_running() == true &&
305
clients_rep.size() > 0) {
307
/* note! we wait until we get a signal_full() even though
308
* full_rep could already be set */
310
int res = timed_wait(&impl_repp->full_mutex_rep, &impl_repp->full_cond_rep, 5000);
311
timed_wait_print_result(res, "wait_for_full", true);
314
ECA_LOG_MSG(ECA_LOGGER::system_objects, "wait_for_full failed; not running");
319
* Function that blocks until the server signals
320
* that it has stopped.
322
void AUDIO_IO_DB_SERVER::wait_for_stop(void)
324
if (is_running() == true) {
325
int res = timed_wait(&impl_repp->stop_mutex_rep, &impl_repp->stop_cond_rep, 5000);
326
timed_wait_print_result(res, "wait_for_stop", true);
331
* Function that blocks until the server signals
332
* that it has flushed all buffers (after
335
void AUDIO_IO_DB_SERVER::wait_for_flush(void)
337
if (is_running() == true) {
338
if (exit_ok_rep.get() == 0) {
339
signal_client_activity();
340
int res = timed_wait(&impl_repp->flush_mutex_rep, &impl_repp->flush_cond_rep, 5000);
341
timed_wait_print_result(res, "wait_for_flush", true);
345
ECA_LOG_MSG(ECA_LOGGER::system_objects, "wait_for_flush failed; not running");
350
* Sends a signal notifying that server buffers
353
* Called by db server.
355
void AUDIO_IO_DB_SERVER::signal_full(void)
357
pthread_mutex_lock(&impl_repp->full_mutex_rep);
358
pthread_cond_broadcast(&impl_repp->full_cond_rep);
359
pthread_mutex_unlock(&impl_repp->full_mutex_rep);
363
* Sends a signal notifying that server has
366
* Called by db server.
368
void AUDIO_IO_DB_SERVER::signal_stop(void)
370
pthread_mutex_lock(&impl_repp->stop_mutex_rep);
371
pthread_cond_broadcast(&impl_repp->stop_cond_rep);
372
pthread_mutex_unlock(&impl_repp->stop_mutex_rep);
376
* Sends a signal notifying that server has
377
* flushed all buffers (after an exit request).
379
* Called by db server.
381
void AUDIO_IO_DB_SERVER::signal_flush(void)
383
pthread_mutex_lock(&impl_repp->flush_mutex_rep);
384
pthread_cond_broadcast(&impl_repp->flush_cond_rep);
385
pthread_mutex_unlock(&impl_repp->flush_mutex_rep);
390
* Sets new default values for sample buffers.
392
* @pre is_running() != true
394
void AUDIO_IO_DB_SERVER::set_buffer_defaults(int buffers,
398
DBC_REQUIRE(is_running() != true);
401
buffercount_rep = buffers;
402
buffersize_rep = buffersize;
406
* Registers a new client object.
409
* @pre is_running() != true
411
void AUDIO_IO_DB_SERVER::register_client(AUDIO_IO* aobject)
414
DBC_REQUIRE(aobject != 0);
415
DBC_REQUIRE(is_running() != true);
418
clients_rep.push_back(aobject);
419
ECA_LOG_MSG(ECA_LOGGER::system_objects,
420
"(audioio-db-server) Registering client " +
421
kvu_numtostr(clients_rep.size() - 1) +
423
kvu_numtostr(buffercount_rep) + ".");
424
buffers_rep.push_back(new AUDIO_IO_DB_BUFFER(buffercount_rep,
426
aobject->channels()));
427
client_map_rep[aobject] = clients_rep.size() - 1;
431
* Unregisters the client object given as the argument. No
432
* resources are freed during this call.
435
void AUDIO_IO_DB_SERVER::unregister_client(AUDIO_IO* aobject)
438
DBC_REQUIRE(is_running() != true);
441
ECA_LOG_MSG(ECA_LOGGER::system_objects, "unregister_client " + aobject->name() + ".");
442
if (client_map_rep.find(aobject) != client_map_rep.end()) {
443
size_t index = client_map_rep[aobject];
444
if (index >= 0 && index < clients_rep.size()) {
445
clients_rep[index] = 0;
446
delete buffers_rep[index];
447
buffers_rep[index] = 0;
450
ECA_LOG_MSG(ECA_LOGGER::system_objects, "unregister_client failed (1)");
453
ECA_LOG_MSG(ECA_LOGGER::system_objects, "unregister_client failed (2)");
458
* Gets a pointer to the client buffer belonging to
459
* the audio object given as parameter. If no
460
* buffers are found (client not registered, etc),
463
AUDIO_IO_DB_BUFFER* AUDIO_IO_DB_SERVER::get_client_buffer(AUDIO_IO* aobject)
465
if (client_map_rep.find(aobject) == client_map_rep.end() ||
466
clients_rep[client_map_rep[aobject]] == 0)
469
return buffers_rep[client_map_rep[aobject]];
475
void AUDIO_IO_DB_SERVER::io_thread(void)
477
ECA_LOG_MSG(ECA_LOGGER::system_objects, "(audio_io_db_server) Hey, in the I/O loop!");
480
int passive_rounds = 0;
481
DB_PROFILING_STATEMENT(bool one_time_full = false);
483
/* set idle timeout to ~10% of total buffersize (using 44.1Hz as a reference) */
484
long int sleeplen = buffersize_rep * buffercount_rep * 1000 / 44100 / 10 * 1000000;
486
ECA_LOG_MSG(ECA_LOGGER::system_objects,
487
"(audio_io_db_server) Using idle timeout of " +
488
kvu_numtostr(sleeplen) +
492
if (running_rep.get() == 0) {
493
kvu_sleep(0, sleeplen);
494
if (exit_request_rep.get() == 1) break;
498
DB_PROFILING_INC(impl_repp->profile_rounds_total_rep);
502
int min_free_space = buffercount_rep;
504
DB_PROFILING_STATEMENT(impl_repp->looptimer_rep.start());
506
for(unsigned int p = 0; p < clients_rep.size(); p++) {
507
if (clients_rep[p] == 0 ||
508
buffers_rep[p]->finished_rep.get()) continue;
512
if (buffers_rep[p]->io_mode_rep == AUDIO_IO::io_read) {
513
free_space = buffers_rep[p]->write_space();
514
if (free_space > 0) {
515
/* room available, so we can read at least one buffer of data */
517
if (clients_rep[p]->finished() != true) {
518
clients_rep[p]->read_buffer(buffers_rep[p]->sbufs_rep[buffers_rep[p]->writeptr_rep.get()]);
519
if (clients_rep[p]->finished() == true) buffers_rep[p]->finished_rep.set(1);
520
buffers_rep[p]->advance_write_pointer();
525
if (buffers_rep[p]->write_space() > 16 && one_time_full == true) {
526
DB_PROFILING_INC(impl_repp->profile_read_xrun_danger_rep);
533
free_space = buffers_rep[p]->read_space();
534
if (free_space > 0) {
535
/* room available, so we can write at least one buffer of data */
537
if (clients_rep[p]->finished() != true) {
538
clients_rep[p]->write_buffer(buffers_rep[p]->sbufs_rep[buffers_rep[p]->readptr_rep.get()]);
539
if (clients_rep[p]->finished() == true) buffers_rep[p]->finished_rep.set(1);
540
buffers_rep[p]->advance_read_pointer();
545
if (buffers_rep[p]->read_space() < 16 && one_time_full == true) {
546
DB_PROFILING_INC(impl_repp->profile_write_xrun_danger_rep);
552
if (free_space < min_free_space) min_free_space = free_space;
555
DB_PROFILING_STATEMENT(impl_repp->looptimer_rep.stop());
557
if (stop_request_rep.get() == 1) {
558
stop_request_rep.set(0);
564
if (processed == 0) passive_rounds++;
565
else passive_rounds = 0;
567
if (processed == 0) {
568
if (passive_rounds > 1) {
569
/* case 1: nothing processed during the last two rounds ==> signal_full, wait_for_client_activity */
570
DB_PROFILING_INC(impl_repp->profile_full_rep);
572
DB_PROFILING_STATEMENT(if (one_time_full != true) one_time_full = true);
574
DBC_CHECK(running_rep.get() == 1);
577
/* case 2: nothing processed during the last round ==> wait_for_client_activity */
578
DB_PROFILING_INC(impl_repp->profile_no_processing_rep);
581
wait_for_client_activity();
584
/* case 3: something processed; business as usual */
585
DB_PROFILING_INC(impl_repp->profile_processing_rep);
588
/* case X: something processed; room in the buffers ==> data available */
589
// DB_PROFILING_INC(impl_repp->profile_not_full_anymore_rep);
594
// std::cerr << "Exiting db server thread." << std::endl;
597
void AUDIO_IO_DB_SERVER::dump_profile_counters(void)
599
std::cerr << "(audioio-db-server) *** profile begin ***" << std::endl;
600
std::cerr << "Profile_full_rep: " << impl_repp->profile_full_rep << std::endl;
601
std::cerr << "Profile_no_processing_rep: " << impl_repp->profile_no_processing_rep << std::endl;
602
std::cerr << "Profile_not_full_anymore_rep: " << impl_repp->profile_not_full_anymore_rep << std::endl;
603
std::cerr << "Profile_processing_rep: " << impl_repp->profile_processing_rep << std::endl;
604
std::cerr << "Profile_read_xrun_danger_rep: " << impl_repp->profile_read_xrun_danger_rep << std::endl;
605
std::cerr << "Profile_write_xrun_danger_rep: " << impl_repp->profile_write_xrun_danger_rep << std::endl;
606
std::cerr << "Profile_rounds_total_rep: " << impl_repp->profile_rounds_total_rep << std::endl;
607
std::cerr << "Fastest/slowest/average loop time: ";
608
std::cerr << kvu_numtostr(impl_repp->looptimer_rep.min_duration_seconds() * 1000, 1);
610
std::cerr << kvu_numtostr(impl_repp->looptimer_rep.max_duration_seconds() * 1000, 1);
612
std::cerr << kvu_numtostr(impl_repp->looptimer_rep.average_duration_seconds() * 1000, 1);
613
std::cerr << " msec." << std::endl;
614
std::cerr << "(audioio-db-server) *** profile end ***" << std::endl;
618
* Flushes all data in the buffers to disk.
620
void AUDIO_IO_DB_SERVER::flush(void)
622
int not_finished = 1;
623
while(not_finished != 0) {
625
for(unsigned int p = 0; p < clients_rep.size(); p++) {
626
if (clients_rep[p] == 0 ||
627
buffers_rep[p]->finished_rep.get()) continue;
628
if (buffers_rep[p]->io_mode_rep != AUDIO_IO::io_read) {
629
if (buffers_rep[p]->read_space() > 0) {
632
ECA_LOG_MSG(ECA_LOGGER::info,
633
std::string("(audio_io_db_server) ") +
635
kvu_numtostr(buffers_rep[p]->readptr_rep.get()) +
639
kvu_numtostr(buffers_rep[p]->read_space()) +
642
clients_rep[p]->write_buffer(buffers_rep[p]->sbufs_rep[buffers_rep[p]->readptr_rep.get()]);
643
if (clients_rep[p]->finished() == true) buffers_rep[p]->finished_rep.set(1);
644
buffers_rep[p]->advance_read_pointer();
649
for(unsigned int p = 0; p < buffers_rep.size(); p++) {
650
if (buffers_rep[p] != 0) {
651
buffers_rep[p]->reset();