2
This file is part of PulseAudio.
4
Copyright 2004-2006 Lennart Poettering
5
Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7
PulseAudio is free software; you can redistribute it and/or modify
8
it under the terms of the GNU Lesser General Public License as published
9
by the Free Software Foundation; either version 2.1 of the License,
10
or (at your option) any later version.
12
PulseAudio is distributed in the hope that it will be useful, but
13
WITHOUT ANY WARRANTY; without even the implied warranty of
14
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
General Public License for more details.
17
You should have received a copy of the GNU Lesser General Public License
18
along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
29
#include <pulse/def.h>
30
#include <pulse/timeval.h>
31
#include <pulse/rtclock.h>
32
#include <pulse/xmalloc.h>
33
#include <pulse/fork-detect.h>
35
#include <pulsecore/pstream-util.h>
36
#include <pulsecore/log.h>
37
#include <pulsecore/hashmap.h>
38
#include <pulsecore/macro.h>
39
#include <pulsecore/core-rtclock.h>
40
#include <pulsecore/core-util.h>
45
/* #define STREAM_DEBUG */
47
#define AUTO_TIMING_INTERVAL_START_USEC (10*PA_USEC_PER_MSEC)
48
#define AUTO_TIMING_INTERVAL_END_USEC (1500*PA_USEC_PER_MSEC)
50
#define SMOOTHER_ADJUST_TIME (1000*PA_USEC_PER_MSEC)
51
#define SMOOTHER_HISTORY_TIME (5000*PA_USEC_PER_MSEC)
52
#define SMOOTHER_MIN_HISTORY (4)
54
pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
55
return pa_stream_new_with_proplist(c, name, ss, map, NULL);
58
static void reset_callbacks(pa_stream *s) {
59
s->read_callback = NULL;
60
s->read_userdata = NULL;
61
s->write_callback = NULL;
62
s->write_userdata = NULL;
63
s->state_callback = NULL;
64
s->state_userdata = NULL;
65
s->overflow_callback = NULL;
66
s->overflow_userdata = NULL;
67
s->underflow_callback = NULL;
68
s->underflow_userdata = NULL;
69
s->latency_update_callback = NULL;
70
s->latency_update_userdata = NULL;
71
s->moved_callback = NULL;
72
s->moved_userdata = NULL;
73
s->suspended_callback = NULL;
74
s->suspended_userdata = NULL;
75
s->started_callback = NULL;
76
s->started_userdata = NULL;
77
s->event_callback = NULL;
78
s->event_userdata = NULL;
79
s->buffer_attr_callback = NULL;
80
s->buffer_attr_userdata = NULL;
83
static pa_stream *pa_stream_new_with_proplist_internal(
86
const pa_sample_spec *ss,
87
const pa_channel_map *map,
88
pa_format_info * const *formats,
89
unsigned int n_formats,
96
pa_assert(PA_REFCNT_VALUE(c) >= 1);
97
pa_assert((ss == NULL && map == NULL) || (formats == NULL && n_formats == 0));
98
pa_assert(n_formats < PA_MAX_FORMATS);
100
PA_CHECK_VALIDITY_RETURN_NULL(c, !pa_detect_fork(), PA_ERR_FORKED);
101
PA_CHECK_VALIDITY_RETURN_NULL(c, name || (p && pa_proplist_contains(p, PA_PROP_MEDIA_NAME)), PA_ERR_INVALID);
103
s = pa_xnew(pa_stream, 1);
106
s->mainloop = c->mainloop;
108
s->direction = PA_STREAM_NODIRECTION;
109
s->state = PA_STREAM_UNCONNECTED;
113
s->sample_spec = *ss;
115
pa_sample_spec_init(&s->sample_spec);
118
s->channel_map = *map;
120
pa_channel_map_init(&s->channel_map);
124
s->n_formats = n_formats;
125
for (i = 0; i < n_formats; i++)
126
s->req_formats[i] = pa_format_info_copy(formats[i]);
129
/* We'll get the final negotiated format after connecting */
132
s->direct_on_input = PA_INVALID_INDEX;
134
s->proplist = p ? pa_proplist_copy(p) : pa_proplist_new();
136
pa_proplist_sets(s->proplist, PA_PROP_MEDIA_NAME, name);
139
s->channel_valid = false;
140
s->syncid = c->csyncid++;
141
s->stream_index = PA_INVALID_INDEX;
143
s->requested_bytes = 0;
144
memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
146
/* We initialize the target length here, so that if the user
147
* passes no explicit buffering metrics the default is similar to
148
* what older PA versions provided. */
150
s->buffer_attr.maxlength = (uint32_t) -1;
152
s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */
154
/* FIXME: We assume a worst-case compressed format corresponding to
155
* 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */
156
pa_sample_spec tmp_ss = {
157
.format = PA_SAMPLE_S16NE,
161
s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */
163
s->buffer_attr.minreq = (uint32_t) -1;
164
s->buffer_attr.prebuf = (uint32_t) -1;
165
s->buffer_attr.fragsize = (uint32_t) -1;
167
s->device_index = PA_INVALID_INDEX;
168
s->device_name = NULL;
169
s->suspended = false;
172
s->write_memblock = NULL;
173
s->write_data = NULL;
175
pa_memchunk_reset(&s->peek_memchunk);
177
s->record_memblockq = NULL;
179
memset(&s->timing_info, 0, sizeof(s->timing_info));
180
s->timing_info_valid = false;
182
s->previous_time = 0;
183
s->latest_underrun_at_index = -1;
185
s->read_index_not_before = 0;
186
s->write_index_not_before = 0;
187
for (i = 0; i < PA_MAX_WRITE_INDEX_CORRECTIONS; i++)
188
s->write_index_corrections[i].valid = 0;
189
s->current_write_index_correction = 0;
191
s->auto_timing_update_event = NULL;
192
s->auto_timing_update_requested = false;
193
s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
199
/* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */
200
PA_LLIST_PREPEND(pa_stream, c->streams, s);
206
pa_stream *pa_stream_new_with_proplist(
209
const pa_sample_spec *ss,
210
const pa_channel_map *map,
215
PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
216
PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 12 || (ss->format != PA_SAMPLE_S32LE && ss->format != PA_SAMPLE_S32BE), PA_ERR_NOTSUPPORTED);
217
PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24LE && ss->format != PA_SAMPLE_S24BE), PA_ERR_NOTSUPPORTED);
218
PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 15 || (ss->format != PA_SAMPLE_S24_32LE && ss->format != PA_SAMPLE_S24_32BE), PA_ERR_NOTSUPPORTED);
219
PA_CHECK_VALIDITY_RETURN_NULL(c, !map || (pa_channel_map_valid(map) && map->channels == ss->channels), PA_ERR_INVALID);
222
PA_CHECK_VALIDITY_RETURN_NULL(c, map = pa_channel_map_init_auto(&tmap, ss->channels, PA_CHANNEL_MAP_DEFAULT), PA_ERR_INVALID);
224
return pa_stream_new_with_proplist_internal(c, name, ss, map, NULL, 0, p);
227
pa_stream *pa_stream_new_extended(
230
pa_format_info * const *formats,
231
unsigned int n_formats,
234
PA_CHECK_VALIDITY_RETURN_NULL(c, c->version >= 21, PA_ERR_NOTSUPPORTED);
236
return pa_stream_new_with_proplist_internal(c, name, NULL, NULL, formats, n_formats, p);
239
static void stream_unlink(pa_stream *s) {
246
/* Detach from context */
248
/* Unref all operation objects that point to us */
249
for (o = s->context->operations; o; o = n) {
253
pa_operation_cancel(o);
256
/* Drop all outstanding replies for this stream */
257
if (s->context->pdispatch)
258
pa_pdispatch_unregister_reply(s->context->pdispatch, s);
260
if (s->channel_valid) {
261
pa_hashmap_remove((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel));
263
s->channel_valid = false;
266
PA_LLIST_REMOVE(pa_stream, s->context->streams, s);
271
if (s->auto_timing_update_event) {
272
pa_assert(s->mainloop);
273
s->mainloop->time_free(s->auto_timing_update_event);
279
static void stream_free(pa_stream *s) {
286
if (s->write_memblock) {
288
pa_memblock_release(s->write_memblock);
289
pa_memblock_unref(s->write_memblock);
292
if (s->peek_memchunk.memblock) {
294
pa_memblock_release(s->peek_memchunk.memblock);
295
pa_memblock_unref(s->peek_memchunk.memblock);
298
if (s->record_memblockq)
299
pa_memblockq_free(s->record_memblockq);
302
pa_proplist_free(s->proplist);
305
pa_smoother_free(s->smoother);
307
for (i = 0; i < s->n_formats; i++)
308
pa_format_info_free(s->req_formats[i]);
311
pa_format_info_free(s->format);
313
pa_xfree(s->device_name);
317
void pa_stream_unref(pa_stream *s) {
319
pa_assert(PA_REFCNT_VALUE(s) >= 1);
321
if (PA_REFCNT_DEC(s) <= 0)
325
pa_stream* pa_stream_ref(pa_stream *s) {
327
pa_assert(PA_REFCNT_VALUE(s) >= 1);
333
pa_stream_state_t pa_stream_get_state(pa_stream *s) {
335
pa_assert(PA_REFCNT_VALUE(s) >= 1);
340
pa_context* pa_stream_get_context(pa_stream *s) {
342
pa_assert(PA_REFCNT_VALUE(s) >= 1);
347
uint32_t pa_stream_get_index(pa_stream *s) {
349
pa_assert(PA_REFCNT_VALUE(s) >= 1);
351
PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
352
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
354
return s->stream_index;
357
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
359
pa_assert(PA_REFCNT_VALUE(s) >= 1);
368
if (s->state_callback)
369
s->state_callback(s, s->state_userdata);
371
if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED))
377
static void request_auto_timing_update(pa_stream *s, bool force) {
379
pa_assert(PA_REFCNT_VALUE(s) >= 1);
381
if (!(s->flags & PA_STREAM_AUTO_TIMING_UPDATE))
384
if (s->state == PA_STREAM_READY &&
385
(force || !s->auto_timing_update_requested)) {
389
pa_log_debug("Automatically requesting new timing data");
392
if ((o = pa_stream_update_timing_info(s, NULL, NULL))) {
393
pa_operation_unref(o);
394
s->auto_timing_update_requested = true;
398
if (s->auto_timing_update_event) {
399
if (s->suspended && !force) {
400
pa_assert(s->mainloop);
401
s->mainloop->time_free(s->auto_timing_update_event);
402
s->auto_timing_update_event = NULL;
405
s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
407
pa_context_rttime_restart(s->context, s->auto_timing_update_event, pa_rtclock_now() + s->auto_timing_interval_usec);
409
s->auto_timing_interval_usec = PA_MIN(AUTO_TIMING_INTERVAL_END_USEC, s->auto_timing_interval_usec*2);
414
void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
415
pa_context *c = userdata;
420
pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_KILLED || command == PA_COMMAND_RECORD_STREAM_KILLED);
423
pa_assert(PA_REFCNT_VALUE(c) >= 1);
427
if (pa_tagstruct_getu32(t, &channel) < 0 ||
428
!pa_tagstruct_eof(t)) {
429
pa_context_fail(c, PA_ERR_PROTOCOL);
433
if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
436
if (s->state != PA_STREAM_READY)
439
pa_context_set_error(c, PA_ERR_KILLED);
440
pa_stream_set_state(s, PA_STREAM_FAILED);
446
static void check_smoother_status(pa_stream *s, bool aposteriori, bool force_start, bool force_stop) {
450
pa_assert(!force_start || !force_stop);
455
x = pa_rtclock_now();
457
if (s->timing_info_valid) {
459
x -= s->timing_info.transport_usec;
461
x += s->timing_info.transport_usec;
464
if (s->suspended || s->corked || force_stop)
465
pa_smoother_pause(s->smoother, x);
466
else if (force_start || s->buffer_attr.prebuf == 0) {
468
if (!s->timing_info_valid &&
472
s->context->version >= 13) {
474
/* If the server supports STARTED events we take them as
475
* indications when audio really starts/stops playing, if
476
* we don't have any timing info yet -- instead of trying
477
* to be smart and guessing the server time. Otherwise the
478
* unknown transport delay adds too much noise to our time
484
pa_smoother_resume(s->smoother, x, true);
487
/* Please note that we have no idea if playback actually started
488
* if prebuf is non-zero! */
491
static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata);
493
void pa_command_stream_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
494
pa_context *c = userdata;
501
uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
504
pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_MOVED || command == PA_COMMAND_RECORD_STREAM_MOVED);
507
pa_assert(PA_REFCNT_VALUE(c) >= 1);
511
if (c->version < 12) {
512
pa_context_fail(c, PA_ERR_PROTOCOL);
516
if (pa_tagstruct_getu32(t, &channel) < 0 ||
517
pa_tagstruct_getu32(t, &di) < 0 ||
518
pa_tagstruct_gets(t, &dn) < 0 ||
519
pa_tagstruct_get_boolean(t, &suspended) < 0) {
520
pa_context_fail(c, PA_ERR_PROTOCOL);
524
if (c->version >= 13) {
526
if (command == PA_COMMAND_RECORD_STREAM_MOVED) {
527
if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
528
pa_tagstruct_getu32(t, &fragsize) < 0 ||
529
pa_tagstruct_get_usec(t, &usec) < 0) {
530
pa_context_fail(c, PA_ERR_PROTOCOL);
534
if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
535
pa_tagstruct_getu32(t, &tlength) < 0 ||
536
pa_tagstruct_getu32(t, &prebuf) < 0 ||
537
pa_tagstruct_getu32(t, &minreq) < 0 ||
538
pa_tagstruct_get_usec(t, &usec) < 0) {
539
pa_context_fail(c, PA_ERR_PROTOCOL);
545
if (!pa_tagstruct_eof(t)) {
546
pa_context_fail(c, PA_ERR_PROTOCOL);
550
if (!dn || di == PA_INVALID_INDEX) {
551
pa_context_fail(c, PA_ERR_PROTOCOL);
555
if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_MOVED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
558
if (s->state != PA_STREAM_READY)
561
if (c->version >= 13) {
562
if (s->direction == PA_STREAM_RECORD)
563
s->timing_info.configured_source_usec = usec;
565
s->timing_info.configured_sink_usec = usec;
567
s->buffer_attr.maxlength = maxlength;
568
s->buffer_attr.fragsize = fragsize;
569
s->buffer_attr.tlength = tlength;
570
s->buffer_attr.prebuf = prebuf;
571
s->buffer_attr.minreq = minreq;
574
pa_xfree(s->device_name);
575
s->device_name = pa_xstrdup(dn);
576
s->device_index = di;
578
s->suspended = suspended;
580
if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
581
s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
582
s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
583
request_auto_timing_update(s, true);
586
check_smoother_status(s, true, false, false);
587
request_auto_timing_update(s, true);
589
if (s->moved_callback)
590
s->moved_callback(s, s->moved_userdata);
596
void pa_command_stream_buffer_attr(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
597
pa_context *c = userdata;
601
uint32_t maxlength = 0, fragsize = 0, minreq = 0, tlength = 0, prebuf = 0;
604
pa_assert(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED || command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED);
607
pa_assert(PA_REFCNT_VALUE(c) >= 1);
611
if (c->version < 15) {
612
pa_context_fail(c, PA_ERR_PROTOCOL);
616
if (pa_tagstruct_getu32(t, &channel) < 0) {
617
pa_context_fail(c, PA_ERR_PROTOCOL);
621
/* This handler only ever receives the two *_BUFFER_ATTR_CHANGED commands
 * (see the assertion at the top of this function), so the record wire
 * layout (maxlength, fragsize, usec) has to be selected by
 * PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED. A comparison against
 * PA_COMMAND_RECORD_STREAM_MOVED can never match here and made record
 * notifications get parsed with the playback layout. */
if (command == PA_COMMAND_RECORD_BUFFER_ATTR_CHANGED) {
622
if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
623
pa_tagstruct_getu32(t, &fragsize) < 0 ||
624
pa_tagstruct_get_usec(t, &usec) < 0) {
625
pa_context_fail(c, PA_ERR_PROTOCOL);
629
if (pa_tagstruct_getu32(t, &maxlength) < 0 ||
630
pa_tagstruct_getu32(t, &tlength) < 0 ||
631
pa_tagstruct_getu32(t, &prebuf) < 0 ||
632
pa_tagstruct_getu32(t, &minreq) < 0 ||
633
pa_tagstruct_get_usec(t, &usec) < 0) {
634
pa_context_fail(c, PA_ERR_PROTOCOL);
639
if (!pa_tagstruct_eof(t)) {
640
pa_context_fail(c, PA_ERR_PROTOCOL);
644
if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
647
if (s->state != PA_STREAM_READY)
650
if (s->direction == PA_STREAM_RECORD)
651
s->timing_info.configured_source_usec = usec;
653
s->timing_info.configured_sink_usec = usec;
655
s->buffer_attr.maxlength = maxlength;
656
s->buffer_attr.fragsize = fragsize;
657
s->buffer_attr.tlength = tlength;
658
s->buffer_attr.prebuf = prebuf;
659
s->buffer_attr.minreq = minreq;
661
request_auto_timing_update(s, true);
663
if (s->buffer_attr_callback)
664
s->buffer_attr_callback(s, s->buffer_attr_userdata);
670
void pa_command_stream_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
671
pa_context *c = userdata;
677
pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED || command == PA_COMMAND_RECORD_STREAM_SUSPENDED);
680
pa_assert(PA_REFCNT_VALUE(c) >= 1);
684
if (c->version < 12) {
685
pa_context_fail(c, PA_ERR_PROTOCOL);
689
if (pa_tagstruct_getu32(t, &channel) < 0 ||
690
pa_tagstruct_get_boolean(t, &suspended) < 0 ||
691
!pa_tagstruct_eof(t)) {
692
pa_context_fail(c, PA_ERR_PROTOCOL);
696
if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_SUSPENDED ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
699
if (s->state != PA_STREAM_READY)
702
s->suspended = suspended;
704
if ((s->flags & PA_STREAM_AUTO_TIMING_UPDATE) && !suspended && !s->auto_timing_update_event) {
705
s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
706
s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
707
request_auto_timing_update(s, true);
710
check_smoother_status(s, true, false, false);
711
request_auto_timing_update(s, true);
713
if (s->suspended_callback)
714
s->suspended_callback(s, s->suspended_userdata);
720
void pa_command_stream_started(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
721
pa_context *c = userdata;
726
pa_assert(command == PA_COMMAND_STARTED);
729
pa_assert(PA_REFCNT_VALUE(c) >= 1);
733
if (c->version < 13) {
734
pa_context_fail(c, PA_ERR_PROTOCOL);
738
if (pa_tagstruct_getu32(t, &channel) < 0 ||
739
!pa_tagstruct_eof(t)) {
740
pa_context_fail(c, PA_ERR_PROTOCOL);
744
if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
747
if (s->state != PA_STREAM_READY)
750
check_smoother_status(s, true, true, false);
751
request_auto_timing_update(s, true);
753
if (s->started_callback)
754
s->started_callback(s, s->started_userdata);
760
void pa_command_stream_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
761
pa_context *c = userdata;
764
pa_proplist *pl = NULL;
768
pa_assert(command == PA_COMMAND_PLAYBACK_STREAM_EVENT || command == PA_COMMAND_RECORD_STREAM_EVENT);
771
pa_assert(PA_REFCNT_VALUE(c) >= 1);
775
if (c->version < 15) {
776
pa_context_fail(c, PA_ERR_PROTOCOL);
780
pl = pa_proplist_new();
782
if (pa_tagstruct_getu32(t, &channel) < 0 ||
783
pa_tagstruct_gets(t, &event) < 0 ||
784
pa_tagstruct_get_proplist(t, pl) < 0 ||
785
!pa_tagstruct_eof(t) || !event) {
786
pa_context_fail(c, PA_ERR_PROTOCOL);
790
if (!(s = pa_hashmap_get(command == PA_COMMAND_PLAYBACK_STREAM_EVENT ? c->playback_streams : c->record_streams, PA_UINT32_TO_PTR(channel))))
793
if (s->state != PA_STREAM_READY)
796
if (pa_streq(event, PA_STREAM_EVENT_FORMAT_LOST)) {
797
/* Let client know what the running time was when the stream had to be killed */
798
pa_usec_t stream_time;
799
if (pa_stream_get_time(s, &stream_time) == 0)
800
pa_proplist_setf(pl, "stream-time", "%llu", (unsigned long long) stream_time);
803
if (s->event_callback)
804
s->event_callback(s, event, pl, s->event_userdata);
810
pa_proplist_free(pl);
813
void pa_command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
815
pa_context *c = userdata;
816
uint32_t bytes, channel;
819
pa_assert(command == PA_COMMAND_REQUEST);
822
pa_assert(PA_REFCNT_VALUE(c) >= 1);
826
if (pa_tagstruct_getu32(t, &channel) < 0 ||
827
pa_tagstruct_getu32(t, &bytes) < 0 ||
828
!pa_tagstruct_eof(t)) {
829
pa_context_fail(c, PA_ERR_PROTOCOL);
833
if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
836
if (s->state != PA_STREAM_READY)
839
s->requested_bytes += bytes;
842
pa_log_debug("got request for %lli, now at %lli", (long long) bytes, (long long) s->requested_bytes);
845
if (s->requested_bytes > 0 && s->write_callback)
846
s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
852
int64_t pa_stream_get_underflow_index(pa_stream *p) {
854
return p->latest_underrun_at_index;
857
void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
859
pa_context *c = userdata;
864
pa_assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
867
pa_assert(PA_REFCNT_VALUE(c) >= 1);
871
if (pa_tagstruct_getu32(t, &channel) < 0) {
872
pa_context_fail(c, PA_ERR_PROTOCOL);
876
if (c->version >= 23 && command == PA_COMMAND_UNDERFLOW) {
877
if (pa_tagstruct_gets64(t, &offset) < 0) {
878
pa_context_fail(c, PA_ERR_PROTOCOL);
883
if (!pa_tagstruct_eof(t)) {
884
pa_context_fail(c, PA_ERR_PROTOCOL);
888
if (!(s = pa_hashmap_get(c->playback_streams, PA_UINT32_TO_PTR(channel))))
891
if (s->state != PA_STREAM_READY)
895
s->latest_underrun_at_index = offset;
897
if (s->buffer_attr.prebuf > 0)
898
check_smoother_status(s, true, false, true);
900
request_auto_timing_update(s, true);
902
if (command == PA_COMMAND_OVERFLOW) {
903
if (s->overflow_callback)
904
s->overflow_callback(s, s->overflow_userdata);
905
} else if (command == PA_COMMAND_UNDERFLOW) {
906
if (s->underflow_callback)
907
s->underflow_callback(s, s->underflow_userdata);
914
static void invalidate_indexes(pa_stream *s, bool r, bool w) {
916
pa_assert(PA_REFCNT_VALUE(s) >= 1);
919
pa_log_debug("invalidate r:%u w:%u tag:%u", r, w, s->context->ctag);
922
if (s->state != PA_STREAM_READY)
926
s->write_index_not_before = s->context->ctag;
928
if (s->timing_info_valid)
929
s->timing_info.write_index_corrupt = true;
932
pa_log_debug("write_index invalidated");
937
s->read_index_not_before = s->context->ctag;
939
if (s->timing_info_valid)
940
s->timing_info.read_index_corrupt = true;
943
pa_log_debug("read_index invalidated");
947
request_auto_timing_update(s, true);
950
static void auto_timing_update_callback(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
951
pa_stream *s = userdata;
954
pa_assert(PA_REFCNT_VALUE(s) >= 1);
957
request_auto_timing_update(s, false);
961
static void create_stream_complete(pa_stream *s) {
963
pa_assert(PA_REFCNT_VALUE(s) >= 1);
964
pa_assert(s->state == PA_STREAM_CREATING);
966
pa_stream_set_state(s, PA_STREAM_READY);
968
if (s->requested_bytes > 0 && s->write_callback)
969
s->write_callback(s, (size_t) s->requested_bytes, s->write_userdata);
971
if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) {
972
s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC;
973
pa_assert(!s->auto_timing_update_event);
974
s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s);
976
request_auto_timing_update(s, true);
979
check_smoother_status(s, true, false, false);
982
static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) {
988
if ((e = getenv("PULSE_LATENCY_MSEC"))) {
991
if (pa_atou(e, &ms) < 0 || ms <= 0)
992
pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e);
994
attr->maxlength = (uint32_t) -1;
995
attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &s->sample_spec);
996
attr->minreq = (uint32_t) -1;
997
attr->prebuf = (uint32_t) -1;
998
attr->fragsize = attr->tlength;
1002
*flags |= PA_STREAM_ADJUST_LATENCY;
1005
if (s->context->version >= 13)
1008
/* Versions older than 0.9.10 didn't do server side buffer_attr
1009
* selection, hence we have to fake it on the client side. */
1011
/* We choose fairly conservative values here, to not confuse
1012
* old clients with extremely large playback buffers */
1014
if (attr->maxlength == (uint32_t) -1)
1015
attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */
1017
if (attr->tlength == (uint32_t) -1)
1018
attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */
1020
if (attr->minreq == (uint32_t) -1)
1021
attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */
1023
if (attr->prebuf == (uint32_t) -1)
1024
attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */
1026
if (attr->fragsize == (uint32_t) -1)
1027
attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
1030
void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1031
pa_stream *s = userdata;
1032
uint32_t requested_bytes = 0;
1036
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1037
pa_assert(s->state == PA_STREAM_CREATING);
1041
if (command != PA_COMMAND_REPLY) {
1042
if (pa_context_handle_error(s->context, command, t, false) < 0)
1045
pa_stream_set_state(s, PA_STREAM_FAILED);
1049
if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
1050
s->channel == PA_INVALID_INDEX ||
1051
((s->direction != PA_STREAM_UPLOAD) && (pa_tagstruct_getu32(t, &s->stream_index) < 0 || s->stream_index == PA_INVALID_INDEX)) ||
1052
((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &requested_bytes) < 0)) {
1053
pa_context_fail(s->context, PA_ERR_PROTOCOL);
1057
s->requested_bytes = (int64_t) requested_bytes;
1059
if (s->context->version >= 9) {
1060
if (s->direction == PA_STREAM_PLAYBACK) {
1061
if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1062
pa_tagstruct_getu32(t, &s->buffer_attr.tlength) < 0 ||
1063
pa_tagstruct_getu32(t, &s->buffer_attr.prebuf) < 0 ||
1064
pa_tagstruct_getu32(t, &s->buffer_attr.minreq) < 0) {
1065
pa_context_fail(s->context, PA_ERR_PROTOCOL);
1068
} else if (s->direction == PA_STREAM_RECORD) {
1069
if (pa_tagstruct_getu32(t, &s->buffer_attr.maxlength) < 0 ||
1070
pa_tagstruct_getu32(t, &s->buffer_attr.fragsize) < 0) {
1071
pa_context_fail(s->context, PA_ERR_PROTOCOL);
1077
if (s->context->version >= 12 && s->direction != PA_STREAM_UPLOAD) {
1080
const char *dn = NULL;
1083
if (pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
1084
pa_tagstruct_get_channel_map(t, &cm) < 0 ||
1085
pa_tagstruct_getu32(t, &s->device_index) < 0 ||
1086
pa_tagstruct_gets(t, &dn) < 0 ||
1087
pa_tagstruct_get_boolean(t, &suspended) < 0) {
1088
pa_context_fail(s->context, PA_ERR_PROTOCOL);
1092
if (!dn || s->device_index == PA_INVALID_INDEX ||
1093
ss.channels != cm.channels ||
1094
!pa_channel_map_valid(&cm) ||
1095
!pa_sample_spec_valid(&ss) ||
1096
(s->n_formats == 0 && (
1097
(!(s->flags & PA_STREAM_FIX_FORMAT) && ss.format != s->sample_spec.format) ||
1098
(!(s->flags & PA_STREAM_FIX_RATE) && ss.rate != s->sample_spec.rate) ||
1099
(!(s->flags & PA_STREAM_FIX_CHANNELS) && !pa_channel_map_equal(&cm, &s->channel_map))))) {
1100
pa_context_fail(s->context, PA_ERR_PROTOCOL);
1104
pa_xfree(s->device_name);
1105
s->device_name = pa_xstrdup(dn);
1106
s->suspended = suspended;
1108
s->channel_map = cm;
1109
s->sample_spec = ss;
1112
if (s->context->version >= 13 && s->direction != PA_STREAM_UPLOAD) {
1115
if (pa_tagstruct_get_usec(t, &usec) < 0) {
1116
pa_context_fail(s->context, PA_ERR_PROTOCOL);
1120
if (s->direction == PA_STREAM_RECORD)
1121
s->timing_info.configured_source_usec = usec;
1123
s->timing_info.configured_sink_usec = usec;
1126
if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1127
|| s->context->version >= 22) {
1129
pa_format_info *f = pa_format_info_new();
1130
pa_tagstruct_get_format_info(t, f);
1132
if (pa_format_info_valid(f))
1135
pa_format_info_free(f);
1136
if (s->n_formats > 0) {
1137
/* We used the extended API, so we should have got back a proper format */
1138
pa_context_fail(s->context, PA_ERR_PROTOCOL);
1144
if (!pa_tagstruct_eof(t)) {
1145
pa_context_fail(s->context, PA_ERR_PROTOCOL);
1149
if (s->direction == PA_STREAM_RECORD) {
1150
pa_assert(!s->record_memblockq);
1152
s->record_memblockq = pa_memblockq_new(
1153
"client side record memblockq",
1155
s->buffer_attr.maxlength,
1164
s->channel_valid = true;
1165
pa_hashmap_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, PA_UINT32_TO_PTR(s->channel), s);
1167
create_stream_complete(s);
1173
static int create_stream(
1174
pa_stream_direction_t direction,
1177
const pa_buffer_attr *attr,
1178
pa_stream_flags_t flags,
1179
const pa_cvolume *volume,
1180
pa_stream *sync_stream) {
1184
bool volume_set = !!volume;
1189
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1190
pa_assert(direction == PA_STREAM_PLAYBACK || direction == PA_STREAM_RECORD);
1192
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1193
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
1194
PA_CHECK_VALIDITY(s->context, s->direct_on_input == PA_INVALID_INDEX || direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1195
PA_CHECK_VALIDITY(s->context, !(flags & ~(PA_STREAM_START_CORKED|
1196
PA_STREAM_INTERPOLATE_TIMING|
1197
PA_STREAM_NOT_MONOTONIC|
1198
PA_STREAM_AUTO_TIMING_UPDATE|
1199
PA_STREAM_NO_REMAP_CHANNELS|
1200
PA_STREAM_NO_REMIX_CHANNELS|
1201
PA_STREAM_FIX_FORMAT|
1203
PA_STREAM_FIX_CHANNELS|
1204
PA_STREAM_DONT_MOVE|
1205
PA_STREAM_VARIABLE_RATE|
1206
PA_STREAM_PEAK_DETECT|
1207
PA_STREAM_START_MUTED|
1208
PA_STREAM_ADJUST_LATENCY|
1209
PA_STREAM_EARLY_REQUESTS|
1210
PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND|
1211
PA_STREAM_START_UNMUTED|
1212
PA_STREAM_FAIL_ON_SUSPEND|
1213
PA_STREAM_RELATIVE_VOLUME|
1214
PA_STREAM_PASSTHROUGH)), PA_ERR_INVALID);
1216
PA_CHECK_VALIDITY(s->context, s->context->version >= 12 || !(flags & PA_STREAM_VARIABLE_RATE), PA_ERR_NOTSUPPORTED);
1217
PA_CHECK_VALIDITY(s->context, s->context->version >= 13 || !(flags & PA_STREAM_PEAK_DETECT), PA_ERR_NOTSUPPORTED);
1218
PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
1219
/* Although some of the other flags are not supported on older
1220
* version, we don't check for them here, because it doesn't hurt
1221
* when they are passed but actually not supported. This makes
1222
* client development easier */
1224
PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_RECORD || !(flags & (PA_STREAM_PEAK_DETECT)), PA_ERR_INVALID);
1225
PA_CHECK_VALIDITY(s->context, !sync_stream || (direction == PA_STREAM_PLAYBACK && sync_stream->direction == PA_STREAM_PLAYBACK), PA_ERR_INVALID);
1226
PA_CHECK_VALIDITY(s->context, (flags & (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS)) != (PA_STREAM_ADJUST_LATENCY|PA_STREAM_EARLY_REQUESTS), PA_ERR_INVALID);
1230
s->direction = direction;
1233
s->syncid = sync_stream->syncid;
1236
s->buffer_attr = *attr;
1237
patch_buffer_attr(s, &s->buffer_attr, &flags);
1240
s->corked = !!(flags & PA_STREAM_START_CORKED);
1242
if (flags & PA_STREAM_INTERPOLATE_TIMING) {
1245
x = pa_rtclock_now();
1247
pa_assert(!s->smoother);
1248
s->smoother = pa_smoother_new(
1249
SMOOTHER_ADJUST_TIME,
1250
SMOOTHER_HISTORY_TIME,
1251
!(flags & PA_STREAM_NOT_MONOTONIC),
1253
SMOOTHER_MIN_HISTORY,
1259
dev = s->direction == PA_STREAM_PLAYBACK ? s->context->conf->default_sink : s->context->conf->default_source;
1261
t = pa_tagstruct_command(
1263
(uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM),
1266
if (s->context->version < 13)
1267
pa_tagstruct_puts(t, pa_proplist_gets(s->proplist, PA_PROP_MEDIA_NAME));
1271
PA_TAG_SAMPLE_SPEC, &s->sample_spec,
1272
PA_TAG_CHANNEL_MAP, &s->channel_map,
1273
PA_TAG_U32, PA_INVALID_INDEX,
1275
PA_TAG_U32, s->buffer_attr.maxlength,
1276
PA_TAG_BOOLEAN, s->corked,
1280
if (pa_sample_spec_valid(&s->sample_spec))
1281
volume = pa_cvolume_reset(&cv, s->sample_spec.channels);
1283
/* This is not really relevant, since no volume was set, and
1284
* the real number of channels is embedded in the format_info
1286
volume = pa_cvolume_reset(&cv, PA_CHANNELS_MAX);
1290
if (s->direction == PA_STREAM_PLAYBACK) {
1293
PA_TAG_U32, s->buffer_attr.tlength,
1294
PA_TAG_U32, s->buffer_attr.prebuf,
1295
PA_TAG_U32, s->buffer_attr.minreq,
1296
PA_TAG_U32, s->syncid,
1299
pa_tagstruct_put_cvolume(t, volume);
1301
pa_tagstruct_putu32(t, s->buffer_attr.fragsize);
1303
if (s->context->version >= 12) {
1306
PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMAP_CHANNELS,
1307
PA_TAG_BOOLEAN, flags & PA_STREAM_NO_REMIX_CHANNELS,
1308
PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_FORMAT,
1309
PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_RATE,
1310
PA_TAG_BOOLEAN, flags & PA_STREAM_FIX_CHANNELS,
1311
PA_TAG_BOOLEAN, flags & PA_STREAM_DONT_MOVE,
1312
PA_TAG_BOOLEAN, flags & PA_STREAM_VARIABLE_RATE,
1316
if (s->context->version >= 13) {
1318
if (s->direction == PA_STREAM_PLAYBACK)
1319
pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1321
pa_tagstruct_put_boolean(t, flags & PA_STREAM_PEAK_DETECT);
1325
PA_TAG_BOOLEAN, flags & PA_STREAM_ADJUST_LATENCY,
1326
PA_TAG_PROPLIST, s->proplist,
1329
if (s->direction == PA_STREAM_RECORD)
1330
pa_tagstruct_putu32(t, s->direct_on_input);
1333
if (s->context->version >= 14) {
1335
if (s->direction == PA_STREAM_PLAYBACK)
1336
pa_tagstruct_put_boolean(t, volume_set);
1338
pa_tagstruct_put_boolean(t, flags & PA_STREAM_EARLY_REQUESTS);
1341
if (s->context->version >= 15) {
1343
if (s->direction == PA_STREAM_PLAYBACK)
1344
pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1346
pa_tagstruct_put_boolean(t, flags & PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND);
1347
pa_tagstruct_put_boolean(t, flags & PA_STREAM_FAIL_ON_SUSPEND);
1350
if (s->context->version >= 17 && s->direction == PA_STREAM_PLAYBACK)
1351
pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1353
if (s->context->version >= 18 && s->direction == PA_STREAM_PLAYBACK)
1354
pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1356
if ((s->context->version >= 21 && s->direction == PA_STREAM_PLAYBACK)
1357
|| s->context->version >= 22) {
1359
pa_tagstruct_putu8(t, s->n_formats);
1360
for (i = 0; i < s->n_formats; i++)
1361
pa_tagstruct_put_format_info(t, s->req_formats[i]);
1364
if (s->context->version >= 22 && s->direction == PA_STREAM_RECORD) {
1365
pa_tagstruct_put_cvolume(t, volume);
1366
pa_tagstruct_put_boolean(t, flags & PA_STREAM_START_MUTED);
1367
pa_tagstruct_put_boolean(t, volume_set);
1368
pa_tagstruct_put_boolean(t, flags & (PA_STREAM_START_MUTED|PA_STREAM_START_UNMUTED));
1369
pa_tagstruct_put_boolean(t, flags & PA_STREAM_RELATIVE_VOLUME);
1370
pa_tagstruct_put_boolean(t, flags & (PA_STREAM_PASSTHROUGH));
1373
pa_pstream_send_tagstruct(s->context->pstream, t);
1374
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL);
1376
pa_stream_set_state(s, PA_STREAM_CREATING);
1382
/* Connect the stream for playback. This is a thin wrapper: after checking
 * that the stream still holds at least one reference it forwards the
 * device, buffer attributes, flags, initial volume and sync stream to
 * create_stream() with the direction fixed to PA_STREAM_PLAYBACK, and
 * returns whatever create_stream() returns. */
int pa_stream_connect_playback(
1385
const pa_buffer_attr *attr,
1386
pa_stream_flags_t flags,
1387
const pa_cvolume *volume,
1388
pa_stream *sync_stream) {
1391
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1393
return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
1396
/* Connect the stream for recording. Thin wrapper around create_stream()
 * with the direction fixed to PA_STREAM_RECORD. Unlike the playback
 * variant there is no volume and no sync stream to pass on, so both of
 * those arguments are NULL. */
int pa_stream_connect_record(
1399
const pa_buffer_attr *attr,
1400
pa_stream_flags_t flags) {
1403
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1405
return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
1408
/* Hand the caller a buffer, owned by the stream, that it can fill with
 * audio data. On return *data points at the buffer and *nbytes holds its
 * length. The buffer is kept in s->write_memblock / s->write_data. */
int pa_stream_begin_write(
1414
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1416
/* Preconditions, each paired with the error code that is reported when
 * it does not hold: no fork since the context was created, stream is
 * READY, stream is a playback or upload stream, and both output
 * pointers are usable (*nbytes must not be 0). */
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1417
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1418
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1419
PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
1420
PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
1422
/* (size_t) -1 is treated specially: only an explicit size request goes
 * through the block below, which looks up the largest block the memory
 * pool can provide (m) and the frame size of the stream (fs).
 * NOTE(review): m and fs are presumably used to clamp and frame-align
 * *nbytes - confirm. */
if (*nbytes != (size_t) -1) {
1425
m = pa_mempool_block_size_max(s->context->mempool);
1426
fs = pa_frame_size(&s->sample_spec);
1433
/* Allocate and map the write buffer only if a previous call has not
 * already left one in place. */
if (!s->write_memblock) {
1434
s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes);
1435
s->write_data = pa_memblock_acquire(s->write_memblock);
1438
/* Report the buffer and its real length back to the caller. */
*data = s->write_data;
1439
*nbytes = pa_memblock_get_length(s->write_memblock);
1444
/* Give up the write buffer handed out by pa_stream_begin_write() without
 * sending anything: the memblock is unmapped and unreferenced, and the
 * stream's write_memblock / write_data fields are cleared. */
int pa_stream_cancel_write(
1448
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1450
/* Preconditions: no fork, stream READY, playback or upload direction,
 * and a write buffer must actually be outstanding. */
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1451
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1452
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1453
PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE);
1455
/* A buffer that exists must also have been acquired. */
pa_assert(s->write_data);
1457
/* Undo the acquire and drop the reference taken when the buffer was
 * created, then forget about it. */
pa_memblock_release(s->write_memblock);
1458
pa_memblock_unref(s->write_memblock);
1459
s->write_memblock = NULL;
1460
s->write_data = NULL;
1465
/* Send `length` bytes at `data` to the server for this stream, seeking by
 * `offset` according to `seek`.
 *
 * Two paths exist:
 *  - the data lives in the buffer obtained from pa_stream_begin_write():
 *    that memblock is sent as a single chunk;
 *  - otherwise the data is sent in one or more chunks, either by wrapping
 *    the caller's memory (when free_cb is given and the pstream does not
 *    use shared memory) or by copying it into freshly allocated blocks.
 *
 * Afterwards the local bookkeeping (requested_bytes, write index
 * corrections, cached timing info) is updated to reflect the write. */
int pa_stream_write_ext_free(
1469
pa_free_cb_t free_cb,
1472
pa_seek_mode_t seek) {
1475
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1478
/* Preconditions, each paired with its error code. Upload streams only
 * accept a plain append (relative seek, offset 0). */
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1479
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1480
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1481
PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
1482
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
1483
/* If a begin_write buffer is outstanding, [data, data + length) has to
 * lie completely inside it. */
PA_CHECK_VALIDITY(s->context,
1484
!s->write_memblock ||
1485
((data >= s->write_data) &&
1486
((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))),
1488
/* Offset and length must both be whole multiples of the frame size. */
PA_CHECK_VALIDITY(s->context, offset % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
1489
PA_CHECK_VALIDITY(s->context, length % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID);
1490
/* A free callback cannot be combined with a begin_write buffer. */
PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID);
1492
if (s->write_memblock) {
1495
/* pa_stream_begin_write() was called before */
1497
pa_memblock_release(s->write_memblock);
1499
/* Describe the written region as a chunk of the existing block;
 * the index is the distance of `data` from the start of the buffer. */
chunk.memblock = s->write_memblock;
1500
chunk.index = (const char *) data - (const char *) s->write_data;
1501
chunk.length = length;
1503
/* The stream no longer owns the buffer; the reference now travels
 * with `chunk` and is dropped after sending. */
s->write_memblock = NULL;
1504
s->write_data = NULL;
1506
pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
1507
pa_memblock_unref(chunk.memblock);
1510
/* Working copies that are advanced as the data is sent piecewise. */
pa_seek_mode_t t_seek = seek;
1511
int64_t t_offset = offset;
1512
size_t t_length = length;
1513
const void *t_data = data;
1515
/* pa_stream_begin_write() was not called before */
1517
while (t_length > 0) {
1522
/* With a free callback and no shared memory the caller's memory is
 * wrapped in a user memblock covering everything that is left. */
if (free_cb && !pa_pstream_get_shm(s->context->pstream)) {
1523
chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, free_cb_data, 1);
1524
chunk.length = t_length;
1528
/* Otherwise copy at most one maximum-sized pool block per round. */
chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool));
1529
chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length);
1531
d = pa_memblock_acquire(chunk.memblock);
1532
memcpy(d, t_data, chunk.length);
1533
pa_memblock_release(chunk.memblock);
1536
pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk);
1539
/* Every chunk after the first one is sent with a relative seek. */
t_seek = PA_SEEK_RELATIVE;
1541
t_data = (const uint8_t*) t_data + chunk.length;
1542
t_length -= chunk.length;
1544
pa_memblock_unref(chunk.memblock);
1547
/* When shared memory is in use the loop above took the copying path,
 * so the caller's buffer is handed to its free callback right away. */
if (free_cb && pa_pstream_get_shm(s->context->pstream))
1548
free_cb(free_cb_data);
1551
/* This is obviously wrong since we ignore the seeking index. But
1552
* that's OK, the server side applies the same error */
1553
s->requested_bytes -= (seek == PA_SEEK_RELATIVE ? offset : 0) + (int64_t) length;
1556
pa_log_debug("wrote %lli, now at %lli", (long long) length, (long long) s->requested_bytes);
1559
if (s->direction == PA_STREAM_PLAYBACK) {
1561
/* Update latency request correction */
1562
/* Only the slot of the most recent timing request is touched, and only
 * while it is still marked valid. An absolute seek makes the write
 * index known again, a relative seek adds to it, and the remaining
 * case marks the correction as corrupt. */
if (s->write_index_corrections[s->current_write_index_correction].valid) {
1564
if (seek == PA_SEEK_ABSOLUTE) {
1565
s->write_index_corrections[s->current_write_index_correction].corrupt = false;
1566
s->write_index_corrections[s->current_write_index_correction].absolute = true;
1567
s->write_index_corrections[s->current_write_index_correction].value = offset + (int64_t) length;
1568
} else if (seek == PA_SEEK_RELATIVE) {
1569
if (!s->write_index_corrections[s->current_write_index_correction].corrupt)
1570
s->write_index_corrections[s->current_write_index_correction].value += offset + (int64_t) length;
1572
s->write_index_corrections[s->current_write_index_correction].corrupt = true;
1575
/* Update the write index in the already available latency data */
1576
/* Same three cases as above, applied to the cached timing info. */
if (s->timing_info_valid) {
1578
if (seek == PA_SEEK_ABSOLUTE) {
1579
s->timing_info.write_index_corrupt = false;
1580
s->timing_info.write_index = offset + (int64_t) length;
1581
} else if (seek == PA_SEEK_RELATIVE) {
1582
if (!s->timing_info.write_index_corrupt)
1583
s->timing_info.write_index += offset + (int64_t) length;
1585
s->timing_info.write_index_corrupt = true;
1588
/* Without usable timing data, ask the server for a fresh update. */
if (!s->timing_info_valid || s->timing_info.write_index_corrupt)
1589
request_auto_timing_update(s, true);
1595
/* Convenience form of pa_stream_write_ext_free(): identical, except that
 * the data pointer itself is used as the argument handed to free_cb. */
int pa_stream_write(
1599
pa_free_cb_t free_cb,
1601
pa_seek_mode_t seek) {
1603
return pa_stream_write_ext_free(s, data, length, free_cb, (void*) data, offset, seek);
1606
/* Give the caller read access to the next fragment of recorded data
 * without removing it from the record queue. The fragment is cached in
 * s->peek_memchunk (and mapped at s->peek_data) until pa_stream_drop() is
 * called. On return *data / *length describe the fragment. */
int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
1608
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1612
/* Preconditions: no fork, stream READY, record stream only. */
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1613
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1614
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1616
/* Only fetch from the queue if no fragment is cached from an earlier
 * peek. */
if (!s->peek_memchunk.memblock) {
1618
if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
1619
/* record_memblockq is empty. */
1624
} else if (!s->peek_memchunk.memblock) {
1625
/* record_memblockq isn't empty, but it doesn't have any data at
1626
* the current read index. */
1628
/* A chunk without a memblock still carries a length, which is
 * reported to the caller. */
*length = s->peek_memchunk.length;
1632
/* Map the block so that its contents can be handed out. */
s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
1635
pa_assert(s->peek_data);
1636
/* The chunk may start at an offset inside its memblock. */
*data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
1637
*length = s->peek_memchunk.length;
1641
/* Discard the fragment previously obtained with pa_stream_peek(): it is
 * removed from the record queue, the locally simulated read index is
 * advanced, and the cached peek chunk is released and reset. */
int pa_stream_drop(pa_stream *s) {
1643
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1645
/* Preconditions: no fork, stream READY, record stream, and a peeked
 * fragment of non-zero length must be pending. */
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1646
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1647
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
1648
PA_CHECK_VALIDITY(s->context, s->peek_memchunk.length > 0, PA_ERR_BADSTATE);
1650
pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);
1652
/* Fix the simulated local read index */
1653
if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
1654
s->timing_info.read_index += (int64_t) s->peek_memchunk.length;
1656
/* The chunk may have no memblock at all (see pa_stream_peek()); only a
 * real block needs to be unmapped and unreferenced. */
if (s->peek_memchunk.memblock) {
1657
pa_assert(s->peek_data);
1658
s->peek_data = NULL;
1659
pa_memblock_release(s->peek_memchunk.memblock);
1660
pa_memblock_unref(s->peek_memchunk.memblock);
1663
pa_memchunk_reset(&s->peek_memchunk);
1668
/* Return the number of bytes the server has requested and that have not
 * been written yet. The counter is signed and may be negative; in that
 * case 0 is returned. If a precondition fails, (size_t) -1 is the value
 * handed to the validity macro as the return value. */
size_t pa_stream_writable_size(pa_stream *s) {
1670
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1672
/* Preconditions: no fork, stream READY, not a record stream. */
PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1673
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1674
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1676
return s->requested_bytes > 0 ? (size_t) s->requested_bytes : 0;
1679
/* Return the number of bytes currently held in the stream's record queue.
 * If a precondition fails, (size_t) -1 is the value handed to the
 * validity macro as the return value. */
size_t pa_stream_readable_size(pa_stream *s) {
1681
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1683
/* Preconditions: no fork, stream READY, record stream only. */
PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, (size_t) -1);
1684
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
1685
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
1687
return pa_memblockq_get_length(s->record_memblockq);
1690
/* Send PA_COMMAND_DRAIN_PLAYBACK_STREAM for this stream. `cb` / `userdata`
 * are stored in a new operation object; the server's reply is routed to
 * pa_stream_simple_ack_callback(). Timing updates are requested both
 * before and after the command is sent. */
pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1696
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1698
/* Preconditions: no fork, stream READY, playback stream only. */
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1699
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1700
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
1702
/* Ask for a timing update before we cork/uncork to get the best
1703
* accuracy for the transport latency suitable for the
1704
* check_smoother_status() call in the started callback */
1705
request_auto_timing_update(s, true);
1707
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1709
/* Build and send the request; the payload is just the channel index. */
t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag);
1710
pa_tagstruct_putu32(t, s->channel);
1711
pa_pstream_send_tagstruct(s->context->pstream, t);
1712
/* The reply handler gets its own reference to the operation, with
 * pa_operation_unref registered as the matching free function. */
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
1714
/* This might cause the read index to continue again, hence
1715
* let's request a timing update */
1716
request_auto_timing_update(s, true);
1721
/* Derive a stream time in microseconds from the cached timing info.
 *
 * Playback: starts from the read index converted to time. Record: starts
 * from the write index converted to time. A negative index is treated as
 * 0. While the stream is neither corked nor suspended, the transport
 * latency is added unless `ignore_transport` is set, and the device side
 * latencies (sink_usec, and source_usec for record streams) are applied.
 *
 * The caller must guarantee the conditions asserted below. */
static pa_usec_t calc_time(pa_stream *s, bool ignore_transport) {
1725
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1726
pa_assert(s->state == PA_STREAM_READY);
1727
pa_assert(s->direction != PA_STREAM_UPLOAD);
1728
pa_assert(s->timing_info_valid);
1729
/* The index this function is about to read must not be corrupt. */
pa_assert(s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt);
1730
pa_assert(s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt);
1732
if (s->direction == PA_STREAM_PLAYBACK) {
1733
/* The last byte that was written into the output device
1734
* had this time value associated */
1735
usec = pa_bytes_to_usec(s->timing_info.read_index < 0 ? 0 : (uint64_t) s->timing_info.read_index, &s->sample_spec);
1737
if (!s->corked && !s->suspended) {
1739
if (!ignore_transport)
1740
/* Because the latency info took a little time to come
1741
* to us, we assume that the real output time is actually
1743
usec += s->timing_info.transport_usec;
1745
/* However, the output device usually maintains a buffer
1746
too, hence the real sample currently played is a little
1748
if (s->timing_info.sink_usec >= usec)
1751
usec -= s->timing_info.sink_usec;
1755
pa_assert(s->direction == PA_STREAM_RECORD);
1757
/* The last byte written into the server side queue had
1758
* this time value associated */
1759
usec = pa_bytes_to_usec(s->timing_info.write_index < 0 ? 0 : (uint64_t) s->timing_info.write_index, &s->sample_spec);
1761
if (!s->corked && !s->suspended) {
1763
if (!ignore_transport)
1764
/* Add transport latency */
1765
usec += s->timing_info.transport_usec;
1767
/* Add latency of data in device buffer */
1768
usec += s->timing_info.source_usec;
1770
/* If this is a monitor source, we need to correct the
1771
* time by the playback device buffer */
1772
if (s->timing_info.sink_usec >= usec)
1775
usec -= s->timing_info.sink_usec;
1782
/* Reply handler for the latency query sent by
 * pa_stream_update_timing_info(). `userdata` is the pa_operation that
 * was created for the request.
 *
 * The handler
 *  1. marks the stream's timing info invalid, then parses the reply,
 *  2. estimates the transport latency and the timestamp of the sample,
 *  3. invalidates or corrects the read/write indexes using the tag of
 *     the reply and the recorded write index corrections,
 *  4. feeds the stream's smoother (if any, and if not corked),
 *  5. notifies the latency update callback and the operation callback,
 *  6. finishes and unreferences the operation. */
static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
1783
pa_operation *o = userdata;
1784
struct timeval local, remote, now;
1786
bool playing = false;
1787
uint64_t underrun_for = 0, playing_for = 0;
1791
pa_assert(PA_REFCNT_VALUE(o) >= 1);
1793
/* The operation may have lost its context or stream in the meantime. */
if (!o->context || !o->stream)
1796
i = &o->stream->timing_info;
1798
/* Start out pessimistic; the flags are flipped back once the reply has
 * been parsed successfully. */
o->stream->timing_info_valid = false;
1799
i->write_index_corrupt = true;
1800
i->read_index_corrupt = true;
1802
if (command != PA_COMMAND_REPLY) {
1803
if (pa_context_handle_error(o->context, command, t, false) < 0)
1808
/* Fixed part of the reply, in wire order. */
if (pa_tagstruct_get_usec(t, &i->sink_usec) < 0 ||
1809
pa_tagstruct_get_usec(t, &i->source_usec) < 0 ||
1810
pa_tagstruct_get_boolean(t, &playing) < 0 ||
1811
pa_tagstruct_get_timeval(t, &local) < 0 ||
1812
pa_tagstruct_get_timeval(t, &remote) < 0 ||
1813
pa_tagstruct_gets64(t, &i->write_index) < 0 ||
1814
pa_tagstruct_gets64(t, &i->read_index) < 0) {
1816
pa_context_fail(o->context, PA_ERR_PROTOCOL);
1820
/* From protocol version 13 on, playback replies carry two more
 * counters. */
if (o->context->version >= 13 &&
1821
o->stream->direction == PA_STREAM_PLAYBACK)
1822
if (pa_tagstruct_getu64(t, &underrun_for) < 0 ||
1823
pa_tagstruct_getu64(t, &playing_for) < 0) {
1825
pa_context_fail(o->context, PA_ERR_PROTOCOL);
1829
/* Trailing data in the reply is a protocol error. */
if (!pa_tagstruct_eof(t)) {
1830
pa_context_fail(o->context, PA_ERR_PROTOCOL);
1833
o->stream->timing_info_valid = true;
1834
i->write_index_corrupt = false;
1835
i->read_index_corrupt = false;
1837
i->playing = (int) playing;
1838
/* Which counter is stored depends on whether the stream is playing. */
i->since_underrun = (int64_t) (playing ? playing_for : underrun_for);
1840
pa_gettimeofday(&now);
1842
/* Calculate timestamps */
1843
/* The clocks are considered synchronized when local <= remote <= now. */
if (pa_timeval_cmp(&local, &remote) <= 0 && pa_timeval_cmp(&remote, &now) <= 0) {
1844
/* local and remote seem to have synchronized clocks */
1846
if (o->stream->direction == PA_STREAM_PLAYBACK)
1847
i->transport_usec = pa_timeval_diff(&remote, &local);
1849
i->transport_usec = pa_timeval_diff(&now, &remote);
1851
i->synchronized_clocks = true;
1852
i->timestamp = remote;
1854
/* clocks are not synchronized, let's estimate latency then */
1855
/* Half of the round trip time is used as the one-way estimate. */
i->transport_usec = pa_timeval_diff(&now, &local)/2;
1856
i->synchronized_clocks = false;
1857
i->timestamp = local;
1858
pa_timeval_add(&i->timestamp, i->transport_usec);
1861
/* Invalidate read and write indexes if necessary */
1862
if (tag < o->stream->read_index_not_before)
1863
i->read_index_corrupt = true;
1865
if (tag < o->stream->write_index_not_before)
1866
i->write_index_corrupt = true;
1868
if (o->stream->direction == PA_STREAM_PLAYBACK) {
1869
/* Write index correction */
1872
uint32_t ctag = tag;
1874
/* Go through the saved correction values and add up the
1875
* total correction.*/
1876
/* The walk starts one slot after the most recent correction and wraps
 * around, visiting every slot exactly once. */
for (n = 0, j = o->stream->current_write_index_correction+1;
1877
n < PA_MAX_WRITE_INDEX_CORRECTIONS;
1878
n++, j = (j + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS) {
1880
/* Step over invalid data or out-of-date data */
1881
if (!o->stream->write_index_corrections[j].valid ||
1882
o->stream->write_index_corrections[j].tag < ctag)
1885
/* Make sure that everything is in order */
1886
ctag = o->stream->write_index_corrections[j].tag+1;
1888
/* Now fix the write index */
1889
if (o->stream->write_index_corrections[j].corrupt) {
1890
/* A corrupting seek was made */
1891
i->write_index_corrupt = true;
1892
} else if (o->stream->write_index_corrections[j].absolute) {
1893
/* An absolute seek was made */
1894
i->write_index = o->stream->write_index_corrections[j].value;
1895
i->write_index_corrupt = false;
1896
} else if (!i->write_index_corrupt) {
1897
/* A relative seek was made */
1898
i->write_index += o->stream->write_index_corrections[j].value;
1902
/* Clear old correction entries */
1903
for (n = 0; n < PA_MAX_WRITE_INDEX_CORRECTIONS; n++) {
1904
if (!o->stream->write_index_corrections[n].valid)
1907
if (o->stream->write_index_corrections[n].tag <= tag)
1908
o->stream->write_index_corrections[n].valid = false;
1912
if (o->stream->direction == PA_STREAM_RECORD) {
1913
/* Read index correction */
1915
/* Data still sitting in the local record queue has not been read by
 * the application yet, so its length is subtracted. */
if (!i->read_index_corrupt)
1916
i->read_index -= (int64_t) pa_memblockq_get_length(o->stream->record_memblockq);
1919
/* Update smoother if we're not corked */
1920
if (o->stream->smoother && !o->stream->corked) {
1923
/* u and x both start as "now minus transport latency"; only x may be
 * shifted further below. */
u = x = pa_rtclock_now() - i->transport_usec;
1925
if (o->stream->direction == PA_STREAM_PLAYBACK && o->context->version >= 13) {
1928
/* If we weren't playing then it will take some time
1929
* until the audio will actually come out through the
1930
* speakers. Since we follow that timing here, we need
1931
* to try to fix this up */
1933
su = pa_bytes_to_usec((uint64_t) i->since_underrun, &o->stream->sample_spec);
1935
if (su < i->sink_usec)
1936
x += i->sink_usec - su;
1940
pa_smoother_pause(o->stream->smoother, x);
1942
/* Update the smoother */
1943
/* A sample is only fed in when the index calc_time() relies on for
 * this direction is trustworthy. */
if ((o->stream->direction == PA_STREAM_PLAYBACK && !i->read_index_corrupt) ||
1944
(o->stream->direction == PA_STREAM_RECORD && !i->write_index_corrupt))
1945
pa_smoother_put(o->stream->smoother, u, calc_time(o->stream, true));
1948
pa_smoother_resume(o->stream->smoother, x, true);
1952
/* The pending automatic request, if any, is now answered. */
o->stream->auto_timing_update_requested = false;
1954
if (o->stream->latency_update_callback)
1955
o->stream->latency_update_callback(o->stream, o->stream->latency_update_userdata);
1957
/* The operation callback receives timing_info_valid as its success
 * argument. */
if (o->callback && o->stream && o->stream->state == PA_STREAM_READY) {
1958
pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
1959
cb(o->stream, o->stream->timing_info_valid, o->userdata);
1964
pa_operation_done(o);
1965
pa_operation_unref(o);
1968
/* Ask the server for fresh latency/timing data for this stream. The reply
 * is handled by stream_get_timing_info_callback(). For playback streams a
 * slot in the write index correction ring is reserved for the request so
 * that writes issued while the query is in flight can be accounted for. */
pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
1976
pa_assert(PA_REFCNT_VALUE(s) >= 1);
1978
/* Preconditions: no fork, stream READY, not an upload stream. */
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
1979
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
1980
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
1982
if (s->direction == PA_STREAM_PLAYBACK) {
1983
/* Find a place to store the write_index correction data for this entry */
1984
cidx = (s->current_write_index_correction + 1) % PA_MAX_WRITE_INDEX_CORRECTIONS;
1986
/* Check if we could allocate a correction slot. If not, there are too many outstanding queries */
1987
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL);
1989
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
1991
/* The command depends on the direction; the payload is the channel
 * index followed by the current wall clock time. */
t = pa_tagstruct_command(
1993
(uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY),
1995
pa_tagstruct_putu32(t, s->channel);
1996
pa_tagstruct_put_timeval(t, pa_gettimeofday(&now));
1998
pa_pstream_send_tagstruct(s->context->pstream, t);
1999
/* The reply handler gets its own reference to the operation. */
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2001
if (s->direction == PA_STREAM_PLAYBACK) {
2002
/* Fill in initial correction data */
2004
/* The reserved slot becomes the current one and is tied to the tag of
 * this request, starting with a zero relative correction. */
s->current_write_index_correction = cidx;
2006
s->write_index_corrections[cidx].valid = true;
2007
s->write_index_corrections[cidx].absolute = false;
2008
s->write_index_corrections[cidx].corrupt = false;
2009
s->write_index_corrections[cidx].tag = tag;
2010
s->write_index_corrections[cidx].value = 0;
2016
/* Reply handler for the delete-stream command sent by
 * pa_stream_disconnect(). `userdata` is the stream. An error reply leads
 * to PA_STREAM_FAILED, a reply with unexpected trailing data fails the
 * whole context with PA_ERR_PROTOCOL, and PA_STREAM_TERMINATED is the
 * remaining state set here. */
void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2017
pa_stream *s = userdata;
2021
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2025
if (command != PA_COMMAND_REPLY) {
2026
if (pa_context_handle_error(s->context, command, t, false) < 0)
2029
pa_stream_set_state(s, PA_STREAM_FAILED);
2031
} else if (!pa_tagstruct_eof(t)) {
2032
pa_context_fail(s->context, PA_ERR_PROTOCOL);
2036
pa_stream_set_state(s, PA_STREAM_TERMINATED);
2042
/* Ask the server to delete this stream. The command is chosen by stream
 * direction (playback, record, or upload) and carries only the channel
 * index; the reply is handled by pa_stream_disconnect_callback(). */
int pa_stream_disconnect(pa_stream *s) {
2047
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2049
/* Preconditions: no fork, the stream has a valid channel, and the
 * context is READY. */
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2050
PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
2051
PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
2055
t = pa_tagstruct_command(
2057
(uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
2058
(s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)),
2060
pa_tagstruct_putu32(t, s->channel);
2061
pa_pstream_send_tagstruct(s->context->pstream, t);
2062
/* The stream itself is the callback's userdata; no free function is
 * registered for it. */
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL);
2068
/* Store `cb` and `userdata` as the stream's read callback.
 * NOTE(review): the two checks below (fork detected; stream TERMINATED or
 * FAILED) are expected to bail out before the fields are assigned -
 * confirm. */
void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2070
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2072
if (pa_detect_fork())
2075
if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2078
s->read_callback = cb;
2079
s->read_userdata = userdata;
2082
/* Store `cb` and `userdata` as the stream's write callback.
 * NOTE(review): the two checks below (fork detected; stream TERMINATED or
 * FAILED) are expected to bail out before the fields are assigned -
 * confirm. */
void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
2084
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2086
if (pa_detect_fork())
2089
if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2092
s->write_callback = cb;
2093
s->write_userdata = userdata;
2096
/* Store `cb` and `userdata` as the stream's state change callback.
 * NOTE(review): the two checks below (fork detected; stream TERMINATED or
 * FAILED) are expected to bail out before the fields are assigned -
 * confirm. */
void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2098
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2100
if (pa_detect_fork())
2103
if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2106
s->state_callback = cb;
2107
s->state_userdata = userdata;
2110
/* Store `cb` and `userdata` as the stream's overflow callback.
 * NOTE(review): the two checks below (fork detected; stream TERMINATED or
 * FAILED) are expected to bail out before the fields are assigned -
 * confirm. */
void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2112
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2114
if (pa_detect_fork())
2117
if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2120
s->overflow_callback = cb;
2121
s->overflow_userdata = userdata;
2124
/* Store `cb` and `userdata` as the stream's underflow callback.
 * NOTE(review): the two checks below (fork detected; stream TERMINATED or
 * FAILED) are expected to bail out before the fields are assigned -
 * confirm. */
void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2126
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2128
if (pa_detect_fork())
2131
if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2134
s->underflow_callback = cb;
2135
s->underflow_userdata = userdata;
2138
/* Store `cb` and `userdata` as the stream's latency update callback.
 * NOTE(review): the two checks below (fork detected; stream TERMINATED or
 * FAILED) are expected to bail out before the fields are assigned -
 * confirm. */
void pa_stream_set_latency_update_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2140
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2142
if (pa_detect_fork())
2145
if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2148
s->latency_update_callback = cb;
2149
s->latency_update_userdata = userdata;
2152
/* Store `cb` and `userdata` as the stream's "moved" callback.
 * NOTE(review): the two checks below (fork detected; stream TERMINATED or
 * FAILED) are expected to bail out before the fields are assigned -
 * confirm. */
void pa_stream_set_moved_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2154
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2156
if (pa_detect_fork())
2159
if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2162
s->moved_callback = cb;
2163
s->moved_userdata = userdata;
2166
/* Store `cb` and `userdata` as the stream's "suspended" callback.
 * NOTE(review): the two checks below (fork detected; stream TERMINATED or
 * FAILED) are expected to bail out before the fields are assigned -
 * confirm. */
void pa_stream_set_suspended_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2168
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2170
if (pa_detect_fork())
2173
if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2176
s->suspended_callback = cb;
2177
s->suspended_userdata = userdata;
2180
/* Store `cb` and `userdata` as the stream's "started" callback.
 * NOTE(review): the two checks below (fork detected; stream TERMINATED or
 * FAILED) are expected to bail out before the fields are assigned -
 * confirm. */
void pa_stream_set_started_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2182
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2184
if (pa_detect_fork())
2187
if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2190
s->started_callback = cb;
2191
s->started_userdata = userdata;
2194
/* Store `cb` and `userdata` as the stream's event callback. Note that
 * this setter takes a pa_stream_event_cb_t rather than the notify
 * callback type used by most of the other setters.
 * NOTE(review): the two checks below (fork detected; stream TERMINATED or
 * FAILED) are expected to bail out before the fields are assigned -
 * confirm. */
void pa_stream_set_event_callback(pa_stream *s, pa_stream_event_cb_t cb, void *userdata) {
2196
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2198
if (pa_detect_fork())
2201
if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2204
s->event_callback = cb;
2205
s->event_userdata = userdata;
2208
/* Store `cb` and `userdata` as the stream's buffer attribute callback.
 * NOTE(review): the two checks below (fork detected; stream TERMINATED or
 * FAILED) are expected to bail out before the fields are assigned -
 * confirm. */
void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
2210
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2212
if (pa_detect_fork())
2215
if (s->state == PA_STREAM_TERMINATED || s->state == PA_STREAM_FAILED)
2218
s->buffer_attr_callback = cb;
2219
s->buffer_attr_userdata = userdata;
2222
/* Generic reply handler for commands whose reply carries no payload.
 * `userdata` is the pa_operation created for the request. An error reply
 * is passed to pa_context_handle_error(); a reply with trailing data
 * fails the context with PA_ERR_PROTOCOL. The operation's callback is
 * invoked with the stream, a success flag and the operation's userdata,
 * after which the operation is finished and unreferenced. */
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2223
pa_operation *o = userdata;
2228
pa_assert(PA_REFCNT_VALUE(o) >= 1);
2233
if (command != PA_COMMAND_REPLY) {
2234
if (pa_context_handle_error(o->context, command, t, false) < 0)
2238
} else if (!pa_tagstruct_eof(t)) {
2239
pa_context_fail(o->context, PA_ERR_PROTOCOL);
2244
/* The stored callback is generic; cast it back to the success callback
 * type before calling it. */
pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2245
cb(o->stream, success, o->userdata);
2249
pa_operation_done(o);
2250
pa_operation_unref(o);
2253
/* Cork (b != 0) or uncork (b == 0) the stream on the server. The command
 * depends on the stream direction; the reply is handled by
 * pa_stream_simple_ack_callback(). Timing updates are requested before
 * and after the command, and the smoother status is re-evaluated. */
pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) {
2259
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2261
/* Preconditions: no fork, stream READY, not an upload stream. */
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2262
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2263
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2265
/* Ask for a timing update before we cork/uncork to get the best
2266
* accuracy for the transport latency suitable for the
2267
* check_smoother_status() call in the started callback */
2268
request_auto_timing_update(s, true);
2272
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2274
t = pa_tagstruct_command(
2276
(uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CORK_PLAYBACK_STREAM : PA_COMMAND_CORK_RECORD_STREAM),
2278
pa_tagstruct_putu32(t, s->channel);
2279
/* !!b normalizes any non-zero int to exactly 1. */
pa_tagstruct_put_boolean(t, !!b);
2280
pa_pstream_send_tagstruct(s->context->pstream, t);
2281
/* The reply handler gets its own reference to the operation. */
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2283
check_smoother_status(s, false, false, false);
2285
/* This might cause the indexes to hang/start again, hence let's
2286
* request a timing update, after the cork/uncork, too */
2287
request_auto_timing_update(s, true);
2292
/* Shared helper for commands whose only payload is the channel index:
 * creates an operation for `cb` / `userdata`, sends `command` for this
 * stream and registers pa_stream_simple_ack_callback() for the reply. */
static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) {
2298
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2300
/* Preconditions: no fork, stream READY. Direction checks are left to
 * the callers. */
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2301
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2303
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2305
t = pa_tagstruct_command(s->context, command, &tag);
2306
pa_tagstruct_putu32(t, s->channel);
2307
pa_pstream_send_tagstruct(s->context->pstream, t);
2308
/* The reply handler gets its own reference to the operation. */
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2313
/* Flush the stream's buffer on the server. The flush command matching the
 * stream direction is sent through stream_send_simple_command(), and the
 * locally cached indexes that the flush makes unreliable are invalidated:
 * the write index for playback streams, the read index for record
 * streams. */
pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2317
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2319
/* Preconditions: no fork, stream READY, not an upload stream. */
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2320
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2321
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2323
/* Ask for a timing update *before* the flush, so that the
2324
* transport usec is as up to date as possible when we get the
2325
* underflow message and update the smoother status*/
2326
request_auto_timing_update(s, true);
2328
if (!(o = stream_send_simple_command(s, (uint32_t) (s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_FLUSH_PLAYBACK_STREAM : PA_COMMAND_FLUSH_RECORD_STREAM), cb, userdata)))
2331
if (s->direction == PA_STREAM_PLAYBACK) {
2333
/* An outstanding timing request can no longer be corrected reliably. */
if (s->write_index_corrections[s->current_write_index_correction].valid)
2334
s->write_index_corrections[s->current_write_index_correction].corrupt = true;
2336
/* The smoother status is only re-checked when prebuffering is
 * enabled. */
if (s->buffer_attr.prebuf > 0)
2337
check_smoother_status(s, false, false, true);
2339
/* This will change the write index, but leave the
2340
* read index untouched. */
2341
invalidate_indexes(s, false, true);
2344
/* For record streams this has no influence on the write
2345
* index, but the read index might jump. */
2346
invalidate_indexes(s, true, false);
2348
/* Note that we do not update requested_bytes here. This is
2349
* because we cannot really know how data actually was dropped
2350
* from the write index due to this. This 'error' will be applied
2351
* by both client and server and hence we should be fine. */
2356
/* Send PA_COMMAND_PREBUF_PLAYBACK_STREAM for this stream via
 * stream_send_simple_command(). Only valid for playback streams whose
 * buffer attributes have a non-zero prebuf value. Timing updates are
 * requested before and after the command. */
pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2360
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2362
/* Preconditions: no fork, stream READY, playback stream, prebuf > 0. */
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2363
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2364
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2365
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2367
/* Ask for a timing update before we cork/uncork to get the best
2368
* accuracy for the transport latency suitable for the
2369
* check_smoother_status() call in the started callback */
2370
request_auto_timing_update(s, true);
2372
if (!(o = stream_send_simple_command(s, PA_COMMAND_PREBUF_PLAYBACK_STREAM, cb, userdata)))
2375
/* This might cause the read index to hang again, hence
2376
* let's request a timing update */
2377
request_auto_timing_update(s, true);
2382
/* Send PA_COMMAND_TRIGGER_PLAYBACK_STREAM for this stream via
 * stream_send_simple_command(). Only valid for playback streams whose
 * buffer attributes have a non-zero prebuf value. Timing updates are
 * requested before and after the command. */
pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) {
2386
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2388
/* Preconditions: no fork, stream READY, playback stream, prebuf > 0. */
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2389
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2390
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
2391
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->buffer_attr.prebuf > 0, PA_ERR_BADSTATE);
2393
/* Ask for a timing update before we cork/uncork to get the best
2394
* accuracy for the transport latency suitable for the
2395
* check_smoother_status() call in the started callback */
2396
request_auto_timing_update(s, true);
2398
if (!(o = stream_send_simple_command(s, PA_COMMAND_TRIGGER_PLAYBACK_STREAM, cb, userdata)))
2401
/* This might cause the read index to start moving again, hence
2402
* let's request a timing update */
2403
request_auto_timing_update(s, true);
2408
/* Rename the stream on the server. With protocol version 13 or newer the
 * name is sent as the PA_PROP_MEDIA_NAME property through
 * pa_stream_proplist_update(); the other visible path sends a dedicated
 * set-name command chosen by stream direction. */
pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_success_cb_t cb, void *userdata) {
2412
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2415
/* Preconditions: no fork, stream READY, not an upload stream. */
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2416
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2417
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2419
if (s->context->version >= 13) {
2420
/* A temporary property list carrying just the new name; it is freed
 * again as soon as the update request has been issued. */
pa_proplist *p = pa_proplist_new();
2422
pa_proplist_sets(p, PA_PROP_MEDIA_NAME, name);
2423
o = pa_stream_proplist_update(s, PA_UPDATE_REPLACE, p, cb, userdata);
2424
pa_proplist_free(p);
2429
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2430
t = pa_tagstruct_command(
2432
(uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_NAME : PA_COMMAND_SET_PLAYBACK_STREAM_NAME),
2434
/* Payload: channel index followed by the new name. */
pa_tagstruct_putu32(t, s->channel);
2435
pa_tagstruct_puts(t, name);
2436
pa_pstream_send_tagstruct(s->context->pstream, t);
2437
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2443
/* Query the current stream time in microseconds.
 *
 * PA_CHECK_VALIDITY guards: no fork detected (PA_ERR_FORKED); stream is
 * PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE); timing info
 * is valid, and the index relevant to the direction is not flagged corrupt
 * (PA_ERR_NODATA) -- the read index for playback, the write index for
 * record. */
int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) {
2447
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2449
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2450
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2451
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2452
PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2453
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2454
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2457
/* Two sources for usec follow: the smoother sampled at the current
 * rtclock time, and calc_time(s, false).
 * NOTE(review): presumably selected by whether s->smoother is set --
 * confirm against the full function. */
usec = pa_smoother_get(s->smoother, pa_rtclock_now());
2459
usec = calc_time(s, false);
2461
/* Make sure the time runs monotonically */
2462
if (!(s->flags & PA_STREAM_NOT_MONOTONIC)) {
2463
/* Never report a value below the one remembered in previous_time. */
if (usec < s->previous_time)
2464
usec = s->previous_time;
2466
s->previous_time = usec;
2475
/* File-local helper taking two pa_usec_t counters (a, b) and an optional
 * `negative` out-flag; `negative` is tested for NULL before use below.
 * pa_stream_get_latency() calls it with the counters in direction-dependent
 * order. */
static pa_usec_t time_counter_diff(pa_stream *s, pa_usec_t a, pa_usec_t b, int *negative) {
2477
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2485
/* Branch taken only for record streams when the caller supplied a
 * `negative` pointer. */
if (negative && s->direction == PA_STREAM_RECORD) {
2493
/* Compute the stream latency into *r_usec, combining the stream time from
 * pa_stream_get_time() with the direction's buffer index converted to
 * microseconds.
 *
 * PA_CHECK_VALIDITY guards: no fork detected (PA_ERR_FORKED); stream is
 * PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE); timing info
 * is valid, and the index used below is not flagged corrupt (PA_ERR_NODATA)
 * -- the write index for playback, the read index for record.
 *
 * `negative` is passed through unchanged to time_counter_diff(). */
int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) {
2499
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2502
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2503
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2504
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2505
PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA);
2506
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_PLAYBACK || !s->timing_info.write_index_corrupt, PA_ERR_NODATA);
2507
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_RECORD || !s->timing_info.read_index_corrupt, PA_ERR_NODATA);
2509
/* A negative result from pa_stream_get_time() is treated as failure. */
if ((r = pa_stream_get_time(s, &t)) < 0)
2512
/* Pick the buffer index for the direction: write_index for playback;
 * read_index is the other assignment below. */
if (s->direction == PA_STREAM_PLAYBACK)
2513
cindex = s->timing_info.write_index;
2515
cindex = s->timing_info.read_index;
2520
/* Convert the byte index to microseconds using the stream's sample spec. */
c = pa_bytes_to_usec((uint64_t) cindex, &s->sample_spec);
2522
/* Playback passes (c, t) to time_counter_diff(); the other call passes
 * (t, c). */
if (s->direction == PA_STREAM_PLAYBACK)
2523
*r_usec = time_counter_diff(s, c, t, negative);
2525
*r_usec = time_counter_diff(s, t, c, negative);
2530
/* Return a pointer to the stream's own timing_info member (no copy is
 * made).
 *
 * PA_CHECK_VALIDITY_RETURN_NULL guards: no fork detected (PA_ERR_FORKED);
 * stream is PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE);
 * timing_info_valid is set (PA_ERR_NODATA). */
const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) {
2532
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2534
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2535
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2536
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2537
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
2539
return &s->timing_info;
2542
/* Return a pointer to the stream's own sample_spec member. The only
 * validity check is fork detection (PA_ERR_FORKED); the stream state is
 * not checked here. */
const pa_sample_spec* pa_stream_get_sample_spec(pa_stream *s) {
2544
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2546
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2548
return &s->sample_spec;
2551
/* Return a pointer to the stream's own channel_map member. The only
 * validity check is fork detection (PA_ERR_FORKED); the stream state is
 * not checked here. */
const pa_channel_map* pa_stream_get_channel_map(pa_stream *s) {
2553
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2555
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2557
return &s->channel_map;
2560
/* Accessor for the stream's format info. Requires a PA_STREAM_READY stream
 * (PA_ERR_BADSTATE) and no detected fork (PA_ERR_FORKED). Unlike the
 * neighbouring accessors, the state check is evaluated before the fork
 * check. */
const pa_format_info* pa_stream_get_format_info(pa_stream *s) {
2562
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2564
/* We don't have the format till routing is done */
2565
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2566
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2570
/* Return a pointer to the stream's own buffer_attr member.
 *
 * PA_CHECK_VALIDITY_RETURN_NULL guards: stream is PA_STREAM_READY and not
 * an upload stream (PA_ERR_BADSTATE); context version is at least 9
 * (PA_ERR_NOTSUPPORTED). No fork check appears among the visible guards. */
const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) {
2572
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2574
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2575
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2576
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 9, PA_ERR_NOTSUPPORTED);
2578
return &s->buffer_attr;
2581
/* Reply handler that pa_stream_set_buffer_attr() registers with
 * pa_pdispatch_register_reply(). `userdata` is the pa_operation tracking
 * the request.
 *
 * On a PA_COMMAND_REPLY it reads the buffer attributes from the tagstruct
 * into o->stream->buffer_attr, reads the configured latency for context
 * version >= 13, invokes the user's success callback, and finishes the
 * operation. Any tagstruct read failure or trailing data fails the context
 * with PA_ERR_PROTOCOL. */
static void stream_set_buffer_attr_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2582
pa_operation *o = userdata;
2587
pa_assert(PA_REFCNT_VALUE(o) >= 1);
2592
/* Anything other than a reply goes through the context's error handler. */
if (command != PA_COMMAND_REPLY) {
2593
if (pa_context_handle_error(o->context, command, t, false) < 0)
2598
/* Playback reply layout: maxlength, tlength, prebuf, minreq (u32 each). */
if (o->stream->direction == PA_STREAM_PLAYBACK) {
2599
if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2600
pa_tagstruct_getu32(t, &o->stream->buffer_attr.tlength) < 0 ||
2601
pa_tagstruct_getu32(t, &o->stream->buffer_attr.prebuf) < 0 ||
2602
pa_tagstruct_getu32(t, &o->stream->buffer_attr.minreq) < 0) {
2603
pa_context_fail(o->context, PA_ERR_PROTOCOL);
2606
/* Record reply layout: maxlength, fragsize (u32 each). */
} else if (o->stream->direction == PA_STREAM_RECORD) {
2607
if (pa_tagstruct_getu32(t, &o->stream->buffer_attr.maxlength) < 0 ||
2608
pa_tagstruct_getu32(t, &o->stream->buffer_attr.fragsize) < 0) {
2609
pa_context_fail(o->context, PA_ERR_PROTOCOL);
2614
/* Context version 13 and later: the reply also carries a usec value,
 * stored as configured_source_usec for record streams; the other
 * assignment below targets configured_sink_usec. */
if (o->stream->context->version >= 13) {
2617
if (pa_tagstruct_get_usec(t, &usec) < 0) {
2618
pa_context_fail(o->context, PA_ERR_PROTOCOL);
2622
if (o->stream->direction == PA_STREAM_RECORD)
2623
o->stream->timing_info.configured_source_usec = usec;
2625
o->stream->timing_info.configured_sink_usec = usec;
2628
/* Nothing may follow the expected fields. */
if (!pa_tagstruct_eof(t)) {
2629
pa_context_fail(o->context, PA_ERR_PROTOCOL);
2635
/* o->callback is stored as a generic pointer; cast it back to the
 * success-callback type before calling it. */
pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2636
cb(o->stream, success, o->userdata);
2640
/* Mark the operation done and drop a reference on it. */
pa_operation_done(o);
2641
pa_operation_unref(o);
2644
/* Send new buffer attributes for a stream to the server.
 *
 * PA_CHECK_VALIDITY_RETURN_NULL guards: no fork detected (PA_ERR_FORKED);
 * stream is PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE);
 * context version is at least 12 (PA_ERR_NOTSUPPORTED).
 *
 * The command id depends on the stream direction. The payload is the
 * channel, maxlength, then tlength/prebuf/minreq (playback) or fragsize,
 * followed by the ADJUST_LATENCY flag for context version >= 13 and the
 * EARLY_REQUESTS flag for version >= 14. The reply is handled by
 * stream_set_buffer_attr_callback(). cb/userdata are stored in the
 * pa_operation created here. */
pa_operation* pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) {
2648
/* Local pa_buffer_attr that patch_buffer_attr() works on below.
 * NOTE(review): presumably filled from *attr first -- confirm against the
 * full function. */
pa_buffer_attr copy;
2651
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2654
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2655
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2656
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2657
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2659
/* Ask for a timing update before we cork/uncork to get the best
2660
* accuracy for the transport latency suitable for the
2661
* check_smoother_status() call in the started callback */
2662
request_auto_timing_update(s, true);
2664
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2666
t = pa_tagstruct_command(
2668
(uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_SET_RECORD_STREAM_BUFFER_ATTR : PA_COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR),
2670
pa_tagstruct_putu32(t, s->channel);
2673
/* Pass the address of the local copy; the "&copy" token had been
 * mis-decoded into a copyright sign. */
patch_buffer_attr(s, &copy, NULL);
2676
pa_tagstruct_putu32(t, attr->maxlength);
2678
/* Playback sends tlength, prebuf and minreq; fragsize is sent by the
 * other call below. */
if (s->direction == PA_STREAM_PLAYBACK)
2681
PA_TAG_U32, attr->tlength,
2682
PA_TAG_U32, attr->prebuf,
2683
PA_TAG_U32, attr->minreq,
2686
pa_tagstruct_putu32(t, attr->fragsize);
2688
if (s->context->version >= 13)
2689
pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_ADJUST_LATENCY));
2691
if (s->context->version >= 14)
2692
pa_tagstruct_put_boolean(t, !!(s->flags & PA_STREAM_EARLY_REQUESTS));
2694
pa_pstream_send_tagstruct(s->context->pstream, t);
2695
/* The reply handler gets its own reference on o (pa_operation_ref), with
 * pa_operation_unref supplied as the matching free callback. */
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_set_buffer_attr_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2697
/* This might cause changes in the read/write index, hence let's
2698
* request a timing update */
2699
request_auto_timing_update(s, true);
2704
/* Return s->device_index.
 *
 * PA_CHECK_VALIDITY_RETURN_ANY guards, each given PA_INVALID_INDEX as the
 * fallback value: no fork detected (PA_ERR_FORKED); stream is
 * PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE); context
 * version is at least 12 (PA_ERR_NOTSUPPORTED); device_index is not
 * PA_INVALID_INDEX (PA_ERR_BADSTATE). */
uint32_t pa_stream_get_device_index(pa_stream *s) {
2706
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2708
PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2709
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2710
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2711
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2712
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->device_index != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2714
return s->device_index;
2717
/* Return s->device_name; the returned pointer is the stream's own member,
 * not a copy.
 *
 * PA_CHECK_VALIDITY_RETURN_NULL guards: no fork detected (PA_ERR_FORKED);
 * stream is PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE);
 * context version is at least 12 (PA_ERR_NOTSUPPORTED); device_name is
 * non-NULL (PA_ERR_BADSTATE). */
const char *pa_stream_get_device_name(pa_stream *s) {
2719
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2721
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2722
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2723
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2724
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2725
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->device_name, PA_ERR_BADSTATE);
2727
return s->device_name;
2730
/* Return s->suspended as an int.
 *
 * PA_CHECK_VALIDITY guards: no fork detected (PA_ERR_FORKED); stream is
 * PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE); context
 * version is at least 12 (PA_ERR_NOTSUPPORTED). */
int pa_stream_is_suspended(pa_stream *s) {
2732
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2734
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2735
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2736
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2737
PA_CHECK_VALIDITY(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2739
return s->suspended;
2742
/* Query whether the stream is corked.
 *
 * PA_CHECK_VALIDITY guards: no fork detected (PA_ERR_FORKED); stream is
 * PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE). Unlike
 * pa_stream_is_suspended(), no context-version guard is present. */
int pa_stream_is_corked(pa_stream *s) {
2744
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2746
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2747
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2748
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2753
/* Reply handler that pa_stream_update_sample_rate() registers with
 * pa_pdispatch_register_reply(). `userdata` is the pa_operation tracking
 * the request; its `private` field carries the requested rate.
 *
 * The handler stores that rate into the stream's sample_spec, invokes the
 * user's success callback, and finishes the operation. */
static void stream_update_sample_rate_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
2754
pa_operation *o = userdata;
2759
pa_assert(PA_REFCNT_VALUE(o) >= 1);
2764
/* Anything other than a reply goes through the context's error handler. */
if (command != PA_COMMAND_REPLY) {
2765
if (pa_context_handle_error(o->context, command, t, false) < 0)
2771
/* The reply is expected to carry no payload. */
if (!pa_tagstruct_eof(t)) {
2772
pa_context_fail(o->context, PA_ERR_PROTOCOL);
2777
/* Unpack the rate that pa_stream_update_sample_rate() stored with
 * PA_UINT_TO_PTR. */
o->stream->sample_spec.rate = PA_PTR_TO_UINT(o->private);
2778
pa_assert(pa_sample_spec_valid(&o->stream->sample_spec));
2781
/* o->callback is stored as a generic pointer; cast it back to the
 * success-callback type before calling it. */
pa_stream_success_cb_t cb = (pa_stream_success_cb_t) o->callback;
2782
cb(o->stream, success, o->userdata);
2786
/* Mark the operation done and drop a reference on it. */
pa_operation_done(o);
2787
pa_operation_unref(o);
2790
/* Send a request to change the stream's sample rate to `rate`.
 *
 * PA_CHECK_VALIDITY_RETURN_NULL guards: no fork detected (PA_ERR_FORKED);
 * `rate` passes pa_sample_rate_valid() (PA_ERR_INVALID); stream is
 * PA_STREAM_READY, not an upload stream, and has the
 * PA_STREAM_VARIABLE_RATE flag (PA_ERR_BADSTATE); context version is at
 * least 12 (PA_ERR_NOTSUPPORTED).
 *
 * The stream's sample_spec is not modified here; that happens in
 * stream_update_sample_rate_callback(). cb/userdata are stored in the
 * pa_operation created here. */
pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) {
2796
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2798
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2799
PA_CHECK_VALIDITY_RETURN_NULL(s->context, pa_sample_rate_valid(rate), PA_ERR_INVALID);
2800
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2801
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2802
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE);
2803
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 12, PA_ERR_NOTSUPPORTED);
2805
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2806
/* Stash the requested rate in the operation so the reply handler can
 * read it back. */
o->private = PA_UINT_TO_PTR(rate);
2808
t = pa_tagstruct_command(
2810
(uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_SAMPLE_RATE : PA_COMMAND_UPDATE_PLAYBACK_STREAM_SAMPLE_RATE),
2812
pa_tagstruct_putu32(t, s->channel);
2813
pa_tagstruct_putu32(t, rate);
2815
pa_pstream_send_tagstruct(s->context->pstream, t);
2816
/* The reply handler gets its own reference on o (pa_operation_ref), with
 * pa_operation_unref supplied as the matching free callback. */
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_update_sample_rate_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2821
/* Send a property-list update for the stream to the server.
 *
 * PA_CHECK_VALIDITY_RETURN_NULL guards: no fork detected (PA_ERR_FORKED);
 * `mode` is one of PA_UPDATE_SET, PA_UPDATE_MERGE or PA_UPDATE_REPLACE
 * (PA_ERR_INVALID); stream is PA_STREAM_READY and not an upload stream
 * (PA_ERR_BADSTATE); context version is at least 13 (PA_ERR_NOTSUPPORTED).
 *
 * The payload is the channel, the mode as u32, then the proplist `p`.
 * cb/userdata are stored in the pa_operation created here. */
pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) {
2827
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2829
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2830
PA_CHECK_VALIDITY_RETURN_NULL(s->context, mode == PA_UPDATE_SET || mode == PA_UPDATE_MERGE || mode == PA_UPDATE_REPLACE, PA_ERR_INVALID);
2831
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2832
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2833
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2835
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2837
t = pa_tagstruct_command(
2839
(uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_UPDATE_RECORD_STREAM_PROPLIST : PA_COMMAND_UPDATE_PLAYBACK_STREAM_PROPLIST),
2841
pa_tagstruct_putu32(t, s->channel);
2842
pa_tagstruct_putu32(t, (uint32_t) mode);
2843
pa_tagstruct_put_proplist(t, p);
2845
pa_pstream_send_tagstruct(s->context->pstream, t);
2846
/* The reply handler gets its own reference on o (pa_operation_ref), with
 * pa_operation_unref supplied as the matching free callback. */
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2848
/* Please note that we don't update s->proplist here, because we
2849
* don't export that field */
2854
/* Send a request to remove the listed property keys from the stream.
 *
 * `keys` is walked until its NULL entry, so it must be NULL-terminated.
 *
 * PA_CHECK_VALIDITY_RETURN_NULL guards: no fork detected (PA_ERR_FORKED);
 * `keys` is non-NULL and has at least one entry (PA_ERR_INVALID); stream
 * is PA_STREAM_READY and not an upload stream (PA_ERR_BADSTATE); context
 * version is at least 13 (PA_ERR_NOTSUPPORTED).
 *
 * cb/userdata are stored in the pa_operation created here. */
pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) {
2858
const char * const*k;
2861
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2863
PA_CHECK_VALIDITY_RETURN_NULL(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2864
PA_CHECK_VALIDITY_RETURN_NULL(s->context, keys && keys[0], PA_ERR_INVALID);
2865
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
2866
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
2867
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2869
o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata);
2871
t = pa_tagstruct_command(
2873
(uint32_t) (s->direction == PA_STREAM_RECORD ? PA_COMMAND_REMOVE_RECORD_STREAM_PROPLIST : PA_COMMAND_REMOVE_PLAYBACK_STREAM_PROPLIST),
2875
pa_tagstruct_putu32(t, s->channel);
2877
/* Write every key as a string, then a NULL string after the last one. */
for (k = keys; *k; k++)
2878
pa_tagstruct_puts(t, *k);
2880
pa_tagstruct_puts(t, NULL);
2882
pa_pstream_send_tagstruct(s->context->pstream, t);
2883
/* The reply handler gets its own reference on o (pa_operation_ref), with
 * pa_operation_unref supplied as the matching free callback. */
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref);
2885
/* Please note that we don't update s->proplist here, because we
2886
* don't export that field */
2891
/* Record `sink_input_idx` in s->direct_on_input. Nothing is sent to the
 * server by the code visible here.
 *
 * PA_CHECK_VALIDITY guards: no fork detected (PA_ERR_FORKED);
 * sink_input_idx is not PA_INVALID_INDEX (PA_ERR_INVALID); the stream is
 * still PA_STREAM_UNCONNECTED (PA_ERR_BADSTATE); context version is at
 * least 13 (PA_ERR_NOTSUPPORTED). */
int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx) {
2893
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2895
PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED);
2896
PA_CHECK_VALIDITY(s->context, sink_input_idx != PA_INVALID_INDEX, PA_ERR_INVALID);
2897
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
2898
PA_CHECK_VALIDITY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED);
2900
s->direct_on_input = sink_input_idx;
2905
/* Return s->direct_on_input, the value stored by
 * pa_stream_set_monitor_stream().
 *
 * PA_CHECK_VALIDITY_RETURN_ANY guards, each given PA_INVALID_INDEX as the
 * fallback value: no fork detected (PA_ERR_FORKED); direct_on_input is not
 * PA_INVALID_INDEX (PA_ERR_BADSTATE); context version is at least 13
 * (PA_ERR_NOTSUPPORTED). */
uint32_t pa_stream_get_monitor_stream(pa_stream *s) {
2907
pa_assert(PA_REFCNT_VALUE(s) >= 1);
2909
PA_CHECK_VALIDITY_RETURN_ANY(s->context, !pa_detect_fork(), PA_ERR_FORKED, PA_INVALID_INDEX);
2910
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direct_on_input != PA_INVALID_INDEX, PA_ERR_BADSTATE, PA_INVALID_INDEX);
2911
PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->context->version >= 13, PA_ERR_NOTSUPPORTED, PA_INVALID_INDEX);
2913
return s->direct_on_input;