90
90
* distributed and running.
92
92
* Live sources that synchronize and block on the clock (an audio source, for
93
* example) can since 0.10.12 use gst_base_src_wait_playing() when the
93
* example) can use gst_base_src_wait_playing() when the
94
94
* #GstBaseSrcClass.create() function was interrupted by a state change to
138
138
* EOS message posted on the pipeline's bus to know when all data has
139
139
* been processed and the pipeline can safely be stopped.
141
* Since GStreamer 0.10.16 an application may send an EOS event to a source
142
* element to make it perform the EOS logic (send EOS event downstream or post a
141
* An application may send an EOS event to a source element to make it
142
* perform the EOS logic (send EOS event downstream or post a
143
143
* #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
144
144
* with the gst_element_send_event() function on the element or its parent bin.
593
589
* If not @dynamic, size is only updated when needed, such as when trying to
594
590
* read past the current tracked size. Otherwise, size is checked upon each
600
594
gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
841
823
gboolean ret = TRUE;
843
825
if (src->priv->stream_start_pending) {
844
ret = gst_pad_push_event (src->srcpad, gst_event_new_stream_start ());
829
gst_pad_create_stream_id (src->srcpad, GST_ELEMENT_CAST (src), NULL);
831
GST_DEBUG_OBJECT (src, "Pushing STREAM_START");
833
gst_pad_push_event (src->srcpad,
834
gst_event_new_stream_start (stream_id));
845
835
src->priv->stream_start_pending = FALSE;
866
857
bclass = GST_BASE_SRC_GET_CLASS (src);
868
859
gst_base_src_send_stream_start (src);
869
gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
871
861
if (bclass->set_caps)
872
862
res = bclass->set_caps (src, caps);
864
res = gst_pad_set_caps (src->srcpad, caps);
1282
1290
* seek format, adjust by the relative seek offset and then convert back to
1283
1291
* the processing format
1285
GstSeekType cur_type, stop_type;
1293
GstSeekType start_type, stop_type;
1287
1295
GstSeekFlags flags;
1288
1296
GstFormat seek_format, dest_format;
1291
1299
gboolean res = TRUE;
1293
1301
gst_event_parse_seek (event, &rate, &seek_format, &flags,
1294
&cur_type, &cur, &stop_type, &stop);
1302
&start_type, &start, &stop_type, &stop);
1295
1303
dest_format = segment->format;
1297
1305
if (seek_format == dest_format) {
1298
1306
gst_segment_do_seek (segment, rate, seek_format, flags,
1299
cur_type, cur, stop_type, stop, &update);
1307
start_type, start, stop_type, stop, &update);
1303
if (cur_type != GST_SEEK_TYPE_NONE) {
1304
/* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1311
if (start_type != GST_SEEK_TYPE_NONE) {
1312
/* FIXME: Handle seek_end by converting the input segment vals */
1306
gst_pad_query_convert (src->srcpad, seek_format, cur, dest_format,
1308
cur_type = GST_SEEK_TYPE_SET;
1314
gst_pad_query_convert (src->srcpad, seek_format, start, dest_format,
1316
start_type = GST_SEEK_TYPE_SET;
1311
1319
if (res && stop_type != GST_SEEK_TYPE_NONE) {
1312
/* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1320
/* FIXME: Handle seek_end by converting the input segment vals */
1314
1322
gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1319
1327
/* And finally, configure our output segment in the desired format */
1320
gst_segment_do_seek (segment, rate, dest_format, flags, cur_type, cur,
1328
gst_segment_do_seek (segment, rate, dest_format, flags, start_type, start,
1321
1329
stop_type, stop, &update);
1482
1490
* instead of EOS when doing a segment seek.
1484
1492
static gboolean
1485
gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event)
1493
gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1487
1495
gboolean res = TRUE, tres;
1489
1497
GstFormat seek_format, dest_format;
1490
1498
GstSeekFlags flags;
1491
GstSeekType cur_type, stop_type;
1499
GstSeekType start_type, stop_type;
1493
1501
gboolean flush, playing;
1494
1502
gboolean update;
1495
1503
gboolean relative_seek = FALSE;
1508
1516
gst_event_parse_seek (event, &rate, &seek_format, &flags,
1509
&cur_type, &cur, &stop_type, &stop);
1517
&start_type, &start, &stop_type, &stop);
1511
relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1519
relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) ||
1512
1520
SEEK_TYPE_IS_RELATIVE (stop_type);
1514
1522
if (dest_format != seek_format && !relative_seek) {
1541
1549
gst_pad_pause_task (src->srcpad);
1543
1551
/* unblock streaming thread. */
1544
gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
1553
gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
1546
1555
/* grab streaming lock, this should eventually be possible, either
1547
1556
* because the task is paused, our streaming thread stopped
1556
1565
GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1559
gst_base_src_set_flushing (src, FALSE, playing, NULL);
1569
gst_base_src_set_flushing (src, FALSE, playing, NULL);
1561
1571
/* If we configured the seeksegment above, don't overwrite it now. Otherwise
1562
1572
* copy the current segment info into the temp segment that we can actually
1579
1589
/* The seek format matches our processing format, no need to ask the
1580
1590
* subclass to configure the segment. */
1581
1591
gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1582
cur_type, cur, stop_type, stop, &update);
1592
start_type, start, stop_type, stop, &update);
1585
1595
/* Else, no seek event passed, so we're just (re)starting the
1619
1629
memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1620
1630
GST_OBJECT_UNLOCK (src);
1622
if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
1632
if (seeksegment.flags & GST_SEGMENT_FLAG_SEGMENT) {
1623
1633
GstMessage *message;
1625
1635
message = gst_message_new_segment_start (GST_OBJECT (src),
1674
1684
switch (GST_EVENT_TYPE (event)) {
1675
1685
/* bidirectional events */
1676
1686
case GST_EVENT_FLUSH_START:
1687
GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1688
result = gst_pad_push_event (src->srcpad, event);
1677
1691
case GST_EVENT_FLUSH_STOP:
1692
GST_LIVE_LOCK (src->srcpad);
1693
src->priv->segment_pending = TRUE;
1678
1694
/* sending random flushes downstream can break stuff,
1679
1695
* especially sync since all segment info will get flushed */
1696
GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1697
result = gst_pad_push_event (src->srcpad, event);
1698
GST_LIVE_UNLOCK (src->srcpad);
1682
1702
/* downstream serialized events */
1761
1781
GST_DEBUG_OBJECT (src, "performing seek");
1762
1782
/* when we are running in push mode, we can execute the
1763
1783
* seek right now. */
1764
result = gst_base_src_perform_seek (src, event);
1784
result = gst_base_src_perform_seek (src, event, TRUE);
1766
1786
GstEvent **event_p;
1856
1876
if (!gst_base_src_seekable (src))
1857
1877
goto not_seekable;
1859
result = gst_base_src_perform_seek (src, event);
1879
result = gst_base_src_perform_seek (src, event, TRUE);
1861
1881
case GST_EVENT_FLUSH_START:
1862
1882
/* cancel any blocking getrange, is normally called
2012
2032
GstBaseSrcClass *bclass;
2013
2033
GstClockTime base_time;
2014
2034
GstClock *clock;
2015
GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
2035
GstClockTime now = GST_CLOCK_TIME_NONE, pts, dts, timestamp;
2016
2036
gboolean do_timestamp, first, pseudo_live, is_live;
2018
2038
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2022
2042
bclass->get_times (basesrc, buffer, &start, &end);
2024
2044
/* get buffer timestamp */
2025
timestamp = GST_BUFFER_TIMESTAMP (buffer);
2045
dts = GST_BUFFER_DTS (buffer);
2046
pts = GST_BUFFER_PTS (buffer);
2048
if (GST_CLOCK_TIME_IS_VALID (dts))
2027
2053
/* grab the lock to prepare for clocking and calculate the startup
2081
2107
running_time = now - base_time;
2083
2109
GST_LOG_OBJECT (basesrc,
2084
"startup timestamp: %" GST_TIME_FORMAT ", running_time %"
2085
GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
2086
GST_TIME_ARGS (running_time));
2110
"startup PTS: %" GST_TIME_FORMAT ", DTS %" GST_TIME_FORMAT
2111
", running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (pts),
2112
GST_TIME_ARGS (dts), GST_TIME_ARGS (running_time));
2088
2114
if (pseudo_live && timestamp != -1) {
2089
2115
/* live source and we need to sync, add startup latency to all timestamps
2098
2124
GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2101
if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
2103
timestamp = running_time;
2107
GST_BUFFER_TIMESTAMP (buffer) = timestamp;
2109
GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
2110
GST_TIME_ARGS (timestamp));
2127
if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2133
GST_BUFFER_DTS (buffer) = dts;
2135
GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2136
GST_TIME_ARGS (dts));
2113
/* add the timestamp offset we need for sync */
2114
timestamp += basesrc->priv->ts_offset;
2116
2139
/* not the first buffer, the timestamp is the diff between the clock and
2118
if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
2141
if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (dts)) {
2119
2142
now = gst_clock_get_time (clock);
2121
GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
2144
dts = now - base_time;
2145
GST_BUFFER_DTS (buffer) = dts;
2123
GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
2124
GST_TIME_ARGS (now - base_time));
2147
GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2148
GST_TIME_ARGS (dts));
2151
if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2152
if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
2155
GST_BUFFER_PTS (buffer) = dts;
2157
GST_LOG_OBJECT (basesrc, "created PTS %" GST_TIME_FORMAT,
2158
GST_TIME_ARGS (pts));
2128
2161
/* if we don't have a buffer timestamp, we don't sync */
2129
2162
if (!GST_CLOCK_TIME_IS_VALID (start))
2132
if (is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
2133
2166
/* for pseudo live sources, add our ts_offset to the timestamp */
2134
GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
2167
if (GST_CLOCK_TIME_IS_VALID (pts))
2168
GST_BUFFER_PTS (buffer) += basesrc->priv->ts_offset;
2169
if (GST_CLOCK_TIME_IS_VALID (dts))
2170
GST_BUFFER_DTS (buffer) += basesrc->priv->ts_offset;
2135
2171
start += basesrc->priv->ts_offset;
2296
2333
"calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2297
2334
G_GINT64_FORMAT, offset, length, src->segment.time);
2336
res_buf = in_buf = *buf;
2301
2338
ret = bclass->create (src, offset, length, &res_buf);
2314
2351
if (G_UNLIKELY (ret != GST_FLOW_OK))
2354
/* fallback in case the create function didn't fill a provided buffer */
2355
if (in_buf != NULL && res_buf != in_buf) {
2359
GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
2360
"fill the provided buffer, copying");
2362
gst_buffer_map (in_buf, &info, GST_MAP_WRITE);
2363
copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
2364
gst_buffer_unmap (in_buf, &info);
2365
gst_buffer_set_size (in_buf, copied_size);
2367
gst_buffer_copy_into (in_buf, res_buf, GST_BUFFER_COPY_METADATA, 0, -1);
2369
gst_buffer_unref (res_buf);
2317
2373
/* no timestamp set and we are at offset 0, we can timestamp with 0 */
2318
2374
if (offset == 0 && src->segment.time == 0
2319
&& GST_BUFFER_TIMESTAMP (res_buf) == -1 && !src->is_live) {
2375
&& GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
2320
2376
GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2321
2377
res_buf = gst_buffer_make_writable (res_buf);
2322
GST_BUFFER_TIMESTAMP (res_buf) = 0;
2378
GST_BUFFER_DTS (res_buf) = 0;
2325
2381
/* now sync before pushing the buffer */
2689
2745
format, position);
2690
2746
gst_message_set_seqnum (message, src->priv->seqnum);
2691
2747
gst_element_post_message (GST_ELEMENT_CAST (src), message);
2748
event = gst_event_new_segment_done (format, position);
2749
gst_event_set_seqnum (event, src->priv->seqnum);
2750
gst_pad_push_event (pad, event);
2693
2752
event = gst_event_new_eos ();
2694
2753
gst_event_set_seqnum (event, src->priv->seqnum);
2839
2898
gst_query_add_allocation_param (query, allocator, &params);
2841
gst_allocator_unref (allocator);
2900
gst_object_unref (allocator);
2844
2903
gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
3146
3205
basesrc->pending_seek = NULL;
3147
3206
GST_OBJECT_UNLOCK (basesrc);
3149
/* The perform seek code will start the task when finished. */
3150
if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event)))
3208
/* The perform seek code will start the task when finished. We don't have to
3209
* unlock the streaming thread because it is not running yet */
3210
if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3151
3211
goto seek_failed;
3285
3344
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3346
GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
3287
3348
if (flushing) {
3288
3349
gst_base_src_activate_pool (basesrc, FALSE);
3289
3350
/* unlock any subclasses, we need to do this before grabbing the
3383
3444
GST_OBJECT_UNLOCK (basesrc->srcpad);
3385
3446
gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3447
basesrc->srcpad, NULL);
3387
3448
GST_DEBUG_OBJECT (basesrc, "signal");
3388
3449
GST_LIVE_SIGNAL (basesrc);
3471
3532
GstPadMode mode, gboolean active)
3535
GstBaseSrc *src = GST_BASE_SRC (parent);
3537
src->priv->stream_start_pending = FALSE;
3475
3539
switch (mode) {
3476
3540
case GST_PAD_MODE_PULL:
3477
3541
res = gst_base_src_activate_pull (pad, parent, active);
3479
3543
case GST_PAD_MODE_PUSH:
3544
src->priv->stream_start_pending = active;
3480
3545
res = gst_base_src_activate_push (pad, parent, active);
3501
3566
case GST_STATE_CHANGE_NULL_TO_READY:
3503
3568
case GST_STATE_CHANGE_READY_TO_PAUSED:
3504
basesrc->priv->stream_start_pending = TRUE;
3505
3569
no_preroll = gst_base_src_is_live (basesrc);
3507
3571
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3535
3599
* already did this */
3536
3600
g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3537
3601
gst_event_replace (&basesrc->pending_seek, NULL);
3538
basesrc->priv->stream_start_pending = FALSE;
3541
3604
case GST_STATE_CHANGE_READY_TO_NULL: