97
97
* A typical live source will timestamp the buffers it creates with the
98
* current stream time of the pipeline. This is one reason why a live source
98
* current running time of the pipeline. This is one reason why a live source
99
99
* can only produce data in the PLAYING state, when the clock is actually
100
* distributed and running.
100
* distributed and running.
103
* Live sources that synchronize and block on the clock (an audio source, for
103
* Live sources that synchronize and block on the clock (an audio source, for
104
104
* example) can since 0.10.12 use gst_base_src_wait_playing() when the ::create
105
105
* function was interrupted by a state change to PAUSED.
108
108
* The #GstBaseSrc::get_times method can be used to implement pseudo-live
109
* sources. The base source will wait for the specified stream time returned in
110
* #GstBaseSrc::get_times before pushing out the buffer.
111
110
* It only makes sense to implement the ::get_times function if the source is
111
* a live source. The ::get_times function should return timestamps starting
112
* from 0, as if it were a non-live source. The base class will make sure that
113
* the timestamps are transformed into the current running_time.
114
* The base source will then wait for the calculated running_time before pushing
118
* For live sources, the base class will by default report a latency of 0.
119
* For pseudo live sources, the base class will by default measure the difference
120
* between the first buffer timestamp and the start time of get_times and will
121
* report this value as the latency.
122
* Subclasses should override the query function when this behaviour is not
115
126
* There is only support in #GstBaseSrc for exactly one source pad, which
568
589
gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
569
590
GstClockTime * min_latency, GstClockTime * max_latency)
594
GST_OBJECT_LOCK (src);
573
596
*live = src->is_live;
598
/* if we have a startup latency, report this one, else report 0. Subclasses
599
* are supposed to override the query function if they want something
601
if (src->priv->latency != -1)
602
min = src->priv->latency;
577
609
*max_latency = -1;
578
GST_LIVE_UNLOCK (src);
611
GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
612
", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
614
GST_OBJECT_UNLOCK (src);
620
* gst_base_src_set_do_timestamp:
622
* @timestamp: enable or disable timestamping
624
* Configure @src to automatically timestamp outgoing buffers based on the
625
* current running_time of the pipeline. This property is mostly useful for live
631
gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
633
GST_OBJECT_LOCK (src);
634
src->priv->do_timestamp = timestamp;
635
GST_OBJECT_UNLOCK (src);
639
* gst_base_src_get_do_timestamp:
642
* Query if @src timestamps outgoing buffers based on the current running_time.
644
* Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
649
gst_base_src_get_do_timestamp (GstBaseSrc * src)
653
GST_OBJECT_LOCK (src);
654
res = src->priv->do_timestamp;
655
GST_OBJECT_UNLOCK (src);
584
661
gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
787
/* this is the duration as configured by the subclass. */
710
788
duration = src->segment.duration;
712
790
if (duration != -1) {
713
/* convert to requested format */
791
/* convert to requested format, if this fails, we have a duration
792
* but we cannot answer the query, we must return FALSE. */
715
794
gst_pad_query_convert (src->srcpad, src->segment.format,
716
795
duration, &format, &duration);
797
/* The subclass did not configure a duration, we assume that the
798
* media has an unknown duration then and we return TRUE to report
799
* this. Note that this is not the same as returning FALSE, which
800
* means that we cannot report the duration at all. */
720
803
gst_query_set_duration (query, format, duration);
1031
1122
gst_pad_pause_task (src->srcpad);
1033
/* unblock streaming thread */
1035
gst_base_src_unlock (src);
1124
/* unblock streaming thread. */
1125
gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1037
1127
/* grab streaming lock, this should eventually be possible, either
1038
* because the task is paused or our streaming thread stopped
1039
* because our peer is flushing. */
1128
* because the task is paused, our streaming thread stopped
1129
* or because our peer is flushing. */
1040
1130
GST_PAD_STREAM_LOCK (src->srcpad);
1043
gst_base_src_unlock_stop (src);
1132
gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1045
1134
/* If we configured the seeksegment above, don't overwrite it now. Otherwise
1046
1135
* copy the current segment info into the temp segment that we can actually
1233
1332
case GST_EVENT_NAVIGATION:
1333
/* could make sense for elements that do something with navigation events
1334
* but then they would need to override the send_event function */
1336
case GST_EVENT_LATENCY:
1337
/* does not seem to make sense currently */
1341
case GST_EVENT_CUSTOM_UPSTREAM:
1342
/* override send_event if you want this */
1344
case GST_EVENT_CUSTOM_DOWNSTREAM:
1345
case GST_EVENT_CUSTOM_BOTH:
1346
/* FIXME, insert event in the dataflow */
1348
case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1349
case GST_EVENT_CUSTOM_BOTH_OOB:
1350
/* insert a random custom event into the pipeline */
1351
GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1352
result = gst_pad_push_event (src->srcpad, event);
1353
/* we gave away the ref to the event in the push */
1239
gst_event_unref (event);
1360
/* if we still have a ref to the event, unref it now */
1362
gst_event_unref (event);
1406
1533
if (bclass->get_times)
1407
1534
bclass->get_times (basesrc, buffer, &start, &end);
1409
/* if we don't have a timestamp, we don't sync */
1536
/* get buffer timestamp */
1537
timestamp = GST_BUFFER_TIMESTAMP (buffer);
1539
/* grab the lock to prepare for clocking and calculate the startup
1541
GST_OBJECT_LOCK (basesrc);
1543
/* if we are asked to sync against the clock we are a pseudo live element */
1544
pseudo_live = (start != -1 && basesrc->is_live);
1545
/* check for the first buffer */
1546
first = (basesrc->priv->latency == -1);
1548
if (timestamp != -1 && pseudo_live) {
1549
GstClockTime latency;
1551
/* we have a timestamp and a sync time, latency is the diff */
1552
if (timestamp <= start)
1553
latency = start - timestamp;
1558
GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1559
GST_TIME_ARGS (latency));
1560
/* first time we calculate latency, just configure */
1561
basesrc->priv->latency = latency;
1563
if (basesrc->priv->latency != latency) {
1564
/* we have a new latency, FIXME post latency message */
1565
basesrc->priv->latency = latency;
1566
GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1567
GST_TIME_ARGS (latency));
1571
GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1572
basesrc->is_live, start != -1);
1573
basesrc->priv->latency = 0;
1576
/* get clock, if no clock, we can't sync or do timestamps */
1577
if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1580
base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1582
do_timestamp = basesrc->priv->do_timestamp;
1584
/* first buffer, calculate the timestamp offset */
1586
GstClockTime running_time;
1588
now = gst_clock_get_time (clock);
1589
running_time = now - base_time;
1591
GST_LOG_OBJECT (basesrc,
1592
"startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1593
GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1594
GST_TIME_ARGS (running_time));
1596
if (pseudo_live && timestamp != -1) {
1597
/* live source and we need to sync, add startup latency to all timestamps
1598
* to get the real running_time. Live sources should always timestamp
1599
* according to the current running time. */
1600
basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1602
GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1603
GST_TIME_ARGS (basesrc->priv->ts_offset));
1605
basesrc->priv->ts_offset = 0;
1606
GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1609
if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1611
timestamp = running_time;
1615
GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1617
GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1618
GST_TIME_ARGS (timestamp));
1621
/* add the timestamp offset we need for sync */
1622
timestamp += basesrc->priv->ts_offset;
1624
/* not the first buffer, the timestamp is the diff between the clock and
1626
if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1627
now = gst_clock_get_time (clock);
1629
GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1631
GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1632
GST_TIME_ARGS (now - base_time));
1636
/* if we don't have a buffer timestamp, we don't sync */
1410
1637
if (!GST_CLOCK_TIME_IS_VALID (start))
1413
/* now do clocking */
1414
GST_OBJECT_LOCK (basesrc);
1415
base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1640
if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1641
/* for pseudo live sources, add our ts_offset to the timestamp */
1642
GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1643
start += basesrc->priv->ts_offset;
1417
1646
GST_LOG_OBJECT (basesrc,
1418
1647
"waiting for clock, base time %" GST_TIME_FORMAT
1419
1648
", stream_start %" GST_TIME_FORMAT,
1420
1649
GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1422
result = gst_base_src_wait (basesrc, start + base_time);
1423
1650
GST_OBJECT_UNLOCK (basesrc);
1652
result = gst_base_src_wait (basesrc, clock, start + base_time);
1425
1654
GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1429
1658
/* special cases */
1432
GST_DEBUG_OBJECT (basesrc, "get_times returned invalid start");
1661
GST_DEBUG_OBJECT (basesrc, "we have no clock");
1662
GST_OBJECT_UNLOCK (basesrc);
1663
return GST_CLOCK_OK;
1667
GST_DEBUG_OBJECT (basesrc, "no sync needed");
1668
GST_OBJECT_UNLOCK (basesrc);
1433
1669
return GST_CLOCK_OK;
1825
2108
GST_ELEMENT_ERROR (src, STREAM, FAILED,
1826
2109
(_("Internal data flow error.")), ("element returned NULL buffer"));
2110
GST_LIVE_UNLOCK (src);
1827
2111
/* we finished the segment on error */
1828
src->data.ABI.running = FALSE;
1829
gst_pad_pause_task (pad);
1830
gst_pad_push_event (pad, gst_event_new_eos ());
1831
src->priv->last_sent_eos = TRUE;
2112
ret = GST_FLOW_ERROR;
1836
/* this will always be called between start() and stop(). So you can rely on
1837
* resources allocated by start() and freed from stop(). This needs to be added
1838
* to the docs at some point. */
1840
gst_base_src_unlock (GstBaseSrc * basesrc)
1842
GstBaseSrcClass *bclass;
1843
gboolean result = TRUE;
1845
GST_DEBUG ("unlock");
1846
/* unblock whatever the subclass is doing */
1847
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1849
result = bclass->unlock (basesrc);
1851
GST_DEBUG ("unschedule clock");
1852
/* and unblock the clock as well, if any */
1853
GST_OBJECT_LOCK (basesrc);
1854
if (basesrc->clock_id) {
1855
gst_clock_id_unschedule (basesrc->clock_id);
1857
GST_OBJECT_UNLOCK (basesrc);
1859
GST_DEBUG ("unlock done");
1864
/* this will always be called between start() and stop(). So you can rely on
1865
* resources allocated by start() and freed from stop(). This needs to be added
1866
* to the docs at some point. */
1868
gst_base_src_unlock_stop (GstBaseSrc * basesrc)
1870
GstBaseSrcClass *bclass;
1871
gboolean result = TRUE;
1873
GST_DEBUG_OBJECT (basesrc, "unlock stop");
1875
/* Finish a previous unblock request, allowing subclasses to flush command
1876
* queues or whatever they need to do */
1877
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1878
if (bclass->unlock_stop)
1879
result = bclass->unlock_stop (basesrc);
1881
GST_DEBUG_OBJECT (basesrc, "unlock stop done");
1886
2117
/* default negotiation code.
1888
2119
* Take intersection between src and sink pads, take first
2091
gst_base_src_deactivate (GstBaseSrc * basesrc, GstPad * pad)
2095
GST_LIVE_LOCK (basesrc);
2096
basesrc->live_running = TRUE;
2097
GST_LIVE_SIGNAL (basesrc);
2098
GST_LIVE_UNLOCK (basesrc);
2100
/* step 1, unblock clock sync (if any) */
2101
result = gst_base_src_unlock (basesrc);
2103
/* step 2, make sure streaming finishes */
2104
result &= gst_pad_stop_task (pad);
2106
/* step 3, clear the unblock condition */
2107
result &= gst_base_src_unlock_stop (basesrc);
2321
/* start or stop flushing data processing
2324
gst_base_src_set_flushing (GstBaseSrc * basesrc,
2325
gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
2327
GstBaseSrcClass *bclass;
2329
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2331
if (flushing && unlock) {
2332
/* unlock any subclasses, we need to do this before grabbing the
2333
* LIVE_LOCK since we hold this lock before going into ::create. We pass an
2334
* unlock to the params because of backwards compat (see seek handler) */
2336
bclass->unlock (basesrc);
2339
/* the live lock is released when we are blocked, waiting for playing or
2340
* when we sync to the clock. */
2341
GST_LIVE_LOCK (basesrc);
2343
*playing = basesrc->live_running;
2344
basesrc->priv->flushing = flushing;
2346
/* if we are locked in the live lock, signal it to make it flush */
2347
basesrc->live_running = TRUE;
2349
/* step 1, now that we have the LIVE lock, clear our unlock request */
2350
if (bclass->unlock_stop)
2351
bclass->unlock_stop (basesrc);
2353
/* step 2, unblock clock sync (if any) or any other blocking thing */
2354
if (basesrc->clock_id)
2355
gst_clock_id_unschedule (basesrc->clock_id);
2357
/* signal the live source that it can start playing */
2358
basesrc->live_running = live_play;
2360
GST_LIVE_SIGNAL (basesrc);
2361
GST_LIVE_UNLOCK (basesrc);
2366
/* the purpose of this function is to make sure that a live source blocks in the
2367
* LIVE lock or leaves the LIVE lock and continues playing. */
2369
gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
2371
GstBaseSrcClass *bclass;
2373
bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2375
/* unlock subclasses locked in ::create, we only do this when we stop playing. */
2377
GST_DEBUG_OBJECT (basesrc, "unlock");
2379
bclass->unlock (basesrc);
2382
/* we are now able to grab the LIVE lock, when we get it, we can be
2383
* waiting for PLAYING while blocked in the LIVE cond or we can be waiting
2385
GST_LIVE_LOCK (basesrc);
2387
/* unblock clock sync (if any) */
2388
if (basesrc->clock_id)
2389
gst_clock_id_unschedule (basesrc->clock_id);
2391
/* configure what to do when we get to the LIVE lock. */
2392
basesrc->live_running = live_play;
2397
/* clear our unlock request when going to PLAYING */
2398
GST_DEBUG_OBJECT (basesrc, "unlock stop");
2399
if (bclass->unlock_stop)
2400
bclass->unlock_stop (basesrc);
2402
/* for live sources we restart the timestamp correction */
2403
basesrc->priv->latency = -1;
2404
/* have to restart the task in case it stopped because of the unlock when
2405
* we went to PAUSED. Only do this if we are operating in push mode. */
2406
GST_OBJECT_LOCK (basesrc->srcpad);
2407
start = (GST_PAD_ACTIVATE_MODE (basesrc->srcpad) == GST_ACTIVATE_PUSH);
2408
GST_OBJECT_UNLOCK (basesrc->srcpad);
2410
gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
2413
GST_LIVE_SIGNAL (basesrc);
2414
GST_LIVE_UNLOCK (basesrc);
2112
2419
static gboolean
2202
2506
/* if not random_access, we cannot operate in pull mode for now */
2203
2507
if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2204
2508
goto no_get_range;
2510
/* stop flushing now but for live sources, still block in the LIVE lock when
2511
* we are not yet PLAYING */
2512
gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2206
2514
GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2207
/* call the unlock function. We have no task to stop. */
2208
if (G_UNLIKELY (!gst_base_src_deactivate (basesrc, pad)))
2209
goto deactivate_failed;
2515
/* flush all, there is no task to stop */
2516
gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2211
2518
/* don't send EOS when going from PAUSED => READY when in pull mode */
2212
2519
basesrc->priv->last_sent_eos = TRUE;