1
/* $Id: event.c 3905 2011-12-09 05:15:39Z ming $ */
/*
 * Copyright (C) 2011-2011 Teluu Inc. (http://www.teluu.com)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
19
#include <pjmedia/event.h>
20
#include <pjmedia/errno.h>
21
#include <pj/assert.h>
26
#include <pj/string.h>
28
#define THIS_FILE "event.c"
32
/* Subscription entry type; one entry exists per subscribed callback. */
typedef struct esub esub;
36
/* List linkage for esub entries. NOTE(review): presumably this provides the
 * prev/next members (other code in this file follows sub->next); the
 * enclosing struct definition and its other members are not visible in
 * this excerpt. */
PJ_DECL_LIST_MEMBER(esub);
43
/* Queue of pending events backed by a fixed array of MAX_EVENTS slots.
 * Other code in this file indexes the array with head/tail positions that
 * wrap modulo MAX_EVENTS and keeps an is_full flag; those members are not
 * visible in this excerpt. */
typedef struct event_queue
45
pjmedia_event events[MAX_EVENTS]; /**< array of events. */
50
/* State of one event manager. Only some members are visible in this
 * excerpt; other code in this file also accesses pool, sem, mutex and
 * ev_queue members of this structure. */
struct pjmedia_event_mgr
53
pj_thread_t *thread; /**< worker thread; created by
                          pjmedia_event_mgr_create() unless
                          PJMEDIA_EVENT_MGR_NO_THREAD is set. */
54
pj_bool_t is_quitting; /**< set by pjmedia_event_mgr_destroy() and checked
                            by the worker thread after it wakes up. */
58
event_queue *pub_ev_queue; /**< publish() event queue; non-NULL only while
                                a publish() call is distributing. */
59
esub esub_list; /**< list of active subscribers. */
60
esub free_esub_list; /**< unsubscribed entries kept for reuse by
                          pjmedia_event_subscribe(). */
61
esub *th_next_sub, /**< worker thread's next sub. */
62
*pub_next_sub; /**< publish() next sub. */
65
/* Global manager returned by pjmedia_event_mgr_instance(). It is set by
 * pjmedia_event_mgr_create() when no instance is registered yet, replaced
 * by pjmedia_event_mgr_set_instance(), and reset to NULL by
 * pjmedia_event_mgr_destroy() when the destroyed manager is this one. */
static pjmedia_event_mgr *event_manager_instance;
67
/* Append a copy of an event at the tail of ev_queue.
 *
 * When the queue is already full the event is not stored; a level 4 log
 * line naming the event type is written instead. Otherwise the event is
 * copied into the tail slot, the tail advances (wrapping at MAX_EVENTS)
 * and the queue is flagged full once the tail reaches the head.
 *
 * NOTE(review): the return statements are not visible in this excerpt;
 * callers compare the result against PJ_SUCCESS to learn whether the
 * event was queued.
 */
