2
mediastreamer2 library - modular sound and video processing and streaming
3
Copyright (C) 2006 Simon MORLAT (simon.morlat@linphone.org)
5
This program is free software; you can redistribute it and/or
6
modify it under the terms of the GNU General Public License
7
as published by the Free Software Foundation; either version 2
8
of the License, or (at your option) any later version.
10
This program is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
GNU General Public License for more details.
15
You should have received a copy of the GNU General Public License
16
along with this program; if not, write to the Free Software
17
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20
#include "mediastreamer2/msticker.h"
24
#include <sys/resource.h>
27
/* Weight kept from the previous average when ms_ticker_run() smooths the per-tick load. */
static const double smooth_coef=0.9;
29
/* TICKER_MEASUREMENTS selects whether ms_ticker_run() times each tick; a value
   supplied by the build takes precedence over the default below.
   NOTE(review): the closing #endif lines of these conditionals are not present in this copy of the file. */
#ifndef TICKER_MEASUREMENTS
31
#define TICKER_MEASUREMENTS 1
33
#if defined(__ARM_ARCH__)
35
/* as MSTicker load computation requires floating point, we prefer to disable it on ARM processors without FPU*/
36
# undef TICKER_MEASUREMENTS
37
# define TICKER_MEASUREMENTS 0
43
/* Default value stored in MSTicker.interval by ms_ticker_init(); the run loop adds it to a millisecond time base. */
#define TICKER_INTERVAL 10
45
/* Forward declarations of file-local helpers that are defined further down but referenced earlier. */
static void * ms_ticker_run(void *s);
46
static uint64_t get_cur_time_ms(void *);
47
static int wait_next_tick(void *, uint64_t virt_ticker_time);
48
static void remove_tasks_for_filter(MSTicker *ticker, MSFilter *f);
50
/* Create the ticker thread; it executes ms_ticker_run() with the ticker as its argument.
   NOTE(review): one statement between the signature and the thread creation is missing from this copy of the file. */
