44
42
PA_STATIC_TLS_DECLARE_NO_FREE(thread_mq);
46
static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
44
static void asyncmsgq_read_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
47
45
pa_thread_mq *q = userdata;
50
pa_assert(pa_asyncmsgq_get_fd(q->outq) == fd);
48
pa_assert(pa_asyncmsgq_read_fd(q->outq) == fd);
51
49
pa_assert(events == PA_IO_EVENT_INPUT);
53
51
pa_asyncmsgq_ref(aq = q->outq);
54
pa_asyncmsgq_after_poll(aq);
52
pa_asyncmsgq_write_after_poll(aq);
57
55
pa_msgobject *object;
68
66
pa_asyncmsgq_done(aq, ret);
71
if (pa_asyncmsgq_before_poll(aq) == 0)
69
if (pa_asyncmsgq_read_before_poll(aq) == 0)
75
73
pa_asyncmsgq_unref(aq);
78
void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) {
76
static void asyncmsgq_write_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
77
pa_thread_mq *q = userdata;
79
pa_assert(pa_asyncmsgq_write_fd(q->inq) == fd);
80
pa_assert(events == PA_IO_EVENT_INPUT);
82
pa_asyncmsgq_write_after_poll(q->inq);
83
pa_asyncmsgq_write_before_poll(q->inq);
86
void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll) {
80
88
pa_assert(mainloop);
83
91
pa_assert_se(q->inq = pa_asyncmsgq_new(0));
84
92
pa_assert_se(q->outq = pa_asyncmsgq_new(0));
86
pa_assert_se(pa_asyncmsgq_before_poll(q->outq) == 0);
87
pa_assert_se(q->io_event = mainloop->io_new(mainloop, pa_asyncmsgq_get_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_cb, q));
94
pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0);
95
pa_assert_se(q->read_event = mainloop->io_new(mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q));
97
pa_asyncmsgq_write_before_poll(q->inq);
98
pa_assert_se(q->write_event = mainloop->io_new(mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_cb, q));
100
pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, q->inq);
101
pa_rtpoll_item_new_asyncmsgq_write(rtpoll, PA_RTPOLL_LATE, q->outq);
90
104
void pa_thread_mq_done(pa_thread_mq *q) {
93
q->mainloop->io_free(q->io_event);
107
q->mainloop->io_free(q->read_event);
108
q->mainloop->io_free(q->write_event);
109
q->read_event = q->write_event = NULL;
96
111
pa_asyncmsgq_unref(q->inq);
97
112
pa_asyncmsgq_unref(q->outq);