static pj_status_t event_queue_add_event(event_queue* ev_queue,
70
if (ev_queue->is_full) {
73
/* This event will be ignored. */
74
PJ_LOG(4, (THIS_FILE, "Lost event %s from publisher [0x%p] "
76
pjmedia_fourcc_name(event->type, ev_name),
82
/* Store by value: the queue keeps its own copy of the caller's event. */
pj_memcpy(&ev_queue->events[ev_queue->tail], event, sizeof(*event));
83
ev_queue->tail = (ev_queue->tail + 1) % MAX_EVENTS;
84
/* Tail caught up with head right after an insert: no free slot is left. */
if (ev_queue->tail == ev_queue->head)
85
ev_queue->is_full = PJ_TRUE;
90
/* Deliver the event at the head of ev_queue to the subscribers of mgr,
 * then remove that event from the queue.
 *
 * The subscriber list is walked from mgr->esub_list.next until the list
 * head is reached again. A subscriber receives the event when its epub
 * equals the event's epub, or when its epub is NULL. The callback and its
 * user data are read into locals, the manager mutex is unlocked before the
 * callback is invoked and locked again afterwards. Once the walk is done
 * the queue head advances (wrapping at MAX_EVENTS) and is_full is cleared.
 *
 * NOTE(review): several lines are not visible in this excerpt. The
 * unlock/lock pair may be conditional on the last parameter (the worker
 * thread passes PJ_TRUE), the first non-success callback status is
 * presumably kept in err and returned, and the loop presumably continues
 * from *next_sub -- confirm against the full source.
 */
static pj_status_t event_mgr_distribute_events(pjmedia_event_mgr *mgr,
91
event_queue *ev_queue,
95
pj_status_t err = PJ_SUCCESS;
96
esub * sub = mgr->esub_list.next;
97
pjmedia_event *ev = &ev_queue->events[ev_queue->head];
99
/* Stop when the walk is back at the list head. */
while (sub != &mgr->esub_list) {
100
/* Expose the following entry through next_sub before calling out;
 * pjmedia_event_unsubscribe() moves mgr->th_next_sub / mgr->pub_next_sub
 * forward when the entry they point to is removed. */
*next_sub = sub->next;
102
/* Check if the subscriber is interested in
103
* receiving the event from the publisher.
105
if (sub->epub == ev->epub || !sub->epub) {
106
pjmedia_event_cb *cb = sub->cb;
107
void *user_data = sub->user_data;
111
pj_mutex_unlock(mgr->mutex);
113
status = (*cb)(ev, user_data);
114
if (status != PJ_SUCCESS && err == PJ_SUCCESS)
118
pj_mutex_lock(mgr->mutex);
124
ev_queue->head = (ev_queue->head + 1) % MAX_EVENTS;
125
ev_queue->is_full = PJ_FALSE;
130
/* Event worker thread function.
 *
 * arg is the pjmedia_event_mgr handed to pj_thread_create(). The thread
 * blocks on the manager's semaphore; when woken it checks is_quitting and
 * otherwise distributes from mgr->ev_queue while holding the manager
 * mutex, using mgr->th_next_sub as its iteration cursor.
 *
 * NOTE(review): the enclosing loop, the action taken when is_quitting is
 * set, and the return value are not visible in this excerpt.
 */
120
static int event_worker_thread(void *arg)
133
pjmedia_event_mgr *mgr = (pjmedia_event_mgr *)arg;
136
/* Wait until there is an event. */
137
pj_sem_wait(mgr->sem);
139
/* pjmedia_event_mgr_destroy() sets this flag and then posts the semaphore,
 * so a wake-up may be a shutdown request rather than a queued event. */
if (mgr->is_quitting)
142
pj_mutex_lock(mgr->mutex);
143
event_mgr_distribute_events(mgr, &mgr->ev_queue,
144
&mgr->th_next_sub, PJ_TRUE);
145
pj_mutex_unlock(mgr->mutex);
151
/* Create an event manager.
 *
 * The manager structure is zero-allocated from the caller's pool, and a
 * separate pool named "evt mgr" is created from the same pool factory for
 * the manager's own objects. Unless PJMEDIA_EVENT_MGR_NO_THREAD is set in
 * options, a semaphore and the worker thread are created. A recursive
 * mutex is created in all cases. If no global instance is registered yet,
 * the new manager becomes the global instance.
 *
 * When creating the thread or the mutex fails, the partially built manager
 * is passed to pjmedia_event_mgr_destroy().
 *
 * NOTE(review): the full parameter list, the return statements, the
 * handling of a semaphore creation failure and the assignment to *p_mgr
 * are not visible in this excerpt.
 */
PJ_DEF(pj_status_t) pjmedia_event_mgr_create(pj_pool_t *pool,
153
pjmedia_event_mgr **p_mgr)
155
pjmedia_event_mgr *mgr;
158
mgr = PJ_POOL_ZALLOC_T(pool, pjmedia_event_mgr);
159
/* Private pool; released again by pjmedia_event_mgr_destroy(). */
mgr->pool = pj_pool_create(pool->factory, "evt mgr", 500, 500, NULL);
160
pj_list_init(&mgr->esub_list);
161
pj_list_init(&mgr->free_esub_list);
163
/* Asynchronous delivery needs the semaphore and the worker thread. */
if (!(options & PJMEDIA_EVENT_MGR_NO_THREAD)) {
164
status = pj_sem_create(mgr->pool, "ev_sem", 0, MAX_EVENTS + 1,
166
if (status != PJ_SUCCESS)
169
status = pj_thread_create(mgr->pool, "ev_thread",
170
&event_worker_thread,
171
mgr, 0, 0, &mgr->thread);
172
if (status != PJ_SUCCESS) {
173
pjmedia_event_mgr_destroy(mgr);
178
/* NOTE(review): the mutex is created after the worker thread; the worker
 * only locks it after its semaphore wait returns. */
status = pj_mutex_create_recursive(mgr->pool, "ev_mutex", &mgr->mutex);
179
if (status != PJ_SUCCESS) {
180
pjmedia_event_mgr_destroy(mgr);
184
/* The first manager created becomes the default instance. */
if (!event_manager_instance)
185
event_manager_instance = mgr;
193
/* Return the global event manager instance, or NULL when none is
 * currently registered. */
PJ_DEF(pjmedia_event_mgr*) pjmedia_event_mgr_instance(void)
195
return event_manager_instance;
198
/* Register mgr as the global event manager instance. The value is stored
 * as given, without any check; the previous instance is not destroyed
 * here. */
PJ_DEF(void) pjmedia_event_mgr_set_instance(pjmedia_event_mgr *mgr)
200
event_manager_instance = mgr;
203
/* Destroy an event manager.
 *
 * A NULL mgr selects the global instance; if there is none either, the
 * assertion fails and the function returns. The worker thread is asked to
 * stop (is_quitting is set and the semaphore posted) and joined, then the
 * semaphore, the mutex and the manager's private pool are released. The
 * global instance pointer is cleared when it refers to this manager.
 *
 * NOTE(review): guards around the thread/semaphore/mutex calls (relevant
 * for managers created with PJMEDIA_EVENT_MGR_NO_THREAD or for partially
 * constructed managers) are not visible in this excerpt -- confirm against
 * the full source.
 */
PJ_DEF(void) pjmedia_event_mgr_destroy(pjmedia_event_mgr *mgr)
205
if (!mgr) mgr = pjmedia_event_mgr_instance();
206
PJ_ASSERT_ON_FAIL(mgr != NULL, return);
209
/* Raise the flag first, then wake the worker so that it can observe it. */
mgr->is_quitting = PJ_TRUE;
210
pj_sem_post(mgr->sem);
211
pj_thread_join(mgr->thread);
215
pj_sem_destroy(mgr->sem);
220
pj_mutex_destroy(mgr->mutex);
225
pj_pool_release(mgr->pool);
227
/* Do not leave the global pointer referring to a destroyed manager. */
if (event_manager_instance == mgr)
228
event_manager_instance = NULL;
231
/* Initialize an event structure.
 *
 * The whole structure is zero-filled, the 64-bit timestamp value is copied
 * from ts, and src is stored both as the event source and as the event
 * publisher (epub).
 *
 * NOTE(review): the src parameter declaration, the assignment of type and
 * any NULL check on ts are on lines not visible in this excerpt.
 */
PJ_DEF(void) pjmedia_event_init( pjmedia_event *event,
232
pjmedia_event_type type,
233
const pj_timestamp *ts,
236
pj_bzero(event, sizeof(*event));
239
event->timestamp.u64 = ts->u64;
240
/* A freshly initialized event has its source as its publisher. */
event->epub = event->src = src;
243
/* Subscribe a callback to events.
 *
 * cb must not be NULL (PJ_EINVAL otherwise). A NULL mgr selects the global
 * instance, which must then exist (PJ_EINVAL otherwise). With the manager
 * mutex held, the subscriber list is scanned for an entry with the same
 * callback and user data; when one is found the mutex is released without
 * adding a second entry. Otherwise an entry is taken from the free list
 * when it is not empty, or zero-allocated from the manager's pool, filled
 * in and appended to the subscriber list.
 *
 * NOTE(review): the remaining parameters (user_data, epub), the rest of
 * the duplicate test, the removal of a reused entry from the free list,
 * the assignments of cb/epub and the return statements are on lines not
 * visible in this excerpt.
 */
PJ_DEF(pj_status_t) pjmedia_event_subscribe( pjmedia_event_mgr *mgr,
244
pjmedia_event_cb *cb,
250
PJ_ASSERT_RETURN(cb, PJ_EINVAL);
252
/* Fall back to the global manager when none is given. */
if (!mgr) mgr = pjmedia_event_mgr_instance();
253
PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
255
pj_mutex_lock(mgr->mutex);
256
/* Check whether callback function with the same user data is already
257
* subscribed to the publisher. This is to prevent the callback function
258
* receiving the same event from the same publisher more than once.
260
sub = mgr->esub_list.next;
261
while (sub != &mgr->esub_list) {
262
esub *next = sub->next;
263
if (sub->cb == cb && sub->user_data == user_data &&
266
pj_mutex_unlock(mgr->mutex);
272
if (mgr->free_esub_list.next != &mgr->free_esub_list) {
273
sub = mgr->free_esub_list.next;
276
sub = PJ_POOL_ZALLOC_T(mgr->pool, esub);
278
sub->user_data = user_data;
280
pj_list_push_back(&mgr->esub_list, sub);
281
pj_mutex_unlock(mgr->mutex);
287
/* Remove subscriptions of a callback.
 *
 * cb must not be NULL (PJ_EINVAL otherwise). A NULL mgr selects the global
 * instance, which must then exist (PJ_EINVAL otherwise). With the manager
 * mutex held, every entry of the subscriber list is examined. An entry
 * matches when its callback equals cb, its user data equals user_data (or
 * user_data is NULL) and its publisher equals epub (or epub is NULL).
 * Before a matching entry is moved to the free list, the distribution
 * cursors th_next_sub and pub_next_sub are advanced past it if they point
 * to it.
 *
 * NOTE(review): the removal of the entry from the subscriber list, the
 * statement guarded by "user_data && epub" (presumably ending the scan,
 * since an exact match was requested) and the return statement are on
 * lines not visible in this excerpt.
 */
pjmedia_event_unsubscribe(pjmedia_event_mgr *mgr,
288
pjmedia_event_cb *cb,
294
PJ_ASSERT_RETURN(cb, PJ_EINVAL);
296
/* Fall back to the global manager when none is given. */
if (!mgr) mgr = pjmedia_event_mgr_instance();
297
PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
299
pj_mutex_lock(mgr->mutex);
300
sub = mgr->esub_list.next;
301
while (sub != &mgr->esub_list) {
302
/* Remember the successor now; sub may be moved to another list below. */
esub *next = sub->next;
303
/* NULL user_data and NULL epub act as wildcards. */
if (sub->cb == cb && (sub->user_data == user_data || !user_data) &&
304
(sub->epub == epub || !epub))
306
/* If the worker thread or pjmedia_event_publish() API is
307
* in the process of distributing events, make sure that
308
* its pointer to the next subscriber stays valid.
310
if (mgr->th_next_sub == sub)
311
mgr->th_next_sub = sub->next;
312
if (mgr->pub_next_sub == sub)
313
mgr->pub_next_sub = sub->next;
315
pj_list_push_back(&mgr->free_esub_list, sub);
316
if (user_data && epub)
321
pj_mutex_unlock(mgr->mutex);
326
/* Publish an event to the subscribers of a manager.
 *
 * epub and event must not be NULL (PJ_EINVAL otherwise). A NULL mgr selects
 * the global instance, which must then exist (PJ_EINVAL otherwise). All
 * work is done with the manager mutex held.
 *
 * With PJMEDIA_EVENT_PUBLISH_POST_EVENT set in flag, the event is added to
 * the manager's own queue and, if that succeeded, the semaphore is posted
 * so that the worker thread delivers it.
 *
 * Otherwise delivery happens in the calling thread. If a publish() call is
 * already distributing (mgr->pub_ev_queue is set), the event is only
 * appended to that call's queue. If not, a function-local static queue is
 * reset and installed as mgr->pub_ev_queue, the event is added to it, and
 * events are distributed until that queue is empty; mgr->pub_ev_queue is
 * then cleared again.
 *
 * NOTE(review): the static queue is one object shared by every call of
 * this function, while the lock taken here is per manager -- confirm that
 * concurrent synchronous publishing on two different managers is not
 * expected. The epub parameter declaration, the recording of the first
 * failed status in err and the return statement are on lines not visible
 * in this excerpt.
 */
PJ_DEF(pj_status_t) pjmedia_event_publish( pjmedia_event_mgr *mgr,
328
pjmedia_event *event,
329
pjmedia_event_publish_flag flag)
331
pj_status_t err = PJ_SUCCESS;
333
PJ_ASSERT_RETURN(epub && event, PJ_EINVAL);
335
/* Fall back to the global manager when none is given. */
if (!mgr) mgr = pjmedia_event_mgr_instance();
336
PJ_ASSERT_RETURN(mgr, PJ_EINVAL);
340
pj_mutex_lock(mgr->mutex);
341
/* Asynchronous path: queue the event and wake the worker thread. */
if (flag & PJMEDIA_EVENT_PUBLISH_POST_EVENT) {
342
if (event_queue_add_event(&mgr->ev_queue, event) == PJ_SUCCESS)
343
pj_sem_post(mgr->sem);
345
/* For nested pjmedia_event_publish() calls, i.e. calling publish()
346
* inside the subscriber's callback, the function will only add
347
* the event to the event queue of the first publish() call. It
348
* is the first publish() call that will be responsible to
349
* distribute the events.
351
if (mgr->pub_ev_queue) {
352
event_queue_add_event(mgr->pub_ev_queue, event);
354
static event_queue ev_queue;
357
ev_queue.head = ev_queue.tail = 0;
358
ev_queue.is_full = PJ_FALSE;
359
mgr->pub_ev_queue = &ev_queue;
361
event_queue_add_event(mgr->pub_ev_queue, event);
364
status = event_mgr_distribute_events(mgr, mgr->pub_ev_queue,
367
if (status != PJ_SUCCESS && err == PJ_SUCCESS)
369
} while(ev_queue.head != ev_queue.tail || ev_queue.is_full);
371
mgr->pub_ev_queue = NULL;
374
pj_mutex_unlock(mgr->mutex);