static void ms_ticker_start(MSTicker *s){
52
ms_thread_create(&s->thread,NULL,ms_ticker_run,s);
55
/* Initialise a freshly allocated ticker from `params` and start its thread.
   The name is duplicated with ms_strdup(), so the ticker owns its own copy.
   NOTE(review): several field initialisers are missing from this copy of the file. */
static void ms_ticker_init(MSTicker *ticker, const MSTickerParams *params)
57
ms_mutex_init(&ticker->lock,NULL);
58
ticker->execution_list=NULL;
59
ticker->task_list=NULL;
62
ticker->interval=TICKER_INTERVAL;
65
/* default clock callback; it ignores its data pointer */
ticker->get_cur_time_ptr=&get_cur_time_ms;
66
ticker->get_cur_time_data=NULL;
67
ticker->name=ms_strdup(params->name);
69
ticker->prio=params->prio;
70
/* default tick-wait callback; it receives the ticker itself as its data */
ticker->wait_next_tick=wait_next_tick;
71
ticker->wait_next_tick_data=ticker;
72
/* the thread is started last, once every field it reads has been set */
ms_ticker_start(ticker);
75
MSTicker *ms_ticker_new(){
76
MSTickerParams params;
77
params.name="MSTicker";
78
params.prio=MS_TICKER_PRIO_NORMAL;
79
return ms_ticker_new_with_params(¶ms);
82
/* Allocate one MSTicker with ms_new() and initialise it from `params` through
   ms_ticker_init(), which also starts the ticker thread.
   NOTE(review): the return statement is missing from this copy of the file. */
MSTicker *ms_ticker_new_with_params(const MSTickerParams *params){
83
MSTicker *obj=(MSTicker *)ms_new(MSTicker,1);
84
ms_ticker_init(obj,params);
88
/* Wait for the ticker thread to terminate by joining it.
   NOTE(review): the statement executed while the lock is held, and a line just
   before the join, are missing from this copy of the file; presumably the former
   tells ms_ticker_run() to leave its loop. Confirm against the full source. */
static void ms_ticker_stop(MSTicker *s){
89
ms_mutex_lock(&s->lock);
91
ms_mutex_unlock(&s->lock);
93
ms_thread_join(s->thread,NULL);
96
/**
 * Replace the ticker's name with a private copy of `name`.
 *
 * The copy is made before the previous name is released, so passing the
 * ticker's current name (s->name) as `name` is safe.
 *
 * @param s    the ticker to rename.
 * @param name the new name; it is duplicated with ms_strdup().
 */
void ms_ticker_set_name(MSTicker *s, const char *name){
	char *copy=ms_strdup(name);

	if (s->name) ms_free(s->name);
	s->name=copy;
}
101
/* Takes a ticker and an MSTickerPrio value.
   NOTE(review): the body is missing from this copy of the file; presumably it stores
   `prio` in ticker->prio, the field read when the thread starts. Confirm against the full source. */
void ms_ticker_set_priority(MSTicker *ticker, MSTickerPrio prio){
105
/*
 * Release everything ms_ticker_init() set up, in reverse order:
 * stop the thread, then free the name copy, then destroy the mutex.
 */
static void ms_ticker_uninit(MSTicker *ticker)
{
	/* the thread uses both the name and the lock, so it is stopped first */
	ms_ticker_stop(ticker);

	ms_free(ticker->name);
	ms_mutex_destroy(&ticker->lock);
}
112
/* Shut the ticker down through ms_ticker_uninit() (thread stopped, name and mutex released).
   NOTE(review): the rest of the body is missing from this copy of the file, so the release
   of the MSTicker allocation itself is not visible here. */
void ms_ticker_destroy(MSTicker *ticker){
113
ms_ticker_uninit(ticker);
118
/* Walk `filters` and append to a new list every filter whose descriptor declares
   no inputs (desc->ninputs==0). The input list is not modified.
   NOTE(review): the declaration of `f` and the return statement are missing from this copy of the file. */
static MSList *get_sources(MSList *filters){
119
MSList *sources=NULL;
121
for(;filters!=NULL;filters=filters->next){
122
f=(MSFilter*)filters->data;
123
if (f->desc->ninputs==0){
124
sources=ms_list_append(sources,f);
130
/**
 * Attach the graph that contains filter `f` to the ticker.
 *
 * Thin wrapper around ms_ticker_attach_multiple() with a single filter.
 *
 * @return the value returned by ms_ticker_attach_multiple().
 */
int ms_ticker_attach(MSTicker *ticker, MSFilter *f){
	/* The variadic list is read back with va_arg(l, MSFilter*), so the terminator
	   must really have pointer type: a bare NULL may expand to the integer 0. */
	return ms_ticker_attach_multiple(ticker,f,(MSFilter*)NULL);
}
134
/* Attach the graphs of one or more filters to the ticker.
   The variadic arguments are further MSFilter pointers, terminated by a NULL pointer.
   For each filter that has no ticker yet: find its neighbours, collect the source
   filters among them, preprocess every neighbour, and accumulate the sources.
   The accumulated sources are then appended to ticker->execution_list under the ticker lock.
   NOTE(review): the va_list handling, the loop opening, the no-source test and the
   return statement are missing from this copy of the file. */
int ms_ticker_attach_multiple(MSTicker *ticker,MSFilter *f,...)
136
MSList *sources=NULL;
137
MSList *filters=NULL;
139
MSList *total_sources=NULL;
145
/* a filter that already has a ticker is only reported, see the else branch below */
if (f->ticker==NULL) {
146
filters=ms_filter_find_neighbours(f);
147
sources=get_sources(filters);
149
ms_fatal("No sources found around filter %s",f->desc->name);
150
ms_list_free(filters);
153
/*run preprocess on each filter: */
154
for(it=filters;it!=NULL;it=it->next)
155
ms_filter_preprocess((MSFilter*)it->data,ticker);
156
ms_list_free(filters);
157
total_sources=ms_list_concat(total_sources,sources);
158
}else ms_message("Filter %s is already being scheduled; nothing to do.",f->desc->name);
159
}while ((f=va_arg(l,MSFilter*))!=NULL);
162
/* publish the new sources to the ticker thread in one step */
ms_mutex_lock(&ticker->lock);
163
ticker->execution_list=ms_list_concat(ticker->execution_list,total_sources);
164
ms_mutex_unlock(&ticker->lock);
169
/*
 * Per-filter step used when a graph is detached: discard any task the filter
 * still has queued on its ticker, then run the filter's postprocess.
 */
static void call_postprocess(MSFilter *f){
	if (f->postponed_task) {
		remove_tasks_for_filter(f->ticker,f);
	}
	ms_filter_postprocess(f);
}
174
/* Detach the graph containing `f` from the ticker.
   A filter without a ticker is only reported. Otherwise, under the ticker lock, the
   neighbours of `f` are looked up, their source filters are removed from
   ticker->execution_list, and once the lock is released every neighbour is
   postprocessed through call_postprocess(). Both temporary lists are freed.
   NOTE(review): the early return, the no-source test and the final return are
   missing from this copy of the file. */
int ms_ticker_detach(MSTicker *ticker,MSFilter *f){
175
MSList *sources=NULL;
176
MSList *filters=NULL;
179
if (f->ticker==NULL) {
180
ms_message("Filter %s is not scheduled; nothing to do.",f->desc->name);
184
ms_mutex_lock(&ticker->lock);
186
filters=ms_filter_find_neighbours(f);
187
sources=get_sources(filters);
189
ms_fatal("No sources found around filter %s",f->desc->name);
190
ms_list_free(filters);
191
ms_mutex_unlock(&ticker->lock);
195
for(it=sources;it!=NULL;it=ms_list_next(it)){
196
ticker->execution_list=ms_list_remove(ticker->execution_list,it->data);
198
ms_mutex_unlock(&ticker->lock);
199
/* postprocess runs after the lock has been released */
ms_list_for_each(filters,(void (*)(void*))call_postprocess);
200
ms_list_free(filters);
201
ms_list_free(sources);
206
/* Tell whether `f` may run during tick number `tick`: it returns FALSE as soon as a
   filter feeding one of its inputs has a last_tick different from `tick`.
   NOTE(review): the declarations, the lookup of `l` for each input and the final
   return are missing from this copy of the file. */
static bool_t filter_can_process(MSFilter *f, int tick){
207
/* look if filters before this one have run */
210
for(i=0;i<f->desc->ninputs;i++){
213
if (l->prev.filter->last_tick!=tick) return FALSE;
219
/* Run the process step of one filter.
   Filters with no inputs, or flagged MS_FILTER_IS_PUMP, are processed directly.
   The remaining lines process the filter while its inputs still hold data, warn
   when it has to be re-scheduled, and stop as soon as it has postponed a task.
   NOTE(review): the else branch and the use of `process_done` are missing from this copy of the file. */
static void call_process(MSFilter *f){
220
bool_t process_done=FALSE;
221
if (f->desc->ninputs==0 || f->desc->flags & MS_FILTER_IS_PUMP){
222
ms_filter_process(f);
224
while (ms_filter_inputs_have_data(f)) {
226
ms_warning("Re-scheduling filter %s: all data should be consumed in one process call, so fix it.",f->desc->name);
228
ms_filter_process(f);
229
if (f->postponed_task) break;
235
/* Visit filter `f` for the current tick and recurse into the filters connected to its outputs.
   A filter already stamped with s->ticks is skipped. A filter that can process, or
   any filter when `force_schedule` is set, is stamped with the current tick before
   the recursion. A filter whose inputs are not all ready is prepended to `*unschedulable`.
   NOTE(review): the declarations, the call that actually processes the filter and
   the lookup of `l` for each output are missing from this copy of the file. */
static void run_graph(MSFilter *f, MSTicker *s, MSList **unschedulable, bool_t force_schedule){
238
if (f->last_tick!=s->ticks ){
239
if (filter_can_process(f,s->ticks) || force_schedule) {
240
/* this is a candidate */
241
f->last_tick=s->ticks;
243
/* now recurse to next filters */
244
for(i=0;i<f->desc->noutputs;i++){
247
run_graph(l->next.filter,s,unschedulable, force_schedule);
251
/* this filter has not all inputs that have been filled by filters before it. */
252
*unschedulable=ms_list_prepend(*unschedulable,f);
257
/*
 * Run every graph rooted at a filter of `execution_list` for the current tick.
 *
 * Filters that belong to a loop cannot be scheduled in the first pass, because
 * one of their inputs comes from a filter that is itself waiting on them.
 * run_graph() collects those filters; they are then run again as if they were
 * sources, with scheduling forced, so that the loop still executes.
 */
static void run_graphs(MSTicker *s, MSList *execution_list, bool_t force_schedule){
	MSList *deferred=NULL;
	MSList *node=execution_list;

	while (node!=NULL){
		run_graph((MSFilter*)node->data,s,&deferred,force_schedule);
		node=node->next;
	}

	if (deferred==NULL) return;

	/* second pass over the filters the first pass could not schedule */
	run_graphs(s,deferred,TRUE);
	ms_list_free(deferred);
}
273
/* Execute every task queued in ticker->task_list with ms_filter_task_process(),
   then leave the list empty.
   NOTE(review): the lines that advance `elem` and dispose of each processed element
   are missing from this copy of the file. */
static void run_tasks(MSTicker *ticker){
274
MSList *elem,*prevelem=NULL;
275
for (elem=ticker->task_list;elem!=NULL;){
276
MSFilterTask *t=(MSFilterTask*)elem->data;
277
ms_filter_task_process(t);
283
ticker->task_list=NULL;
286
/* Unlink elements from ticker->task_list while walking it; the successor is kept in
   `nextelem` so that removing the current element does not break the iteration.
   NOTE(review): the assignment of `nextelem` and the test that selects the tasks to
   remove (presumably those belonging to `f`) are missing from this copy of the file. */
static void remove_tasks_for_filter(MSTicker *ticker, MSFilter *f){
287
MSList *elem,*nextelem;
288
for (elem=ticker->task_list;elem!=NULL;elem=nextelem){
289
MSFilterTask *t=(MSFilterTask*)elem->data;
292
ticker->task_list=ms_list_remove_link(ticker->task_list,elem);
298
static uint64_t get_cur_time_ms(void *unused){
300
ms_get_cur_time(&ts);
301
return (ts.tv_sec*1000LL) + ((ts.tv_nsec+500000LL)/1000000LL);
304
/* Sleep helper taking a duration in milliseconds; the visible line converts it to nanoseconds.
   NOTE(review): the declaration of `ts` and the actual sleep call are missing from this
   copy of the file. If `ts` is a struct timespec, `ms` values of 1000 or more would put
   tv_nsec outside its valid range; confirm that callers never pass that much. */
static void sleepMs(int ms){
310
ts.tv_nsec=ms*1000000LL;
315
/* Raise the priority of the calling thread when the ticker asks for more than
   MS_TICKER_PRIO_NORMAL; otherwise only log that the priority is left to normal.
   The visible lines contain two variants: one based on the Windows multimedia timer
   and SetThreadPriority(), one based on pthread_setschedparam() with a fallback to
   setpriority().
   NOTE(review): the preprocessor lines selecting between the variants, the local
   declarations, the choice of `policy` and the return statement are missing from this
   copy of the file. The value returned is what ms_ticker_run() later passes to
   unset_high_prio(). */
static int set_high_prio(MSTicker *obj){
319
if (prio>MS_TICKER_PRIO_NORMAL){
323
/* Windows variant: query the timer capabilities, then request a timer period */
mm=timeGetDevCaps(&ptc,sizeof(ptc));
325
if (ptc.wPeriodMin<(UINT)precision)
326
ptc.wPeriodMin=precision;
328
precision = ptc.wPeriodMin;
329
mm=timeBeginPeriod(ptc.wPeriodMin);
330
if (mm!=TIMERR_NOERROR){
331
ms_warning("timeBeginPeriod failed.");
333
ms_message("win32 timer resolution set to %i ms",ptc.wPeriodMin);
335
ms_warning("timeGetDevCaps failed.");
338
if(!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST)){
339
ms_warning("SetThreadPriority() failed (%d)\n", (int)GetLastError());
342
/* pthread variant */
struct sched_param param;
344
/* NOTE(review): here and in the pthread_setschedparam() call below, "¶m" looks like
   a mis-encoded "&param" (address of the variable declared above); restore it in the real source. */
memset(¶m,0,sizeof(param));
346
char* env_prio_c=NULL;
347
int min_prio, max_prio, env_prio;
349
if (prio==MS_TICKER_PRIO_REALTIME)
352
min_prio = sched_get_priority_min(policy);
353
max_prio = sched_get_priority_max(policy);
354
/* the MS_TICKER_SCHEDPRIO environment variable overrides the default, which is the maximum priority of the policy */
env_prio_c = getenv("MS_TICKER_SCHEDPRIO");
356
/* NOTE(review): atoi() gives 0 for non-numeric text; the clamp on the next line then brings it into range */
env_prio = (env_prio_c == NULL)?max_prio:atoi(env_prio_c);
358
/* clamp the requested value into [min_prio, max_prio] */
env_prio = MAX(MIN(env_prio, max_prio), min_prio);
359
ms_message("Priority used: %d", env_prio);
361
param.sched_priority=env_prio;
362
if((result=pthread_setschedparam(pthread_self(),policy, ¶m))) {
366
sched_get_priority_max(SCHED_OTHER)=sched_get_priority_max(SCHED_OTHER)=0.
367
As long as we can't use SCHED_RR or SCHED_FIFO, the only way to increase priority of a calling thread
368
is to use setpriority().
370
/* NOTE(review): the three text lines above belong to a comment whose opening and closing lines are missing from this copy */
if (setpriority(PRIO_PROCESS,0,-20)==-1){
371
ms_message("%s setpriority() failed: %s, nevermind.",obj->name,strerror(errno));
373
ms_message("%s priority increased to maximum.",obj->name);
375
}else ms_warning("%s: Set pthread_setschedparam failed: %s",obj->name,strerror(result));
377
ms_message("%s priority set to %s and value (%i)",obj->name,
378
policy==SCHED_FIFO ? "SCHED_FIFO" : "SCHED_RR", param.sched_priority);
381
}else ms_message("%s priority left to normal.",obj->name);
385
/* Counterpart of set_high_prio(): put the calling thread back to
   THREAD_PRIORITY_NORMAL and end the timer period identified by `precision`.
   NOTE(review): only Windows API calls are visible; the preprocessor guard around
   them is missing from this copy of the file. */
static void unset_high_prio(int precision){
387
if(!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_NORMAL)){
388
ms_warning("SetThreadPriority() failed (%d)\n", (int)GetLastError());
390
timeEndPeriod(precision);
394
/* Default tick-wait callback; `data` is the ticker itself (see ms_ticker_init()).
   It measures the real time elapsed since s->orig with the ticker's clock callback
   and compares it with the ticker's virtual time s->time.
   NOTE(review): the declarations, the loop, the sleep call and the return statement are
   missing from this copy of the file. The caller treats the returned value as a lateness
   in milliseconds. */
static int wait_next_tick(void *data, uint64_t virt_ticker_time){
395
MSTicker *s=(MSTicker*)data;
401
realtime=s->get_cur_time_ptr(s->get_cur_time_data)-s->orig;
402
/* difference between the virtual ticker time and the real elapsed time */
diff=s->time-realtime;
404
/* sleep until next tick */
408
break; /*exit the while loop */
414
/*the ticker thread function that executes the filters */
/* Outline of the visible lines: raise the thread priority, record the clock origin in
   s->orig, then for each tick run the graphs under s->lock, advance the virtual time by
   s->interval and wait for the next tick through the s->wait_next_tick callback. On exit
   the priority is restored and the thread terminates with ms_thread_exit().
   NOTE(review): the local declarations, the loop control, the tick counter update, the
   task execution and several closing lines are missing from this copy of the file. */
415
void * ms_ticker_run(void *arg)
417
MSTicker *s=(MSTicker*)arg;
422
precision = set_high_prio(s);
425
s->orig=s->get_cur_time_ptr(s->get_cur_time_data);
427
ms_mutex_lock(&s->lock);
431
/*Step 1: run the graphs*/
433
#if TICKER_MEASUREMENTS
434
MSTimeSpec begin,end;/*used to measure time spent in processing one tick*/
437
ms_get_cur_time(&begin);
440
run_graphs(s,s->execution_list,FALSE);
441
#if TICKER_MEASUREMENTS
442
ms_get_cur_time(&end);
443
/* instantaneous load: time spent in run_graphs(), as a percentage of the tick interval */
iload=100*((end.tv_sec-begin.tv_sec)*1000.0 + (end.tv_nsec-begin.tv_nsec)/1000000.0)/(double)s->interval;
444
/* exponential smoothing of the load, weighted by smooth_coef */
s->av_load=(smooth_coef*s->av_load)+((1.0-smooth_coef)*iload);
447
/* the lock is released while waiting for the next tick */
ms_mutex_unlock(&s->lock);
448
/*Step 2: wait for next tick*/
449
s->time+=s->interval;
450
late=s->wait_next_tick(s->wait_next_tick_data,s->time);
451
/* warn only when more than five intervals late and later than `lastlate` */
if (late>s->interval*5 && late>lastlate){
452
ms_warning("%s: We are late of %d miliseconds.",s->name,late);
455
ms_mutex_lock(&s->lock);
457
ms_mutex_unlock(&s->lock);
458
unset_high_prio(precision);
459
ms_message("%s thread exiting",s->name);
461
ms_thread_exit(NULL);
465
/*
 * Install the clock callback the ticker uses to read the current time.
 *
 * func      - the new clock; NULL selects the built-in get_cur_time_ms().
 * user_data - opaque pointer handed to the clock on every call.
 *
 * The origin is recomputed from the new clock, because the previous and the
 * new clock may not report the same times.
 */
void ms_ticker_set_time_func(MSTicker *ticker, MSTickerTimeFunc func, void *user_data){
	MSTickerTimeFunc clock_fn=func;

	if (clock_fn==NULL){
		clock_fn=get_cur_time_ms;
	}

	ticker->get_cur_time_ptr=clock_fn;
	ticker->get_cur_time_data=user_data;
	ticker->orig=clock_fn(user_data)-ticker->time;

	ms_message("ms_ticker_set_time_func: ticker's time method updated.");
}
477
/**
 * Install the callback the ticker thread uses to wait for the next tick.
 *
 * @param ticker    the ticker to configure.
 * @param func      the new wait function; NULL selects the built-in
 *                  wait_next_tick(), which takes the ticker itself as its data.
 * @param user_data opaque pointer handed to `func` on every call.
 *
 * The time origin is recomputed because the new wait function may not follow
 * the same time base as the previous one.
 */
void ms_ticker_set_tick_func(MSTicker *ticker, MSTickerTickFunc func, void *user_data){
	if (func==NULL){
		/* same pairing as the defaults installed by ms_ticker_init() */
		func=wait_next_tick;
		user_data=ticker;
	}
	ticker->wait_next_tick=func;
	ticker->wait_next_tick_data=user_data;
	/* The clock callback must be called with its own data pointer: `user_data`
	   belongs to the tick function and may point to an unrelated object. */
	ticker->orig=ticker->get_cur_time_ptr(ticker->get_cur_time_data)-ticker->time;
	ms_message("ms_ticker_set_tick_func: ticker's tick method updated.");
}
490
/* Same traversal as run_graph(), but each schedulable filter only has its name logged.
   Note that the traversal still stamps f->last_tick with the ticker's current tick.
   NOTE(review): the declarations and the lookup of `l` for each output are missing
   from this copy of the file. */
static void print_graph(MSFilter *f, MSTicker *s, MSList **unschedulable, bool_t force_schedule){
493
if (f->last_tick!=s->ticks ){
494
if (filter_can_process(f,s->ticks) || force_schedule) {
495
/* this is a candidate */
496
f->last_tick=s->ticks;
497
ms_message("print_graphs: %s", f->desc->name);
498
/* now recurse to next filters */
499
for(i=0;i<f->desc->noutputs;i++){
502
print_graph(l->next.filter,s,unschedulable, force_schedule);
506
/* this filter has not all inputs that have been filled by filters before it. */
507
*unschedulable=ms_list_prepend(*unschedulable,f);
512
/*
 * Log every graph rooted at a filter of `execution_list`, in scheduling order.
 *
 * Filters that belong to a loop cannot be reached in the first pass, because
 * one of their inputs comes from a filter that is itself waiting on them.
 * print_graph() collects those filters; print_graphs() is then called again
 * on them as if they were sources, with scheduling forced.
 */
static void print_graphs(MSTicker *s, MSList *execution_list, bool_t force_schedule){
	MSList *deferred=NULL;
	MSList *node=execution_list;

	while (node!=NULL){
		print_graph((MSFilter*)node->data,s,&deferred,force_schedule);
		node=node->next;
	}

	if (deferred==NULL) return;

	/* second pass over the filters the first pass could not reach */
	print_graphs(s,deferred,TRUE);
	ms_list_free(deferred);
}
528
/*
 * Log the filters currently scheduled by the ticker, starting from the
 * sources held in its execution list. Scheduling is not forced.
 */
void ms_ticker_print_graphs(MSTicker *ticker){
	MSList *roots=ticker->execution_list;

	print_graphs(ticker,roots,FALSE);
}
532
/* Return the smoothed load maintained by the ticker thread (ticker->av_load, a
   percentage of the tick interval spent running the graphs).
   When the file is built with TICKER_MEASUREMENTS disabled, a warning explains that
   the load is not measured.
   NOTE(review): the lines using the `once` flag (presumably to emit the warning a
   single time) and the closing #endif are missing from this copy of the file. */
float ms_ticker_get_average_load(MSTicker *ticker){
533
#if !TICKER_MEASUREMENTS
534
static bool_t once=FALSE;
536
ms_warning("ms_ticker_get_average_load(): ticker load measurements disabled for performance reasons.");
540
return ticker->av_load;
544
static uint64_t get_ms(const MSTimeSpec *ts){
545
return (ts->tv_sec*1000LL) + ((ts->tv_nsec+500000LL)/1000000LL);
548
/* Read the current time with ms_get_cur_time().
   NOTE(review): the declaration of `ts` and the return statement are missing from this
   copy of the file; presumably the result is converted to milliseconds with get_ms(). */
static uint64_t get_wallclock_ms(void){
550
ms_get_cur_time(&ts);
554
/* Weight of the newest sample when ms_ticker_synchronizer_set_external_time() smooths the clock skew. */
static const double clock_coef = .01;
556
/* Allocate one MSTickerSynchronizer with ms_new().
   NOTE(review): the field initialisation and the return statement are missing from this copy of the file. */
MSTickerSynchronizer* ms_ticker_synchronizer_new(void) {
557
MSTickerSynchronizer *obj=(MSTickerSynchronizer *)ms_new(MSTickerSynchronizer,1);
563
/* Feed the synchronizer with a timestamp from an external clock (`time`) and update
   the smoothed skew between the wall clock and that clock.
   The first call (offset still 0) records the offset between the two clocks. Each call
   then computes the difference between the wall clock and the offset-corrected external
   time and folds it into ts->av_skew with weight clock_coef.
   NOTE(review): the declarations of `sound_time` and `diff` and the return statement
   are missing from this copy of the file. */
double ms_ticker_synchronizer_set_external_time(MSTickerSynchronizer* ts, const MSTimeSpec *time) {
566
uint64_t wc = get_wallclock_ms();
567
uint64_t ms = get_ms(time);
568
/* an offset of 0 is used as the "not yet initialised" marker */
if (ts->offset == 0) {
569
ts->offset = wc - ms;
571
sound_time = ts->offset + ms;
572
diff = wc - sound_time;
573
ts->av_skew = (ts->av_skew * (1.0 - clock_coef)) + ((double) diff * clock_coef);
579
/*
 * Return the wall-clock time in milliseconds, corrected by the measured skew.
 *
 * The skew is truncated to a whole number of tick intervals first, so that the
 * ticker is not adjusted on mere statistical noise.
 */
uint64_t ms_ticker_synchronizer_get_corrected_time(MSTickerSynchronizer* ts) {
	const int64_t step=(int64_t)TICKER_INTERVAL;
	int64_t whole_steps=((int64_t)ts->av_skew)/step;
	int64_t rounded_skew=whole_steps*step;

	return get_wallclock_ms() - rounded_skew;
}
585
void ms_ticker_synchronizer_destroy(MSTickerSynchronizer* ts) {