~ubuntu-branches/ubuntu/trusty/gstreamer1.0/trusty-proposed

« back to all changes in this revision

Viewing changes to libs/gst/base/gstcollectpads.c

  • Committer: Package Import Robot
  • Author(s): Sebastian Dröge
  • Date: 2012-05-21 11:14:06 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20120521111406-1wfas0o9fdaxjyo8
Tags: 0.11.91-1
* New upstream release, "I will give you five magic beans!":
  + debian/libgstreamer.symbols:
    - Update symbols file.
* debian/libgstreamer-dev.install:
  + Don't ship useless .la file for the core plugins.
  + Don't ship .a and .la files for the library.
* debian/control.in:
  + Update debhelper dependency version and Standards-Version.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* GStreamer
 
2
 * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
 
3
 * Copyright (C) 2008 Mark Nauwelaerts <mnauw@users.sourceforge.net>
 
4
 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
 
5
 *
 
6
 * gstcollectpads.c:
 
7
 *
 
8
 * This library is free software; you can redistribute it and/or
 
9
 * modify it under the terms of the GNU Library General Public
 
10
 * License as published by the Free Software Foundation; either
 
11
 * version 2 of the License, or (at your option) any later version.
 
12
 *
 
13
 * This library is distributed in the hope that it will be useful,
 
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
16
 * Library General Public License for more details.
 
17
 *
 
18
 * You should have received a copy of the GNU Library General Public
 
19
 * License along with this library; if not, write to the
 
20
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 
21
 * Boston, MA 02111-1307, USA.
 
22
 */
 
23
/**
 
24
 * SECTION:gstcollectpads
 
25
 * @short_description: manages a set of pads that operate in collect mode
 
26
 * @see_also:
 
27
 *
 
28
 * Manages a set of pads that operate in collect mode. This means that control
 
29
 * is given to the manager of this object when all pads have data.
 
30
 * <itemizedlist>
 
31
 *   <listitem><para>
 
32
 *     Collectpads are created with gst_collect_pads_new(). A callback should then
 
33
 *     be installed with gst_collect_pads_set_function ().
 
34
 *   </para></listitem>
 
35
 *   <listitem><para>
 
36
 *     Pads are added to the collection with gst_collect_pads_add_pad()/
 
37
 *     gst_collect_pads_remove_pad(). The pad
 
38
 *     has to be a sinkpad. The chain and event functions of the pad are
 
39
 *     overridden. The element_private of the pad is used to store
 
40
 *     private information for the collectpads.
 
41
 *   </para></listitem>
 
42
 *   <listitem><para>
 
43
 *     For each pad, data is queued in the _chain function or by
 
44
 *     performing a pull_range.
 
45
 *   </para></listitem>
 
46
 *   <listitem><para>
 
47
 *     When data is queued on all pads in waiting mode, the callback function is called.
 
48
 *   </para></listitem>
 
49
 *   <listitem><para>
 
50
 *     Data can be dequeued from the pad with the gst_collect_pads_pop() method.
 
51
 *     One can peek at the data with the gst_collect_pads_peek() function.
 
52
 *     These functions will return NULL if the pad received an EOS event. When all
 
53
 *     pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
 
54
 *     event itself.
 
55
 *   </para></listitem>
 
56
 *   <listitem><para>
 
57
 *     Data can also be dequeued in byte units using the gst_collect_pads_available(),
 
58
 *     gst_collect_pads_read() and gst_collect_pads_flush() calls.
 
59
 *   </para></listitem>
 
60
 *   <listitem><para>
 
61
 *     Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
 
62
 *     their state change functions to start and stop the processing of the collectpads.
 
63
 *     The gst_collect_pads_stop() call should be called before calling the parent
 
64
 *     element state change function in the PAUSED_TO_READY state change to ensure
 
65
 *     no pad is blocked and the element can finish streaming.
 
66
 *   </para></listitem>
 
67
 *   <listitem><para>
 
68
 *     gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
 
69
 *     elements that start a #GstTask to drive the collect_pads. This feature is however
 
70
 *     not yet implemented.
 
71
 *   </para></listitem>
 
72
 *   <listitem><para>
 
73
 *     gst_collect_pads_set_waiting() sets a pad to waiting or non-waiting mode.
 
74
 *     CollectPads element is not waiting for data to be collected on non-waiting pads.
 
75
 *     Thus these pads may but need not have data when the callback is called.
 
76
 *     All pads are in waiting mode by default.
 
77
 *   </para></listitem>
 
78
 * </itemizedlist>
 
79
 *
 
80
 * Last reviewed on 2011-10-28 (0.10.36)
 
81
 *
 
82
 * Since: 0.10.36
 
83
 */
 
84
 
 
85
#ifdef HAVE_CONFIG_H
 
86
#  include "config.h"
 
87
#endif
 
88
 
 
89
#include <gst/gst_private.h>
 
90
 
 
91
#include "gstcollectpads.h"
 
92
 
 
93
#include "../../../gst/glib-compat-private.h"
 
94
 
 
95
GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
 
96
#define GST_CAT_DEFAULT collect_pads_debug
 
97
 
 
98
#define parent_class gst_collect_pads_parent_class
 
99
G_DEFINE_TYPE (GstCollectPads, gst_collect_pads, GST_TYPE_OBJECT);
 
100
 
 
101
struct _GstCollectDataPrivate
 
102
{
 
103
  /* refcounting for struct, and destroy callback */
 
104
  GstCollectDataDestroyNotify destroy_notify;
 
105
  gint refcount;
 
106
};
 
107
 
 
108
struct _GstCollectPadsPrivate
 
109
{
 
110
  /* with LOCK and/or STREAM_LOCK */
 
111
  gboolean started;
 
112
 
 
113
  /* with STREAM_LOCK */
 
114
  guint32 cookie;               /* @data list cookie */
 
115
  guint numpads;                /* number of pads in @data */
 
116
  guint queuedpads;             /* number of pads with a buffer */
 
117
  guint eospads;                /* number of pads that are EOS */
 
118
  GstClockTime earliest_time;   /* Current earliest time */
 
119
  GstCollectData *earliest_data;        /* Pad data for current earliest time */
 
120
 
 
121
  /* with LOCK */
 
122
  GSList *pad_list;             /* updated pad list */
 
123
  guint32 pad_cookie;           /* updated cookie */
 
124
 
 
125
  GstCollectPadsFunction func;  /* function and user_data for callback */
 
126
  gpointer user_data;
 
127
  GstCollectPadsBufferFunction buffer_func;     /* function and user_data for buffer callback */
 
128
  gpointer buffer_user_data;
 
129
  GstCollectPadsCompareFunction compare_func;
 
130
  gpointer compare_user_data;
 
131
  GstCollectPadsEventFunction event_func;       /* function and data for event callback */
 
132
  gpointer event_user_data;
 
133
  GstCollectPadsQueryFunction query_func;
 
134
  gpointer query_user_data;
 
135
  GstCollectPadsClipFunction clip_func;
 
136
  gpointer clip_user_data;
 
137
 
 
138
  /* no other lock needed */
 
139
  GMutex evt_lock;              /* these make up sort of poor man's event signaling */
 
140
  GCond evt_cond;
 
141
  guint32 evt_cookie;
 
142
};
 
143
 
 
144
static void gst_collect_pads_clear (GstCollectPads * pads,
 
145
    GstCollectData * data);
 
146
static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstObject * parent,
 
147
    GstBuffer * buffer);
 
148
static gboolean gst_collect_pads_event (GstPad * pad, GstObject * parent,
 
149
    GstEvent * event);
 
150
static gboolean gst_collect_pads_query (GstPad * pad, GstObject * parent,
 
151
    GstQuery * query);
 
152
static void gst_collect_pads_finalize (GObject * object);
 
153
static GstFlowReturn gst_collect_pads_default_collected (GstCollectPads *
 
154
    pads, gpointer user_data);
 
155
static gint gst_collect_pads_default_compare_func (GstCollectPads * pads,
 
156
    GstCollectData * data1, GstClockTime timestamp1, GstCollectData * data2,
 
157
    GstClockTime timestamp2, gpointer user_data);
 
158
static gboolean gst_collect_pads_recalculate_full (GstCollectPads * pads);
 
159
static void ref_data (GstCollectData * data);
 
160
static void unref_data (GstCollectData * data);
 
161
 
 
162
static gboolean gst_collect_pads_event_default_internal (GstCollectPads *
 
163
    pads, GstCollectData * data, GstEvent * event, gpointer user_data);
 
164
static gboolean gst_collect_pads_query_default_internal (GstCollectPads *
 
165
    pads, GstCollectData * data, GstQuery * query, gpointer user_data);
 
166
 
 
167
 
 
168
/* Some properties are protected by LOCK, others by STREAM_LOCK
 
169
 * However, manipulating either of these partitions may require
 
170
 * to signal/wake a _WAIT, so use a separate (sort of) event to prevent races
 
171
 * Alternative implementations are possible, e.g. some low-level re-implementing
 
172
 * of the 2 above locks to drop both of them atomically when going into _WAIT.
 
173
 */
 
174
#define GST_COLLECT_PADS_GET_EVT_COND(pads) (&((GstCollectPads *)pads)->priv->evt_cond)
 
175
#define GST_COLLECT_PADS_GET_EVT_LOCK(pads) (&((GstCollectPads *)pads)->priv->evt_lock)
 
176
#define GST_COLLECT_PADS_EVT_WAIT(pads, cookie) G_STMT_START {    \
 
177
  g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
 
178
  /* should work unless a lot of event'ing and thread starvation */\
 
179
  while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
 
180
    g_cond_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
 
181
        GST_COLLECT_PADS_GET_EVT_LOCK (pads));                    \
 
182
  cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
 
183
  g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
 
184
} G_STMT_END
 
185
#define GST_COLLECT_PADS_EVT_WAIT_TIMED(pads, cookie, timeout) G_STMT_START { \
 
186
  GTimeVal __tv; \
 
187
  \
 
188
  g_get_current_time (&tv); \
 
189
  g_time_val_add (&tv, timeout); \
 
190
  \
 
191
  g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
 
192
  /* should work unless a lot of event'ing and thread starvation */\
 
193
  while (cookie == ((GstCollectPads *) pads)->priv->evt_cookie)         \
 
194
    g_cond_timed_wait (GST_COLLECT_PADS_GET_EVT_COND (pads),            \
 
195
        GST_COLLECT_PADS_GET_EVT_LOCK (pads), &tv);                    \
 
196
  cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
 
197
  g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
 
198
} G_STMT_END
 
199
#define GST_COLLECT_PADS_EVT_BROADCAST(pads) G_STMT_START {       \
 
200
  g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
 
201
  /* never mind wrap-around */                                     \
 
202
  ++(((GstCollectPads *) pads)->priv->evt_cookie);                      \
 
203
  g_cond_broadcast (GST_COLLECT_PADS_GET_EVT_COND (pads));        \
 
204
  g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
 
205
} G_STMT_END
 
206
#define GST_COLLECT_PADS_EVT_INIT(cookie) G_STMT_START {          \
 
207
  g_mutex_lock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));            \
 
208
  cookie = ((GstCollectPads *) pads)->priv->evt_cookie;                 \
 
209
  g_mutex_unlock (GST_COLLECT_PADS_GET_EVT_LOCK (pads));          \
 
210
} G_STMT_END
 
211
 
 
212
static void
 
213
gst_collect_pads_class_init (GstCollectPadsClass * klass)
 
214
{
 
215
  GObjectClass *gobject_class = (GObjectClass *) klass;
 
216
 
 
217
  g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
 
218
 
 
219
  GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
 
220
      "GstCollectPads");
 
221
 
 
222
  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize);
 
223
}
 
224
 
 
225
static void
 
226
gst_collect_pads_init (GstCollectPads * pads)
 
227
{
 
228
  pads->priv =
 
229
      G_TYPE_INSTANCE_GET_PRIVATE (pads, GST_TYPE_COLLECT_PADS,
 
230
      GstCollectPadsPrivate);
 
231
 
 
232
  pads->data = NULL;
 
233
  pads->priv->cookie = 0;
 
234
  pads->priv->numpads = 0;
 
235
  pads->priv->queuedpads = 0;
 
236
  pads->priv->eospads = 0;
 
237
  pads->priv->started = FALSE;
 
238
 
 
239
  g_rec_mutex_init (&pads->stream_lock);
 
240
 
 
241
  pads->priv->func = gst_collect_pads_default_collected;
 
242
  pads->priv->user_data = NULL;
 
243
  pads->priv->event_func = NULL;
 
244
  pads->priv->event_user_data = NULL;
 
245
 
 
246
  /* members for default muxing */
 
247
  pads->priv->buffer_func = NULL;
 
248
  pads->priv->buffer_user_data = NULL;
 
249
  pads->priv->compare_func = gst_collect_pads_default_compare_func;
 
250
  pads->priv->compare_user_data = NULL;
 
251
  pads->priv->earliest_data = NULL;
 
252
  pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
 
253
 
 
254
  pads->priv->event_func = gst_collect_pads_event_default_internal;
 
255
  pads->priv->query_func = gst_collect_pads_query_default_internal;
 
256
 
 
257
  /* members to manage the pad list */
 
258
  pads->priv->pad_cookie = 0;
 
259
  pads->priv->pad_list = NULL;
 
260
 
 
261
  /* members for event */
 
262
  g_mutex_init (&pads->priv->evt_lock);
 
263
  g_cond_init (&pads->priv->evt_cond);
 
264
  pads->priv->evt_cookie = 0;
 
265
}
 
266
 
 
267
static void
 
268
gst_collect_pads_finalize (GObject * object)
 
269
{
 
270
  GstCollectPads *pads = GST_COLLECT_PADS (object);
 
271
 
 
272
  GST_DEBUG_OBJECT (object, "finalize");
 
273
 
 
274
  g_rec_mutex_clear (&pads->stream_lock);
 
275
 
 
276
  g_cond_clear (&pads->priv->evt_cond);
 
277
  g_mutex_clear (&pads->priv->evt_lock);
 
278
 
 
279
  /* Remove pads and free pads list */
 
280
  g_slist_foreach (pads->priv->pad_list, (GFunc) unref_data, NULL);
 
281
  g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
 
282
  g_slist_free (pads->data);
 
283
  g_slist_free (pads->priv->pad_list);
 
284
 
 
285
  G_OBJECT_CLASS (parent_class)->finalize (object);
 
286
}
 
287
 
 
288
/**
 
289
 * gst_collect_pads_new:
 
290
 *
 
291
 * Create a new instance of #GstCollectsPads.
 
292
 *
 
293
 * MT safe.
 
294
 *
 
295
 * Returns: (transfer full): a new #GstCollectPads, or NULL in case of an error.
 
296
 *
 
297
 * Since: 0.10.36
 
298
 */
 
299
GstCollectPads *
 
300
gst_collect_pads_new (void)
 
301
{
 
302
  GstCollectPads *newcoll;
 
303
 
 
304
  newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
 
305
 
 
306
  return newcoll;
 
307
}
 
308
 
 
309
/* Must be called with GstObject lock! */
 
310
static void
 
311
gst_collect_pads_set_buffer_function_locked (GstCollectPads * pads,
 
312
    GstCollectPadsBufferFunction func, gpointer user_data)
 
313
{
 
314
  pads->priv->buffer_func = func;
 
315
  pads->priv->buffer_user_data = user_data;
 
316
}
 
317
 
 
318
/**
 
319
 * gst_collect_pads_set_buffer_function:
 
320
 * @pads: the collectpads to use
 
321
 * @func: the function to set
 
322
 * @user_data: (closure): user data passed to the function
 
323
 *
 
324
 * Set the callback function and user data that will be called with
 
325
 * the oldest buffer when all pads have been collected.
 
326
 *
 
327
 * MT safe.
 
328
 *
 
329
 * Since: 0.10.36
 
330
 */
 
331
void
 
332
gst_collect_pads_set_buffer_function (GstCollectPads * pads,
 
333
    GstCollectPadsBufferFunction func, gpointer user_data)
 
334
{
 
335
  g_return_if_fail (pads != NULL);
 
336
  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
337
 
 
338
  GST_OBJECT_LOCK (pads);
 
339
  gst_collect_pads_set_buffer_function_locked (pads, func, user_data);
 
340
  GST_OBJECT_UNLOCK (pads);
 
341
}
 
342
 
 
343
/**
 
344
 * gst_collect_pads_set_compare_function:
 
345
 * @pads: the pads to use
 
346
 * @func: the function to set
 
347
 * @user_data: (closure): user data passed to the function
 
348
 *
 
349
 * Set the timestamp comparison function.
 
350
 *
 
351
 * MT safe.
 
352
 *
 
353
 * Since: 0.10.36
 
354
 */
 
355
/* NOTE allowing to change comparison seems not advisable;
 
356
no known use-case, and collaboration with default algorithm is unpredictable.
 
357
If custom compairing/operation is needed, just use a collect function of
 
358
your own */
 
359
void
 
360
gst_collect_pads_set_compare_function (GstCollectPads * pads,
 
361
    GstCollectPadsCompareFunction func, gpointer user_data)
 
362
{
 
363
  g_return_if_fail (pads != NULL);
 
364
  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
365
 
 
366
  GST_OBJECT_LOCK (pads);
 
367
  pads->priv->compare_func = func;
 
368
  pads->priv->compare_user_data = user_data;
 
369
  GST_OBJECT_UNLOCK (pads);
 
370
}
 
371
 
 
372
/**
 
373
 * gst_collect_pads_set_function:
 
374
 * @pads: the collectspads to use
 
375
 * @func: the function to set
 
376
 * @user_data: user data passed to the function
 
377
 *
 
378
 * CollectPads provides a default collection algorithm that will determine
 
379
 * the oldest buffer available on all of its pads, and then delegate
 
380
 * to a configured callback.
 
381
 * However, if circumstances are more complicated and/or more control
 
382
 * is desired, this sets a callback that will be invoked instead when
 
383
 * all the pads added to the collection have buffers queued.
 
384
 * Evidently, this callback is not compatible with
 
385
 * gst_collect_pads_set_buffer_function() callback.
 
386
 * If this callback is set, the former will be unset.
 
387
 *
 
388
 * MT safe.
 
389
 *
 
390
 * Since: 0.10.36
 
391
 */
 
392
void
 
393
gst_collect_pads_set_function (GstCollectPads * pads,
 
394
    GstCollectPadsFunction func, gpointer user_data)
 
395
{
 
396
  g_return_if_fail (pads != NULL);
 
397
  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
398
 
 
399
  GST_OBJECT_LOCK (pads);
 
400
  pads->priv->func = func;
 
401
  pads->priv->user_data = user_data;
 
402
  gst_collect_pads_set_buffer_function_locked (pads, NULL, NULL);
 
403
  GST_OBJECT_UNLOCK (pads);
 
404
}
 
405
 
 
406
static void
 
407
ref_data (GstCollectData * data)
 
408
{
 
409
  g_assert (data != NULL);
 
410
 
 
411
  g_atomic_int_inc (&(data->priv->refcount));
 
412
}
 
413
 
 
414
static void
 
415
unref_data (GstCollectData * data)
 
416
{
 
417
  g_assert (data != NULL);
 
418
  g_assert (data->priv->refcount > 0);
 
419
 
 
420
  if (!g_atomic_int_dec_and_test (&(data->priv->refcount)))
 
421
    return;
 
422
 
 
423
  if (data->priv->destroy_notify)
 
424
    data->priv->destroy_notify (data);
 
425
 
 
426
  g_object_unref (data->pad);
 
427
  if (data->buffer) {
 
428
    gst_buffer_unref (data->buffer);
 
429
  }
 
430
  g_free (data->priv);
 
431
  g_free (data);
 
432
}
 
433
 
 
434
/**
 
435
 * gst_collect_pads_set_event_function:
 
436
 * @pads: the collectspads to use
 
437
 * @func: the function to set
 
438
 * @user_data: user data passed to the function
 
439
 *
 
440
 * Set the event callback function and user data that will be called when
 
441
 * collectpads has received an event originating from one of the collected
 
442
 * pads.  If the event being processed is a serialized one, this callback is
 
443
 * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
 
444
 * held when calling a number of CollectPads functions, it should be acquired
 
445
 * if so (unusually) needed.
 
446
 *
 
447
 * MT safe.
 
448
 *
 
449
 * Since: 0.10.36
 
450
 */
 
451
void
 
452
gst_collect_pads_set_event_function (GstCollectPads * pads,
 
453
    GstCollectPadsEventFunction func, gpointer user_data)
 
454
{
 
455
  g_return_if_fail (pads != NULL);
 
456
  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
457
 
 
458
  GST_OBJECT_LOCK (pads);
 
459
  pads->priv->event_func = func;
 
460
  pads->priv->event_user_data = user_data;
 
461
  GST_OBJECT_UNLOCK (pads);
 
462
}
 
463
 
 
464
/**
 
465
 * gst_collect_pads_set_query_function:
 
466
 * @pads: the collectspads to use
 
467
 * @func: the function to set
 
468
 * @user_data: user data passed to the function
 
469
 *
 
470
 * Set the query callback function and user data that will be called after
 
471
 * collectpads has received a query originating from one of the collected
 
472
 * pads.  If the query being processed is a serialized one, this callback is
 
473
 * called with @pads STREAM_LOCK held, otherwise not.  As this lock should be
 
474
 * held when calling a number of CollectPads functions, it should be acquired
 
475
 * if so (unusually) needed.
 
476
 *
 
477
 * MT safe.
 
478
 *
 
479
 * Since: 0.10.36
 
480
 */
 
481
void
 
482
gst_collect_pads_set_query_function (GstCollectPads * pads,
 
483
    GstCollectPadsQueryFunction func, gpointer user_data)
 
484
{
 
485
  g_return_if_fail (pads != NULL);
 
486
  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
487
 
 
488
  GST_OBJECT_LOCK (pads);
 
489
  pads->priv->query_func = func;
 
490
  pads->priv->query_user_data = user_data;
 
491
  GST_OBJECT_UNLOCK (pads);
 
492
}
 
493
 
 
494
/**
 
495
* gst_collect_pads_clip_running_time:
 
496
* @pads: the collectspads to use
 
497
* @cdata: collect data of corresponding pad
 
498
* @buf: buffer being clipped
 
499
* @outbuf: output buffer with running time, or NULL if clipped
 
500
* @user_data: user data (unused)
 
501
*
 
502
* Convenience clipping function that converts incoming buffer's timestamp
 
503
* to running time, or clips the buffer if outside configured segment.
 
504
*
 
505
* Since: 0.10.37
 
506
*/
 
507
GstFlowReturn
 
508
gst_collect_pads_clip_running_time (GstCollectPads * pads,
 
509
    GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf,
 
510
    gpointer user_data)
 
511
{
 
512
  GstClockTime time;
 
513
 
 
514
  *outbuf = buf;
 
515
  time = GST_BUFFER_TIMESTAMP (buf);
 
516
 
 
517
  /* invalid left alone and passed */
 
518
  if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
 
519
    time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time);
 
520
    if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
 
521
      GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment");
 
522
      gst_buffer_unref (buf);
 
523
      *outbuf = NULL;
 
524
    } else {
 
525
      GST_LOG_OBJECT (cdata->pad, "buffer ts %" GST_TIME_FORMAT " -> %"
 
526
          GST_TIME_FORMAT " running time",
 
527
          GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), GST_TIME_ARGS (time));
 
528
      *outbuf = gst_buffer_make_writable (buf);
 
529
      GST_BUFFER_TIMESTAMP (*outbuf) = time;
 
530
    }
 
531
  }
 
532
 
 
533
  return GST_FLOW_OK;
 
534
}
 
535
 
 
536
 /**
 
537
 * gst_collect_pads_set_clip_function:
 
538
 * @pads: the collectspads to use
 
539
 * @clipfunc: clip function to install
 
540
 * @user_data: user data to pass to @clip_func
 
541
 *
 
542
 * Install a clipping function that is called right after a buffer is received
 
543
 * on a pad managed by @pads. See #GstCollectPad2ClipFunction for more info.
 
544
 *
 
545
 * Since: 0.10.36
 
546
 */
 
547
void
 
548
gst_collect_pads_set_clip_function (GstCollectPads * pads,
 
549
    GstCollectPadsClipFunction clipfunc, gpointer user_data)
 
550
{
 
551
  g_return_if_fail (pads != NULL);
 
552
  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
553
 
 
554
  pads->priv->clip_func = clipfunc;
 
555
  pads->priv->clip_user_data = user_data;
 
556
}
 
557
 
 
558
/**
 
559
 * gst_collect_pads_add_pad:
 
560
 * @pads: the collectspads to use
 
561
 * @pad: (transfer none): the pad to add
 
562
 * @size: the size of the returned #GstCollectData structure
 
563
 *
 
564
 * Add a pad to the collection of collect pads. The pad has to be
 
565
 * a sinkpad. The refcount of the pad is incremented. Use
 
566
 * gst_collect_pads_remove_pad() to remove the pad from the collection
 
567
 * again.
 
568
 *
 
569
 * You specify a size for the returned #GstCollectData structure
 
570
 * so that you can use it to store additional information.
 
571
 *
 
572
 * The pad will be automatically activated in push mode when @pads is
 
573
 * started.
 
574
 *
 
575
 * This function calls gst_collect_pads_add_pad_full() passing a value of NULL
 
576
 * for destroy_notify and TRUE for locked.
 
577
 *
 
578
 * MT safe.
 
579
 *
 
580
 * Returns: a new #GstCollectData to identify the new pad. Or NULL
 
581
 *   if wrong parameters are supplied.
 
582
 *
 
583
 * Since: 0.10.36
 
584
 */
 
585
GstCollectData *
 
586
gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
 
587
{
 
588
  return gst_collect_pads_add_pad_full (pads, pad, size, NULL, TRUE);
 
589
}
 
590
 
 
591
/**
 
592
 * gst_collect_pads_add_pad_full:
 
593
 * @pads: the collectspads to use
 
594
 * @pad: (transfer none): the pad to add
 
595
 * @size: the size of the returned #GstCollectData structure
 
596
 * @destroy_notify: function to be called before the returned #GstCollectData
 
597
 * structure is freed
 
598
 * @lock: whether to lock this pad in usual waiting state
 
599
 *
 
600
 * Add a pad to the collection of collect pads. The pad has to be
 
601
 * a sinkpad. The refcount of the pad is incremented. Use
 
602
 * gst_collect_pads_remove_pad() to remove the pad from the collection
 
603
 * again.
 
604
 *
 
605
 * You specify a size for the returned #GstCollectData structure
 
606
 * so that you can use it to store additional information.
 
607
 *
 
608
 * You can also specify a #GstCollectDataDestroyNotify that will be called
 
609
 * just before the #GstCollectData structure is freed. It is passed the
 
610
 * pointer to the structure and should free any custom memory and resources
 
611
 * allocated for it.
 
612
 *
 
613
 * Keeping a pad locked in waiting state is only relevant when using
 
614
 * the default collection algorithm (providing the oldest buffer).
 
615
 * It ensures a buffer must be available on this pad for a collection
 
616
 * to take place.  This is of typical use to a muxer element where
 
617
 * non-subtitle streams should always be in waiting state,
 
618
 * e.g. to assure that caps information is available on all these streams
 
619
 * when initial headers have to be written.
 
620
 *
 
621
 * The pad will be automatically activated in push mode when @pads is
 
622
 * started.
 
623
 *
 
624
 * MT safe.
 
625
 *
 
626
 * Since: 0.10.36
 
627
 *
 
628
 * Returns: a new #GstCollectData to identify the new pad. Or NULL
 
629
 *   if wrong parameters are supplied.
 
630
 */
 
631
GstCollectData *
 
632
gst_collect_pads_add_pad_full (GstCollectPads * pads, GstPad * pad,
 
633
    guint size, GstCollectDataDestroyNotify destroy_notify, gboolean lock)
 
634
{
 
635
  GstCollectData *data;
 
636
 
 
637
  g_return_val_if_fail (pads != NULL, NULL);
 
638
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
 
639
  g_return_val_if_fail (pad != NULL, NULL);
 
640
  g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
 
641
  g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
 
642
 
 
643
  GST_DEBUG_OBJECT (pads, "adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
 
644
 
 
645
  data = g_malloc0 (size);
 
646
  data->priv = g_new0 (GstCollectDataPrivate, 1);
 
647
  data->collect = pads;
 
648
  data->pad = gst_object_ref (pad);
 
649
  data->buffer = NULL;
 
650
  data->pos = 0;
 
651
  gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
 
652
  data->state = GST_COLLECT_PADS_STATE_WAITING;
 
653
  data->state |= lock ? GST_COLLECT_PADS_STATE_LOCKED : 0;
 
654
  data->priv->refcount = 1;
 
655
  data->priv->destroy_notify = destroy_notify;
 
656
 
 
657
  GST_OBJECT_LOCK (pads);
 
658
  GST_OBJECT_LOCK (pad);
 
659
  gst_pad_set_element_private (pad, data);
 
660
  GST_OBJECT_UNLOCK (pad);
 
661
  pads->priv->pad_list = g_slist_append (pads->priv->pad_list, data);
 
662
  gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
 
663
  gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
 
664
  gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_query));
 
665
  /* backward compat, also add to data if stopped, so that the element already
 
666
   * has this in the public data list before going PAUSED (typically)
 
667
   * this can only be done when we are stopped because we don't take the
 
668
   * STREAM_LOCK to protect the pads->data list. */
 
669
  if (!pads->priv->started) {
 
670
    pads->data = g_slist_append (pads->data, data);
 
671
    ref_data (data);
 
672
  }
 
673
  /* activate the pad when needed */
 
674
  if (pads->priv->started)
 
675
    gst_pad_set_active (pad, TRUE);
 
676
  pads->priv->pad_cookie++;
 
677
  GST_OBJECT_UNLOCK (pads);
 
678
 
 
679
  return data;
 
680
}
 
681
 
 
682
static gint
 
683
find_pad (GstCollectData * data, GstPad * pad)
 
684
{
 
685
  if (data->pad == pad)
 
686
    return 0;
 
687
  return 1;
 
688
}
 
689
 
 
690
/**
 
691
 * gst_collect_pads_remove_pad:
 
692
 * @pads: the collectspads to use
 
693
 * @pad: (transfer none): the pad to remove
 
694
 *
 
695
 * Remove a pad from the collection of collect pads. This function will also
 
696
 * free the #GstCollectData and all the resources that were allocated with
 
697
 * gst_collect_pads_add_pad().
 
698
 *
 
699
 * The pad will be deactivated automatically when @pads is stopped.
 
700
 *
 
701
 * MT safe.
 
702
 *
 
703
 * Returns: %TRUE if the pad could be removed.
 
704
 *
 
705
 * Since: 0.10.36
 
706
 */
 
707
gboolean
 
708
gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
 
709
{
 
710
  GstCollectData *data;
 
711
  GSList *list;
 
712
 
 
713
  g_return_val_if_fail (pads != NULL, FALSE);
 
714
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
 
715
  g_return_val_if_fail (pad != NULL, FALSE);
 
716
  g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
 
717
 
 
718
  GST_DEBUG_OBJECT (pads, "removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
 
719
 
 
720
  GST_OBJECT_LOCK (pads);
 
721
  list =
 
722
      g_slist_find_custom (pads->priv->pad_list, pad, (GCompareFunc) find_pad);
 
723
  if (!list)
 
724
    goto unknown_pad;
 
725
 
 
726
  data = (GstCollectData *) list->data;
 
727
 
 
728
  GST_DEBUG_OBJECT (pads, "found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad),
 
729
      data);
 
730
 
 
731
  /* clear the stuff we configured */
 
732
  gst_pad_set_chain_function (pad, NULL);
 
733
  gst_pad_set_event_function (pad, NULL);
 
734
  GST_OBJECT_LOCK (pad);
 
735
  gst_pad_set_element_private (pad, NULL);
 
736
  GST_OBJECT_UNLOCK (pad);
 
737
 
 
738
  /* backward compat, also remove from data if stopped, note that this function
 
739
   * can only be called when we are stopped because we don't take the
 
740
   * STREAM_LOCK to protect the pads->data list. */
 
741
  if (!pads->priv->started) {
 
742
    GSList *dlist;
 
743
 
 
744
    dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
 
745
    if (dlist) {
 
746
      GstCollectData *pdata = dlist->data;
 
747
 
 
748
      pads->data = g_slist_delete_link (pads->data, dlist);
 
749
      unref_data (pdata);
 
750
    }
 
751
  }
 
752
  /* remove from the pad list */
 
753
  pads->priv->pad_list = g_slist_delete_link (pads->priv->pad_list, list);
 
754
  pads->priv->pad_cookie++;
 
755
 
 
756
  /* signal waiters because something changed */
 
757
  GST_COLLECT_PADS_EVT_BROADCAST (pads);
 
758
 
 
759
  /* deactivate the pad when needed */
 
760
  if (!pads->priv->started)
 
761
    gst_pad_set_active (pad, FALSE);
 
762
 
 
763
  /* clean and free the collect data */
 
764
  unref_data (data);
 
765
 
 
766
  GST_OBJECT_UNLOCK (pads);
 
767
 
 
768
  return TRUE;
 
769
 
 
770
unknown_pad:
 
771
  {
 
772
    GST_WARNING_OBJECT (pads, "cannot remove unknown pad %s:%s",
 
773
        GST_DEBUG_PAD_NAME (pad));
 
774
    GST_OBJECT_UNLOCK (pads);
 
775
    return FALSE;
 
776
  }
 
777
}
 
778
 
 
779
/**
 
780
 * gst_collect_pads_is_active:
 
781
 * @pads: the collectspads to use
 
782
 * @pad: the pad to check
 
783
 *
 
784
 * Check if a pad is active.
 
785
 *
 
786
 * This function is currently not implemented.
 
787
 *
 
788
 * MT safe.
 
789
 *
 
790
 * Returns: %TRUE if the pad is active.
 
791
 *
 
792
 * Since: 0.10.36
 
793
 */
 
794
gboolean
 
795
gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad)
 
796
{
 
797
  g_return_val_if_fail (pads != NULL, FALSE);
 
798
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
 
799
  g_return_val_if_fail (pad != NULL, FALSE);
 
800
  g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
 
801
 
 
802
  g_warning ("gst_collect_pads_is_active() is not implemented");
 
803
 
 
804
  return FALSE;
 
805
}
 
806
 
 
807
/**
 
808
 * gst_collect_pads_collect:
 
809
 * @pads: the collectspads to use
 
810
 *
 
811
 * Collect data on all pads. This function is usually called
 
812
 * from a #GstTask function in an element. 
 
813
 *
 
814
 * This function is currently not implemented.
 
815
 *
 
816
 * MT safe.
 
817
 *
 
818
 * Returns: #GstFlowReturn of the operation.
 
819
 *
 
820
 * Since: 0.10.36
 
821
 */
 
822
GstFlowReturn
 
823
gst_collect_pads_collect (GstCollectPads * pads)
 
824
{
 
825
  g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
 
826
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
 
827
 
 
828
  g_warning ("gst_collect_pads_collect() is not implemented");
 
829
 
 
830
  return GST_FLOW_NOT_SUPPORTED;
 
831
}
 
832
 
 
833
/**
 
834
 * gst_collect_pads_collect_range:
 
835
 * @pads: the collectspads to use
 
836
 * @offset: the offset to collect
 
837
 * @length: the length to collect
 
838
 *
 
839
 * Collect data with @offset and @length on all pads. This function
 
840
 * is typically called in the getrange function of an element. 
 
841
 *
 
842
 * This function is currently not implemented.
 
843
 *
 
844
 * MT safe.
 
845
 *
 
846
 * Returns: #GstFlowReturn of the operation.
 
847
 *
 
848
 * Since: 0.10.36
 
849
 */
 
850
GstFlowReturn
 
851
gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
 
852
    guint length)
 
853
{
 
854
  g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
 
855
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
 
856
 
 
857
  g_warning ("gst_collect_pads_collect_range() is not implemented");
 
858
 
 
859
  return GST_FLOW_NOT_SUPPORTED;
 
860
}
 
861
 
 
862
/*
 
863
 * Must be called with STREAM_LOCK.
 
864
 */
 
865
static void
 
866
gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
 
867
    gboolean flushing)
 
868
{
 
869
  GSList *walk = NULL;
 
870
 
 
871
  /* Update the pads flushing flag */
 
872
  for (walk = pads->data; walk; walk = g_slist_next (walk)) {
 
873
    GstCollectData *cdata = walk->data;
 
874
 
 
875
    if (GST_IS_PAD (cdata->pad)) {
 
876
      GST_OBJECT_LOCK (cdata->pad);
 
877
      if (flushing)
 
878
        GST_PAD_SET_FLUSHING (cdata->pad);
 
879
      else
 
880
        GST_PAD_UNSET_FLUSHING (cdata->pad);
 
881
      if (flushing)
 
882
        GST_COLLECT_PADS_STATE_SET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
 
883
      else
 
884
        GST_COLLECT_PADS_STATE_UNSET (cdata, GST_COLLECT_PADS_STATE_FLUSHING);
 
885
      gst_collect_pads_clear (pads, cdata);
 
886
      GST_OBJECT_UNLOCK (cdata->pad);
 
887
    }
 
888
  }
 
889
 
 
890
  /* inform _chain of changes */
 
891
  GST_COLLECT_PADS_EVT_BROADCAST (pads);
 
892
}
 
893
 
 
894
/**
 
895
 * gst_collect_pads_set_flushing:
 
896
 * @pads: the collectspads to use
 
897
 * @flushing: desired state of the pads
 
898
 *
 
899
 * Change the flushing state of all the pads in the collection. No pad
 
900
 * is able to accept anymore data when @flushing is %TRUE. Calling this
 
901
 * function with @flushing %FALSE makes @pads accept data again.
 
902
 * Caller must ensure that downstream streaming (thread) is not blocked,
 
903
 * e.g. by sending a FLUSH_START downstream.
 
904
 *
 
905
 * MT safe.
 
906
 *
 
907
 * Since: 0.10.36
 
908
 */
 
909
void
 
910
gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
 
911
{
 
912
  g_return_if_fail (pads != NULL);
 
913
  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
914
 
 
915
  /* NOTE since this eventually calls _pop, some (STREAM_)LOCK is needed here */
 
916
  GST_COLLECT_PADS_STREAM_LOCK (pads);
 
917
  gst_collect_pads_set_flushing_unlocked (pads, flushing);
 
918
  GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
919
}
 
920
 
 
921
/**
 
922
 * gst_collect_pads_start:
 
923
 * @pads: the collectspads to use
 
924
 *
 
925
 * Starts the processing of data in the collect_pads.
 
926
 *
 
927
 * MT safe.
 
928
 *
 
929
 * Since: 0.10.36
 
930
 */
 
931
void
 
932
gst_collect_pads_start (GstCollectPads * pads)
 
933
{
 
934
  GSList *collected;
 
935
 
 
936
  g_return_if_fail (pads != NULL);
 
937
  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
938
 
 
939
  GST_DEBUG_OBJECT (pads, "starting collect pads");
 
940
 
 
941
  /* make sure stop and collect cannot be called anymore */
 
942
  GST_COLLECT_PADS_STREAM_LOCK (pads);
 
943
 
 
944
  /* make pads streamable */
 
945
  GST_OBJECT_LOCK (pads);
 
946
 
 
947
  /* loop over the master pad list and reset the segment */
 
948
  collected = pads->priv->pad_list;
 
949
  for (; collected; collected = g_slist_next (collected)) {
 
950
    GstCollectData *data;
 
951
 
 
952
    data = collected->data;
 
953
    gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
 
954
  }
 
955
 
 
956
  gst_collect_pads_set_flushing_unlocked (pads, FALSE);
 
957
 
 
958
  /* Start collect pads */
 
959
  pads->priv->started = TRUE;
 
960
  GST_OBJECT_UNLOCK (pads);
 
961
  GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
962
}
 
963
 
 
964
/**
 
965
 * gst_collect_pads_stop:
 
966
 * @pads: the collectspads to use
 
967
 *
 
968
 * Stops the processing of data in the collect_pads. this function
 
969
 * will also unblock any blocking operations.
 
970
 *
 
971
 * MT safe.
 
972
 *
 
973
 * Since: 0.10.36
 
974
 */
 
975
void
 
976
gst_collect_pads_stop (GstCollectPads * pads)
 
977
{
 
978
  GSList *collected;
 
979
 
 
980
  g_return_if_fail (pads != NULL);
 
981
  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
982
 
 
983
  GST_DEBUG_OBJECT (pads, "stopping collect pads");
 
984
 
 
985
  /* make sure collect and start cannot be called anymore */
 
986
  GST_COLLECT_PADS_STREAM_LOCK (pads);
 
987
 
 
988
  /* make pads not accept data anymore */
 
989
  GST_OBJECT_LOCK (pads);
 
990
  gst_collect_pads_set_flushing_unlocked (pads, TRUE);
 
991
 
 
992
  /* Stop collect pads */
 
993
  pads->priv->started = FALSE;
 
994
  pads->priv->eospads = 0;
 
995
  pads->priv->queuedpads = 0;
 
996
 
 
997
  /* loop over the master pad list and flush buffers */
 
998
  collected = pads->priv->pad_list;
 
999
  for (; collected; collected = g_slist_next (collected)) {
 
1000
    GstCollectData *data;
 
1001
    GstBuffer **buffer_p;
 
1002
 
 
1003
    data = collected->data;
 
1004
    if (data->buffer) {
 
1005
      buffer_p = &data->buffer;
 
1006
      gst_buffer_replace (buffer_p, NULL);
 
1007
      data->pos = 0;
 
1008
    }
 
1009
    GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
 
1010
  }
 
1011
 
 
1012
  if (pads->priv->earliest_data)
 
1013
    unref_data (pads->priv->earliest_data);
 
1014
  pads->priv->earliest_data = NULL;
 
1015
  pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
 
1016
 
 
1017
  GST_OBJECT_UNLOCK (pads);
 
1018
  /* Wake them up so they can end the chain functions. */
 
1019
  GST_COLLECT_PADS_EVT_BROADCAST (pads);
 
1020
 
 
1021
  GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
1022
}
 
1023
 
 
1024
/**
 
1025
 * gst_collect_pads_peek:
 
1026
 * @pads: the collectspads to peek
 
1027
 * @data: the data to use
 
1028
 *
 
1029
 * Peek at the buffer currently queued in @data. This function
 
1030
 * should be called with the @pads STREAM_LOCK held, such as in the callback
 
1031
 * handler.
 
1032
 *
 
1033
 * MT safe.
 
1034
 *
 
1035
 * Returns: The buffer in @data or NULL if no buffer is queued.
 
1036
 *  should unref the buffer after usage.
 
1037
 *
 
1038
 * Since: 0.10.36
 
1039
 */
 
1040
GstBuffer *
 
1041
gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
 
1042
{
 
1043
  GstBuffer *result;
 
1044
 
 
1045
  g_return_val_if_fail (pads != NULL, NULL);
 
1046
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
 
1047
  g_return_val_if_fail (data != NULL, NULL);
 
1048
 
 
1049
  if ((result = data->buffer))
 
1050
    gst_buffer_ref (result);
 
1051
 
 
1052
  GST_DEBUG_OBJECT (pads, "Peeking at pad %s:%s: buffer=%p",
 
1053
      GST_DEBUG_PAD_NAME (data->pad), result);
 
1054
 
 
1055
  return result;
 
1056
}
 
1057
 
 
1058
/**
 
1059
 * gst_collect_pads_pop:
 
1060
 * @pads: the collectspads to pop
 
1061
 * @data: the data to use
 
1062
 *
 
1063
 * Pop the buffer currently queued in @data. This function
 
1064
 * should be called with the @pads STREAM_LOCK held, such as in the callback
 
1065
 * handler.
 
1066
 *
 
1067
 * MT safe.
 
1068
 *
 
1069
 * Returns: (transfer full): The buffer in @data or NULL if no buffer was
 
1070
 *   queued. You should unref the buffer after usage.
 
1071
 *
 
1072
 * Since: 0.10.36
 
1073
 */
 
1074
GstBuffer *
 
1075
gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
 
1076
{
 
1077
  GstBuffer *result;
 
1078
 
 
1079
  g_return_val_if_fail (pads != NULL, NULL);
 
1080
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
 
1081
  g_return_val_if_fail (data != NULL, NULL);
 
1082
 
 
1083
  if ((result = data->buffer)) {
 
1084
    data->buffer = NULL;
 
1085
    data->pos = 0;
 
1086
    /* one less pad with queued data now */
 
1087
    if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
 
1088
      pads->priv->queuedpads--;
 
1089
  }
 
1090
 
 
1091
  GST_COLLECT_PADS_EVT_BROADCAST (pads);
 
1092
 
 
1093
  GST_DEBUG_OBJECT (pads, "Pop buffer on pad %s:%s: buffer=%p",
 
1094
      GST_DEBUG_PAD_NAME (data->pad), result);
 
1095
 
 
1096
  return result;
 
1097
}
 
1098
 
 
1099
/* pop and unref the currently queued buffer, should be called with STREAM_LOCK
 
1100
 * held */
 
1101
static void
 
1102
gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
 
1103
{
 
1104
  GstBuffer *buf;
 
1105
 
 
1106
  if ((buf = gst_collect_pads_pop (pads, data)))
 
1107
    gst_buffer_unref (buf);
 
1108
}
 
1109
 
 
1110
/**
 
1111
 * gst_collect_pads_available:
 
1112
 * @pads: the collectspads to query
 
1113
 *
 
1114
 * Query how much bytes can be read from each queued buffer. This means
 
1115
 * that the result of this call is the maximum number of bytes that can
 
1116
 * be read from each of the pads.
 
1117
 *
 
1118
 * This function should be called with @pads STREAM_LOCK held, such as
 
1119
 * in the callback.
 
1120
 *
 
1121
 * MT safe.
 
1122
 *
 
1123
 * Returns: The maximum number of bytes queued on all pads. This function
 
1124
 * returns 0 if a pad has no queued buffer.
 
1125
 *
 
1126
 * Since: 0.10.36
 
1127
 */
 
1128
/* we might pre-calculate this in some struct field,
 
1129
 * but would then have to maintain this in _chain and particularly _pop, etc,
 
1130
 * even if element is never interested in this information */
 
1131
guint
 
1132
gst_collect_pads_available (GstCollectPads * pads)
 
1133
{
 
1134
  GSList *collected;
 
1135
  guint result = G_MAXUINT;
 
1136
 
 
1137
  g_return_val_if_fail (pads != NULL, 0);
 
1138
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
 
1139
 
 
1140
  collected = pads->data;
 
1141
  for (; collected; collected = g_slist_next (collected)) {
 
1142
    GstCollectData *pdata;
 
1143
    GstBuffer *buffer;
 
1144
    gint size;
 
1145
 
 
1146
    pdata = (GstCollectData *) collected->data;
 
1147
 
 
1148
    /* ignore pad with EOS */
 
1149
    if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (pdata,
 
1150
                GST_COLLECT_PADS_STATE_EOS))) {
 
1151
      GST_DEBUG_OBJECT (pads, "pad %p is EOS", pdata);
 
1152
      continue;
 
1153
    }
 
1154
 
 
1155
    /* an empty buffer without EOS is weird when we get here.. */
 
1156
    if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
 
1157
      GST_WARNING_OBJECT (pads, "pad %p has no buffer", pdata);
 
1158
      goto not_filled;
 
1159
    }
 
1160
 
 
1161
    /* this is the size left of the buffer */
 
1162
    size = gst_buffer_get_size (buffer) - pdata->pos;
 
1163
    GST_DEBUG_OBJECT (pads, "pad %p has %d bytes left", pdata, size);
 
1164
 
 
1165
    /* need to return the min of all available data */
 
1166
    if (size < result)
 
1167
      result = size;
 
1168
  }
 
1169
  /* nothing changed, all must be EOS then, return 0 */
 
1170
  if (G_UNLIKELY (result == G_MAXUINT))
 
1171
    result = 0;
 
1172
 
 
1173
  return result;
 
1174
 
 
1175
not_filled:
 
1176
  {
 
1177
    return 0;
 
1178
  }
 
1179
}
 
1180
 
 
1181
/**
 
1182
 * gst_collect_pads_flush:
 
1183
 * @pads: the collectspads to query
 
1184
 * @data: the data to use
 
1185
 * @size: the number of bytes to flush
 
1186
 *
 
1187
 * Flush @size bytes from the pad @data.
 
1188
 *
 
1189
 * This function should be called with @pads STREAM_LOCK held, such as
 
1190
 * in the callback.
 
1191
 *
 
1192
 * MT safe.
 
1193
 *
 
1194
 * Returns: The number of bytes flushed This can be less than @size and
 
1195
 * is 0 if the pad was end-of-stream.
 
1196
 *
 
1197
 * Since: 0.10.36
 
1198
 */
 
1199
guint
 
1200
gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
 
1201
    guint size)
 
1202
{
 
1203
  guint flushsize;
 
1204
  gsize bsize;
 
1205
  GstBuffer *buffer;
 
1206
 
 
1207
  g_return_val_if_fail (pads != NULL, 0);
 
1208
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
 
1209
  g_return_val_if_fail (data != NULL, 0);
 
1210
 
 
1211
  /* no buffer, must be EOS */
 
1212
  if ((buffer = data->buffer) == NULL)
 
1213
    return 0;
 
1214
 
 
1215
  bsize = gst_buffer_get_size (buffer);
 
1216
 
 
1217
  /* this is what we can flush at max */
 
1218
  flushsize = MIN (size, bsize - data->pos);
 
1219
 
 
1220
  data->pos += size;
 
1221
 
 
1222
  if (data->pos >= bsize)
 
1223
    /* _clear will also reset data->pos to 0 */
 
1224
    gst_collect_pads_clear (pads, data);
 
1225
 
 
1226
  return flushsize;
 
1227
}
 
1228
 
 
1229
/**
 
1230
 * gst_collect_pads_read_buffer:
 
1231
 * @pads: the collectspads to query
 
1232
 * @data: the data to use
 
1233
 * @size: the number of bytes to read
 
1234
 *
 
1235
 * Get a subbuffer of @size bytes from the given pad @data.
 
1236
 *
 
1237
 * This function should be called with @pads STREAM_LOCK held, such as in the
 
1238
 * callback.
 
1239
 *
 
1240
 * MT safe.
 
1241
 *
 
1242
 * Since: 0.10.36
 
1243
 *
 
1244
 * Returns: (transfer full): A sub buffer. The size of the buffer can be less that requested.
 
1245
 * A return of NULL signals that the pad is end-of-stream.
 
1246
 * Unref the buffer after use.
 
1247
 */
 
1248
GstBuffer *
 
1249
gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
 
1250
    guint size)
 
1251
{
 
1252
  guint readsize;
 
1253
  GstBuffer *buffer;
 
1254
 
 
1255
  g_return_val_if_fail (pads != NULL, NULL);
 
1256
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
 
1257
  g_return_val_if_fail (data != NULL, NULL);
 
1258
 
 
1259
  /* no buffer, must be EOS */
 
1260
  if ((buffer = data->buffer) == NULL)
 
1261
    return NULL;
 
1262
 
 
1263
  readsize = MIN (size, gst_buffer_get_size (buffer) - data->pos);
 
1264
 
 
1265
  return gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, data->pos,
 
1266
      readsize);
 
1267
}
 
1268
 
 
1269
/**
 
1270
 * gst_collect_pads_take_buffer:
 
1271
 * @pads: the collectspads to query
 
1272
 * @data: the data to use
 
1273
 * @size: the number of bytes to read
 
1274
 *
 
1275
 * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
 
1276
 * of read bytes.
 
1277
 *
 
1278
 * This function should be called with @pads STREAM_LOCK held, such as in the
 
1279
 * callback.
 
1280
 *
 
1281
 * MT safe.
 
1282
 *
 
1283
 * Since: 0.10.36
 
1284
 *
 
1285
 * Returns: A sub buffer. The size of the buffer can be less that requested.
 
1286
 * A return of NULL signals that the pad is end-of-stream.
 
1287
 * Unref the buffer after use.
 
1288
 */
 
1289
GstBuffer *
 
1290
gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
 
1291
    guint size)
 
1292
{
 
1293
  GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
 
1294
 
 
1295
  if (buffer) {
 
1296
    gst_collect_pads_flush (pads, data, gst_buffer_get_size (buffer));
 
1297
  }
 
1298
  return buffer;
 
1299
}
 
1300
 
 
1301
/**
 
1302
 * gst_collect_pads_set_waiting:
 
1303
 * @pads: the collectspads
 
1304
 * @data: the data to use
 
1305
 * @waiting: boolean indicating whether this pad should operate
 
1306
 *           in waiting or non-waiting mode
 
1307
 *
 
1308
 * Sets a pad to waiting or non-waiting mode, if at least this pad
 
1309
 * has not been created with locked waiting state,
 
1310
 * in which case nothing happens.
 
1311
 *
 
1312
 * This function should be called with @pads STREAM_LOCK held, such as
 
1313
 * in the callback.
 
1314
 *
 
1315
 * MT safe.
 
1316
 *
 
1317
 * Since: 0.10.36
 
1318
 */
 
1319
void
 
1320
gst_collect_pads_set_waiting (GstCollectPads * pads, GstCollectData * data,
 
1321
    gboolean waiting)
 
1322
{
 
1323
  g_return_if_fail (pads != NULL);
 
1324
  g_return_if_fail (GST_IS_COLLECT_PADS (pads));
 
1325
  g_return_if_fail (data != NULL);
 
1326
 
 
1327
  GST_DEBUG_OBJECT (pads, "Setting pad %s to waiting %d, locked %d",
 
1328
      GST_PAD_NAME (data->pad), waiting,
 
1329
      GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED));
 
1330
 
 
1331
  /* Do something only on a change and if not locked */
 
1332
  if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_LOCKED) &&
 
1333
      (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING) !=
 
1334
          ! !waiting)) {
 
1335
    /* Set waiting state for this pad */
 
1336
    if (waiting)
 
1337
      GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_WAITING);
 
1338
    else
 
1339
      GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_WAITING);
 
1340
    /* Update number of queued pads if needed */
 
1341
    if (!data->buffer &&
 
1342
        !GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS)) {
 
1343
      if (waiting)
 
1344
        pads->priv->queuedpads--;
 
1345
      else
 
1346
        pads->priv->queuedpads++;
 
1347
    }
 
1348
 
 
1349
    /* signal waiters because something changed */
 
1350
    GST_COLLECT_PADS_EVT_BROADCAST (pads);
 
1351
  }
 
1352
}
 
1353
 
 
1354
/* see if pads were added or removed and update our stats. Any pad
 
1355
 * added after releasing the LOCK will get collected in the next
 
1356
 * round.
 
1357
 *
 
1358
 * We can do a quick check by checking the cookies, that get changed
 
1359
 * whenever the pad list is updated.
 
1360
 *
 
1361
 * Must be called with STREAM_LOCK.
 
1362
 */
 
1363
static void
 
1364
gst_collect_pads_check_pads (GstCollectPads * pads)
 
1365
{
 
1366
  /* the master list and cookie are protected with LOCK */
 
1367
  GST_OBJECT_LOCK (pads);
 
1368
  if (G_UNLIKELY (pads->priv->pad_cookie != pads->priv->cookie)) {
 
1369
    GSList *collected;
 
1370
 
 
1371
    /* clear list and stats */
 
1372
    g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
 
1373
    g_slist_free (pads->data);
 
1374
    pads->data = NULL;
 
1375
    pads->priv->numpads = 0;
 
1376
    pads->priv->queuedpads = 0;
 
1377
    pads->priv->eospads = 0;
 
1378
    if (pads->priv->earliest_data)
 
1379
      unref_data (pads->priv->earliest_data);
 
1380
    pads->priv->earliest_data = NULL;
 
1381
    pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
 
1382
 
 
1383
    /* loop over the master pad list */
 
1384
    collected = pads->priv->pad_list;
 
1385
    for (; collected; collected = g_slist_next (collected)) {
 
1386
      GstCollectData *data;
 
1387
 
 
1388
      /* update the stats */
 
1389
      pads->priv->numpads++;
 
1390
      data = collected->data;
 
1391
      if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_EOS))
 
1392
        pads->priv->eospads++;
 
1393
      else if (data->buffer || !GST_COLLECT_PADS_STATE_IS_SET (data,
 
1394
              GST_COLLECT_PADS_STATE_WAITING))
 
1395
        pads->priv->queuedpads++;
 
1396
 
 
1397
      /* add to the list of pads to collect */
 
1398
      ref_data (data);
 
1399
      /* preserve order of adding/requesting pads */
 
1400
      pads->data = g_slist_append (pads->data, data);
 
1401
    }
 
1402
    /* and update the cookie */
 
1403
    pads->priv->cookie = pads->priv->pad_cookie;
 
1404
  }
 
1405
  GST_OBJECT_UNLOCK (pads);
 
1406
}
 
1407
 
 
1408
/* checks if all the pads are collected and call the collectfunction
 
1409
 *
 
1410
 * Should be called with STREAM_LOCK.
 
1411
 *
 
1412
 * Returns: The #GstFlowReturn of collection.
 
1413
 */
 
1414
static GstFlowReturn
 
1415
gst_collect_pads_check_collected (GstCollectPads * pads)
 
1416
{
 
1417
  GstFlowReturn flow_ret = GST_FLOW_OK;
 
1418
  GstCollectPadsFunction func;
 
1419
  gpointer user_data;
 
1420
 
 
1421
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
 
1422
 
 
1423
  GST_OBJECT_LOCK (pads);
 
1424
  func = pads->priv->func;
 
1425
  user_data = pads->priv->user_data;
 
1426
  GST_OBJECT_UNLOCK (pads);
 
1427
 
 
1428
  g_return_val_if_fail (pads->priv->func != NULL, GST_FLOW_NOT_SUPPORTED);
 
1429
 
 
1430
  /* check for new pads, update stats etc.. */
 
1431
  gst_collect_pads_check_pads (pads);
 
1432
 
 
1433
  if (G_UNLIKELY (pads->priv->eospads == pads->priv->numpads)) {
 
1434
    /* If all our pads are EOS just collect once to let the element
 
1435
     * do its final EOS handling. */
 
1436
    GST_DEBUG_OBJECT (pads, "All active pads (%d) are EOS, calling %s",
 
1437
        pads->priv->numpads, GST_DEBUG_FUNCPTR_NAME (func));
 
1438
 
 
1439
    flow_ret = func (pads, user_data);
 
1440
  } else {
 
1441
    gboolean collected = FALSE;
 
1442
 
 
1443
    /* We call the collected function as long as our condition matches. */
 
1444
    while (((pads->priv->queuedpads + pads->priv->eospads) >=
 
1445
            pads->priv->numpads)) {
 
1446
      GST_DEBUG_OBJECT (pads,
 
1447
          "All active pads (%d + %d >= %d) have data, " "calling %s",
 
1448
          pads->priv->queuedpads, pads->priv->eospads, pads->priv->numpads,
 
1449
          GST_DEBUG_FUNCPTR_NAME (func));
 
1450
 
 
1451
      flow_ret = func (pads, user_data);
 
1452
      collected = TRUE;
 
1453
 
 
1454
      /* break on error */
 
1455
      if (flow_ret != GST_FLOW_OK)
 
1456
        break;
 
1457
      /* Don't keep looping after telling the element EOS or flushing */
 
1458
      if (pads->priv->queuedpads == 0)
 
1459
        break;
 
1460
    }
 
1461
    if (!collected)
 
1462
      GST_DEBUG_OBJECT (pads, "Not all active pads (%d) have data, continuing",
 
1463
          pads->priv->numpads);
 
1464
  }
 
1465
  return flow_ret;
 
1466
}
 
1467
 
 
1468
 
 
1469
/* General overview:
 
1470
 * - only pad with a buffer can determine earliest_data (and earliest_time)
 
1471
 * - only segment info determines (non-)waiting state
 
1472
 * - ? perhaps use _stream_time for comparison
 
1473
 *   (which muxers might have use as well ?)
 
1474
 */
 
1475
 
 
1476
/*
 
1477
 * Function to recalculate the waiting state of all pads.
 
1478
 *
 
1479
 * Must be called with STREAM_LOCK.
 
1480
 *
 
1481
 * Returns TRUE if a pad was set to waiting
 
1482
 * (from non-waiting state).
 
1483
 */
 
1484
static gboolean
 
1485
gst_collect_pads_recalculate_waiting (GstCollectPads * pads)
 
1486
{
 
1487
  GSList *collected;
 
1488
  gboolean result = FALSE;
 
1489
 
 
1490
  /* If earliest time is not known, there is nothing to do. */
 
1491
  if (pads->priv->earliest_data == NULL)
 
1492
    return FALSE;
 
1493
 
 
1494
  for (collected = pads->data; collected; collected = g_slist_next (collected)) {
 
1495
    GstCollectData *data = (GstCollectData *) collected->data;
 
1496
    int cmp_res;
 
1497
 
 
1498
    /* check if pad has a segment */
 
1499
    if (data->segment.format == GST_FORMAT_UNDEFINED) {
 
1500
      GST_WARNING_OBJECT (pads,
 
1501
          "GstCollectPads has no time segment, assuming 0 based.");
 
1502
      gst_segment_init (&data->segment, GST_FORMAT_TIME);
 
1503
      GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
 
1504
    }
 
1505
 
 
1506
    /* check segment format */
 
1507
    if (data->segment.format != GST_FORMAT_TIME) {
 
1508
      GST_ERROR_OBJECT (pads, "GstCollectPads can handle only time segments.");
 
1509
      continue;
 
1510
    }
 
1511
 
 
1512
    /* check if the waiting state should be changed */
 
1513
    cmp_res = pads->priv->compare_func (pads, data, data->segment.start,
 
1514
        pads->priv->earliest_data, pads->priv->earliest_time,
 
1515
        pads->priv->compare_user_data);
 
1516
    if (cmp_res > 0)
 
1517
      /* stop waiting */
 
1518
      gst_collect_pads_set_waiting (pads, data, FALSE);
 
1519
    else {
 
1520
      if (!GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING)) {
 
1521
        /* start waiting */
 
1522
        gst_collect_pads_set_waiting (pads, data, TRUE);
 
1523
        result = TRUE;
 
1524
      }
 
1525
    }
 
1526
  }
 
1527
 
 
1528
  return result;
 
1529
}
 
1530
 
 
1531
/**
 
1532
 * gst_collect_pads_find_best_pad:
 
1533
 * @pads: the collectpads to use
 
1534
 * @data: returns the collectdata for earliest data
 
1535
 * @time: returns the earliest available buffertime
 
1536
 *
 
1537
 * Find the oldest/best pad, i.e. pad holding the oldest buffer and
 
1538
 * and return the corresponding #GstCollectData and buffertime.
 
1539
 *
 
1540
 * This function should be called with STREAM_LOCK held,
 
1541
 * such as in the callback.
 
1542
 *
 
1543
 * Since: 0.10.36
 
1544
 */
 
1545
static void
 
1546
gst_collect_pads_find_best_pad (GstCollectPads * pads,
 
1547
    GstCollectData ** data, GstClockTime * time)
 
1548
{
 
1549
  GSList *collected;
 
1550
  GstCollectData *best = NULL;
 
1551
  GstClockTime best_time = GST_CLOCK_TIME_NONE;
 
1552
 
 
1553
  g_return_if_fail (data != NULL);
 
1554
  g_return_if_fail (time != NULL);
 
1555
 
 
1556
  for (collected = pads->data; collected; collected = g_slist_next (collected)) {
 
1557
    GstBuffer *buffer;
 
1558
    GstCollectData *data = (GstCollectData *) collected->data;
 
1559
    GstClockTime timestamp;
 
1560
 
 
1561
    buffer = gst_collect_pads_peek (pads, data);
 
1562
    /* if we have a buffer check if it is better then the current best one */
 
1563
    if (buffer != NULL) {
 
1564
      timestamp = GST_BUFFER_TIMESTAMP (buffer);
 
1565
      gst_buffer_unref (buffer);
 
1566
      if (best == NULL || pads->priv->compare_func (pads, data, timestamp,
 
1567
              best, best_time, pads->priv->compare_user_data) < 0) {
 
1568
        best = data;
 
1569
        best_time = timestamp;
 
1570
      }
 
1571
    }
 
1572
  }
 
1573
 
 
1574
  /* set earliest time */
 
1575
  *data = best;
 
1576
  *time = best_time;
 
1577
 
 
1578
  GST_DEBUG_OBJECT (pads, "best pad %s, best time %" GST_TIME_FORMAT,
 
1579
      best ? GST_PAD_NAME (((GstCollectData *) best)->pad) : "(nil)",
 
1580
      GST_TIME_ARGS (best_time));
 
1581
}
 
1582
 
 
1583
/*
 
1584
 * Function to recalculate earliest_data and earliest_timestamp. This also calls
 
1585
 * gst_collect_pads_recalculate_waiting
 
1586
 *
 
1587
 * Must be called with STREAM_LOCK.
 
1588
 */
 
1589
static gboolean
 
1590
gst_collect_pads_recalculate_full (GstCollectPads * pads)
 
1591
{
 
1592
  if (pads->priv->earliest_data)
 
1593
    unref_data (pads->priv->earliest_data);
 
1594
  gst_collect_pads_find_best_pad (pads, &pads->priv->earliest_data,
 
1595
      &pads->priv->earliest_time);
 
1596
  if (pads->priv->earliest_data)
 
1597
    ref_data (pads->priv->earliest_data);
 
1598
  return gst_collect_pads_recalculate_waiting (pads);
 
1599
}
 
1600
 
 
1601
/*
 
1602
 * Default collect callback triggered when #GstCollectPads gathered all data.
 
1603
 *
 
1604
 * Called with STREAM_LOCK.
 
1605
 */
 
1606
static GstFlowReturn
 
1607
gst_collect_pads_default_collected (GstCollectPads * pads, gpointer user_data)
 
1608
{
 
1609
  GstCollectData *best = NULL;
 
1610
  GstBuffer *buffer;
 
1611
  GstFlowReturn ret = GST_FLOW_OK;
 
1612
  GstCollectPadsBufferFunction func;
 
1613
  gpointer buffer_user_data;
 
1614
 
 
1615
  g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
 
1616
 
 
1617
  GST_OBJECT_LOCK (pads);
 
1618
  func = pads->priv->buffer_func;
 
1619
  buffer_user_data = pads->priv->buffer_user_data;
 
1620
  GST_OBJECT_UNLOCK (pads);
 
1621
 
 
1622
  g_return_val_if_fail (func != NULL, GST_FLOW_NOT_SUPPORTED);
 
1623
 
 
1624
  /* Find the oldest pad at all cost */
 
1625
  if (gst_collect_pads_recalculate_full (pads)) {
 
1626
    /* waiting was switched on,
 
1627
     * so give another thread a chance to deliver a possibly
 
1628
     * older buffer; don't charge on yet with the current oldest */
 
1629
    ret = GST_FLOW_OK;
 
1630
    goto done;
 
1631
  }
 
1632
 
 
1633
  best = pads->priv->earliest_data;
 
1634
 
 
1635
  /* No data collected means EOS. */
 
1636
  if (G_UNLIKELY (best == NULL)) {
 
1637
    ret = func (pads, best, NULL, buffer_user_data);
 
1638
    if (ret == GST_FLOW_OK)
 
1639
      ret = GST_FLOW_EOS;
 
1640
    goto done;
 
1641
  }
 
1642
 
 
1643
  /* make sure that the pad we take a buffer from is waiting;
 
1644
   * otherwise popping a buffer will seem not to have happened
 
1645
   * and collectpads can get into a busy loop */
 
1646
  gst_collect_pads_set_waiting (pads, best, TRUE);
 
1647
 
 
1648
  /* Send buffer */
 
1649
  buffer = gst_collect_pads_pop (pads, best);
 
1650
  ret = func (pads, best, buffer, buffer_user_data);
 
1651
 
 
1652
  /* maybe non-waiting was forced to waiting above due to
 
1653
   * newsegment events coming too sparsely,
 
1654
   * so re-check to restore state to avoid hanging/waiting */
 
1655
  gst_collect_pads_recalculate_full (pads);
 
1656
 
 
1657
done:
 
1658
  return ret;
 
1659
}
 
1660
 
 
1661
/*
 
1662
 * Default timestamp compare function.
 
1663
 */
 
1664
static gint
 
1665
gst_collect_pads_default_compare_func (GstCollectPads * pads,
 
1666
    GstCollectData * data1, GstClockTime timestamp1,
 
1667
    GstCollectData * data2, GstClockTime timestamp2, gpointer user_data)
 
1668
{
 
1669
 
 
1670
  GST_LOG_OBJECT (pads, "comparing %" GST_TIME_FORMAT
 
1671
      " and %" GST_TIME_FORMAT, GST_TIME_ARGS (timestamp1),
 
1672
      GST_TIME_ARGS (timestamp2));
 
1673
  /* non-valid timestamps go first as they are probably headers or so */
 
1674
  if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp1)))
 
1675
    return GST_CLOCK_TIME_IS_VALID (timestamp2) ? -1 : 0;
 
1676
 
 
1677
  if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (timestamp2)))
 
1678
    return 1;
 
1679
 
 
1680
  /* compare timestamp */
 
1681
  if (timestamp1 < timestamp2)
 
1682
    return -1;
 
1683
 
 
1684
  if (timestamp1 > timestamp2)
 
1685
    return 1;
 
1686
 
 
1687
  return 0;
 
1688
}
 
1689
 
 
1690
/**
 
1691
 * gst_collect_pads_event_default:
 
1692
 * @pads: the collectspads to use
 
1693
 * @data: collect data of corresponding pad
 
1694
 * @event: event being processed
 
1695
 * @discard: process but do not send event downstream
 
1696
 *
 
1697
 * Default GstCollectPads event handling that elements should always
 
1698
 * chain up to to ensure proper operation.  Element might however indicate
 
1699
 * event should not be forwarded downstream.
 
1700
 *
 
1701
 * Since: 0.11.x
 
1702
 */
 
1703
gboolean
 
1704
gst_collect_pads_event_default (GstCollectPads * pads, GstCollectData * data,
 
1705
    GstEvent * event, gboolean discard)
 
1706
{
 
1707
  gboolean res = TRUE;
 
1708
  GstCollectPadsBufferFunction buffer_func;
 
1709
  GstObject *parent;
 
1710
  GstPad *pad;
 
1711
 
 
1712
  GST_OBJECT_LOCK (pads);
 
1713
  buffer_func = pads->priv->buffer_func;
 
1714
  GST_OBJECT_UNLOCK (pads);
 
1715
 
 
1716
  pad = data->pad;
 
1717
  parent = GST_OBJECT_PARENT (pad);
 
1718
 
 
1719
  switch (GST_EVENT_TYPE (event)) {
 
1720
    case GST_EVENT_FLUSH_START:
 
1721
    {
 
1722
      /* forward event to unblock check_collected */
 
1723
      GST_DEBUG_OBJECT (pad, "forwarding flush start");
 
1724
      res = gst_pad_event_default (pad, parent, event);
 
1725
      event = NULL;
 
1726
 
 
1727
      /* now unblock the chain function.
 
1728
       * no cond per pad, so they all unblock,
 
1729
       * non-flushing block again */
 
1730
      GST_COLLECT_PADS_STREAM_LOCK (pads);
 
1731
      GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_FLUSHING);
 
1732
      gst_collect_pads_clear (pads, data);
 
1733
 
 
1734
      /* cater for possible default muxing functionality */
 
1735
      if (buffer_func) {
 
1736
        /* restore to initial state */
 
1737
        gst_collect_pads_set_waiting (pads, data, TRUE);
 
1738
        /* if the current pad is affected, reset state, recalculate later */
 
1739
        if (pads->priv->earliest_data == data) {
 
1740
          unref_data (data);
 
1741
          pads->priv->earliest_data = NULL;
 
1742
          pads->priv->earliest_time = GST_CLOCK_TIME_NONE;
 
1743
        }
 
1744
      }
 
1745
 
 
1746
      GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
1747
 
 
1748
      goto eat;
 
1749
    }
 
1750
    case GST_EVENT_FLUSH_STOP:
 
1751
    {
 
1752
      /* flush the 1 buffer queue */
 
1753
      GST_COLLECT_PADS_STREAM_LOCK (pads);
 
1754
      GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_FLUSHING);
 
1755
      gst_collect_pads_clear (pads, data);
 
1756
      /* we need new segment info after the flush */
 
1757
      gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
 
1758
      GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
 
1759
      /* if the pad was EOS, remove the EOS flag and
 
1760
       * decrement the number of eospads */
 
1761
      if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
 
1762
                  GST_COLLECT_PADS_STATE_EOS))) {
 
1763
        if (!GST_COLLECT_PADS_STATE_IS_SET (data,
 
1764
                GST_COLLECT_PADS_STATE_WAITING))
 
1765
          pads->priv->queuedpads++;
 
1766
        pads->priv->eospads--;
 
1767
        GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_EOS);
 
1768
      }
 
1769
      GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
1770
 
 
1771
      goto forward;
 
1772
    }
 
1773
    case GST_EVENT_EOS:
 
1774
    {
 
1775
      GST_COLLECT_PADS_STREAM_LOCK (pads);
 
1776
      /* if the pad was not EOS, make it EOS and so we
 
1777
       * have one more eospad */
 
1778
      if (G_LIKELY (!GST_COLLECT_PADS_STATE_IS_SET (data,
 
1779
                  GST_COLLECT_PADS_STATE_EOS))) {
 
1780
        GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_EOS);
 
1781
        if (!GST_COLLECT_PADS_STATE_IS_SET (data,
 
1782
                GST_COLLECT_PADS_STATE_WAITING))
 
1783
          pads->priv->queuedpads--;
 
1784
        pads->priv->eospads++;
 
1785
      }
 
1786
      /* check if we need collecting anything, we ignore the result. */
 
1787
      gst_collect_pads_check_collected (pads);
 
1788
      GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
1789
 
 
1790
      goto eat;
 
1791
    }
 
1792
    case GST_EVENT_SEGMENT:
 
1793
    {
 
1794
      GstSegment seg;
 
1795
      gint cmp_res;
 
1796
 
 
1797
      GST_COLLECT_PADS_STREAM_LOCK (pads);
 
1798
 
 
1799
      gst_event_copy_segment (event, &seg);
 
1800
 
 
1801
      GST_DEBUG_OBJECT (data->pad, "got segment %" GST_SEGMENT_FORMAT, &seg);
 
1802
 
 
1803
      /* default collection can not handle other segment formats than time */
 
1804
      if (buffer_func && seg.format != GST_FORMAT_TIME) {
 
1805
        GST_WARNING_OBJECT (pads, "GstCollectPads default collecting "
 
1806
            "can only handle time segments. Non time segment ignored.");
 
1807
        goto newsegment_done;
 
1808
      }
 
1809
 
 
1810
      data->segment = seg;
 
1811
      GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_NEW_SEGMENT);
 
1812
 
 
1813
      /* default muxing functionality */
 
1814
      if (!buffer_func)
 
1815
        goto newsegment_done;
 
1816
 
 
1817
      /* If oldest time is not known, or current pad got newsegment;
 
1818
       * recalculate the state */
 
1819
      if (!pads->priv->earliest_data || pads->priv->earliest_data == data) {
 
1820
        gst_collect_pads_recalculate_full (pads);
 
1821
        goto newsegment_done;
 
1822
      }
 
1823
 
 
1824
      /* Check if the waiting state of the pad should change. */
 
1825
      cmp_res =
 
1826
          pads->priv->compare_func (pads, data, seg.start,
 
1827
          pads->priv->earliest_data, pads->priv->earliest_time,
 
1828
          pads->priv->compare_user_data);
 
1829
 
 
1830
      if (cmp_res > 0)
 
1831
        /* Stop waiting */
 
1832
        gst_collect_pads_set_waiting (pads, data, FALSE);
 
1833
 
 
1834
    newsegment_done:
 
1835
      GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
1836
      /* we must not forward this event since multiple segments will be
 
1837
       * accumulated and this is certainly not what we want. */
 
1838
      goto eat;
 
1839
    }
 
1840
    case GST_EVENT_CAPS:
 
1841
    case GST_EVENT_STREAM_START:
 
1842
    case GST_EVENT_STREAM_CONFIG:
 
1843
      goto eat;
 
1844
    default:
 
1845
      /* forward other events */
 
1846
      goto forward;
 
1847
  }
 
1848
 
 
1849
eat:
 
1850
  if (event)
 
1851
    gst_event_unref (event);
 
1852
  return res;
 
1853
 
 
1854
forward:
 
1855
  if (discard)
 
1856
    goto eat;
 
1857
  else
 
1858
    return gst_pad_event_default (pad, parent, event);
 
1859
}
 
1860
 
 
1861
static gboolean
 
1862
gst_collect_pads_event_default_internal (GstCollectPads * pads,
 
1863
    GstCollectData * data, GstEvent * event, gpointer user_data)
 
1864
{
 
1865
  return gst_collect_pads_event_default (pads, data, event, FALSE);
 
1866
}
 
1867
 
 
1868
static gboolean
 
1869
gst_collect_pads_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
1870
{
 
1871
  gboolean res = FALSE, need_unlock = FALSE;
 
1872
  GstCollectData *data;
 
1873
  GstCollectPads *pads;
 
1874
  GstCollectPadsEventFunction event_func;
 
1875
  gpointer event_user_data;
 
1876
 
 
1877
  /* some magic to get the managing collect_pads */
 
1878
  GST_OBJECT_LOCK (pad);
 
1879
  data = (GstCollectData *) gst_pad_get_element_private (pad);
 
1880
  if (G_UNLIKELY (data == NULL))
 
1881
    goto pad_removed;
 
1882
  ref_data (data);
 
1883
  GST_OBJECT_UNLOCK (pad);
 
1884
 
 
1885
  res = FALSE;
 
1886
 
 
1887
  pads = data->collect;
 
1888
 
 
1889
  GST_DEBUG_OBJECT (data->pad, "Got %s event on sink pad",
 
1890
      GST_EVENT_TYPE_NAME (event));
 
1891
 
 
1892
  GST_OBJECT_LOCK (pads);
 
1893
  event_func = pads->priv->event_func;
 
1894
  event_user_data = pads->priv->event_user_data;
 
1895
  GST_OBJECT_UNLOCK (pads);
 
1896
 
 
1897
  if (GST_EVENT_IS_SERIALIZED (event)) {
 
1898
    GST_COLLECT_PADS_STREAM_LOCK (pads);
 
1899
    need_unlock = TRUE;
 
1900
  }
 
1901
 
 
1902
  if (G_LIKELY (event_func)) {
 
1903
    res = event_func (pads, data, event, event_user_data);
 
1904
  }
 
1905
 
 
1906
  if (need_unlock)
 
1907
    GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
1908
 
 
1909
  unref_data (data);
 
1910
  return res;
 
1911
 
 
1912
  /* ERRORS */
 
1913
pad_removed:
 
1914
  {
 
1915
    GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
 
1916
    GST_OBJECT_UNLOCK (pad);
 
1917
    return FALSE;
 
1918
  }
 
1919
}
 
1920
 
 
1921
/**
 
1922
 * gst_collect_pads_query_default:
 
1923
 * @pads: the collectspads to use
 
1924
 * @data: collect data of corresponding pad
 
1925
 * @query: query being processed
 
1926
 * @discard: process but do not send event downstream
 
1927
 *
 
1928
 * Default GstCollectPads query handling that elements should always
 
1929
 * chain up to to ensure proper operation.  Element might however indicate
 
1930
 * query should not be forwarded downstream.
 
1931
 *
 
1932
 * Since: 0.11.x
 
1933
 */
 
1934
gboolean
 
1935
gst_collect_pads_query_default (GstCollectPads * pads, GstCollectData * data,
 
1936
    GstQuery * query, gboolean discard)
 
1937
{
 
1938
  gboolean res = TRUE;
 
1939
  GstObject *parent;
 
1940
  GstPad *pad;
 
1941
 
 
1942
  pad = data->pad;
 
1943
  parent = GST_OBJECT_PARENT (pad);
 
1944
 
 
1945
  switch (GST_QUERY_TYPE (query)) {
 
1946
    case GST_QUERY_SEEKING:
 
1947
    {
 
1948
      GstFormat format;
 
1949
 
 
1950
      /* don't pass it along as some (file)sink might claim it does
 
1951
       * whereas with a collectpads in between that will not likely work */
 
1952
      gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
 
1953
      gst_query_set_seeking (query, format, FALSE, 0, -1);
 
1954
      res = TRUE;
 
1955
      discard = TRUE;
 
1956
      break;
 
1957
    }
 
1958
    default:
 
1959
      break;
 
1960
  }
 
1961
 
 
1962
  if (!discard)
 
1963
    return gst_pad_query_default (pad, parent, query);
 
1964
  else
 
1965
    return res;
 
1966
}
 
1967
 
 
1968
static gboolean
 
1969
gst_collect_pads_query_default_internal (GstCollectPads * pads,
 
1970
    GstCollectData * data, GstQuery * query, gpointer user_data)
 
1971
{
 
1972
  return gst_collect_pads_query_default (pads, data, query, FALSE);
 
1973
}
 
1974
 
 
1975
static gboolean
 
1976
gst_collect_pads_query (GstPad * pad, GstObject * parent, GstQuery * query)
 
1977
{
 
1978
  gboolean res = FALSE, need_unlock = FALSE;
 
1979
  GstCollectData *data;
 
1980
  GstCollectPads *pads;
 
1981
  GstCollectPadsQueryFunction query_func;
 
1982
  gpointer query_user_data;
 
1983
 
 
1984
  GST_DEBUG_OBJECT (pad, "Got %s query on sink pad",
 
1985
      GST_QUERY_TYPE_NAME (query));
 
1986
 
 
1987
  /* some magic to get the managing collect_pads */
 
1988
  GST_OBJECT_LOCK (pad);
 
1989
  data = (GstCollectData *) gst_pad_get_element_private (pad);
 
1990
  if (G_UNLIKELY (data == NULL))
 
1991
    goto pad_removed;
 
1992
  ref_data (data);
 
1993
  GST_OBJECT_UNLOCK (pad);
 
1994
 
 
1995
  pads = data->collect;
 
1996
 
 
1997
  GST_OBJECT_LOCK (pads);
 
1998
  query_func = pads->priv->query_func;
 
1999
  query_user_data = pads->priv->query_user_data;
 
2000
  GST_OBJECT_UNLOCK (pads);
 
2001
 
 
2002
  if (GST_QUERY_IS_SERIALIZED (query)) {
 
2003
    GST_COLLECT_PADS_STREAM_LOCK (pads);
 
2004
    need_unlock = TRUE;
 
2005
  }
 
2006
 
 
2007
  if (G_LIKELY (query_func)) {
 
2008
    res = query_func (pads, data, query, query_user_data);
 
2009
  }
 
2010
 
 
2011
  if (need_unlock)
 
2012
    GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
2013
 
 
2014
  unref_data (data);
 
2015
  return res;
 
2016
 
 
2017
  /* ERRORS */
 
2018
pad_removed:
 
2019
  {
 
2020
    GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
 
2021
    GST_OBJECT_UNLOCK (pad);
 
2022
    return FALSE;
 
2023
  }
 
2024
}
 
2025
 
 
2026
 
 
2027
/* For each buffer we receive we check if our collected condition is reached
 
2028
 * and if so we call the collected function. When this is done we check if
 
2029
 * data has been unqueued. If data is still queued we wait holding the stream
 
2030
 * lock to make sure no EOS event can happen while we are ready to be
 
2031
 * collected 
 
2032
 */
 
2033
static GstFlowReturn
 
2034
gst_collect_pads_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 
2035
{
 
2036
  GstCollectData *data;
 
2037
  GstCollectPads *pads;
 
2038
  GstFlowReturn ret;
 
2039
  GstBuffer **buffer_p;
 
2040
  guint32 cookie;
 
2041
 
 
2042
  GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
 
2043
 
 
2044
  /* some magic to get the managing collect_pads */
 
2045
  GST_OBJECT_LOCK (pad);
 
2046
  data = (GstCollectData *) gst_pad_get_element_private (pad);
 
2047
  if (G_UNLIKELY (data == NULL))
 
2048
    goto no_data;
 
2049
  ref_data (data);
 
2050
  GST_OBJECT_UNLOCK (pad);
 
2051
 
 
2052
  pads = data->collect;
 
2053
 
 
2054
  GST_COLLECT_PADS_STREAM_LOCK (pads);
 
2055
  /* if not started, bail out */
 
2056
  if (G_UNLIKELY (!pads->priv->started))
 
2057
    goto not_started;
 
2058
  /* check if this pad is flushing */
 
2059
  if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
 
2060
              GST_COLLECT_PADS_STATE_FLUSHING)))
 
2061
    goto flushing;
 
2062
  /* pad was EOS, we can refuse this data */
 
2063
  if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
 
2064
              GST_COLLECT_PADS_STATE_EOS)))
 
2065
    goto eos;
 
2066
 
 
2067
  /* see if we need to clip */
 
2068
  if (pads->priv->clip_func) {
 
2069
    GstBuffer *outbuf = NULL;
 
2070
    ret =
 
2071
        pads->priv->clip_func (pads, data, buffer, &outbuf,
 
2072
        pads->priv->clip_user_data);
 
2073
    buffer = outbuf;
 
2074
 
 
2075
    if (G_UNLIKELY (outbuf == NULL))
 
2076
      goto clipped;
 
2077
 
 
2078
    if (G_UNLIKELY (ret == GST_FLOW_EOS))
 
2079
      goto eos;
 
2080
    else if (G_UNLIKELY (ret != GST_FLOW_OK))
 
2081
      goto error;
 
2082
  }
 
2083
 
 
2084
  GST_DEBUG_OBJECT (pads, "Queuing buffer %p for pad %s:%s", buffer,
 
2085
      GST_DEBUG_PAD_NAME (pad));
 
2086
 
 
2087
  /* One more pad has data queued */
 
2088
  if (GST_COLLECT_PADS_STATE_IS_SET (data, GST_COLLECT_PADS_STATE_WAITING))
 
2089
    pads->priv->queuedpads++;
 
2090
  buffer_p = &data->buffer;
 
2091
  gst_buffer_replace (buffer_p, buffer);
 
2092
 
 
2093
  /* update segment last position if in TIME */
 
2094
  if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
 
2095
    GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer);
 
2096
 
 
2097
    if (GST_CLOCK_TIME_IS_VALID (timestamp))
 
2098
      data->segment.position = timestamp;
 
2099
  }
 
2100
 
 
2101
  /* While we have data queued on this pad try to collect stuff */
 
2102
  do {
 
2103
    /* Check if our collected condition is matched and call the collected
 
2104
     * function if it is */
 
2105
    ret = gst_collect_pads_check_collected (pads);
 
2106
    /* when an error occurs, we want to report this back to the caller ASAP
 
2107
     * without having to block if the buffer was not popped */
 
2108
    if (G_UNLIKELY (ret != GST_FLOW_OK))
 
2109
      goto error;
 
2110
 
 
2111
    /* data was consumed, we can exit and accept new data */
 
2112
    if (data->buffer == NULL)
 
2113
      break;
 
2114
 
 
2115
    /* Having the _INIT here means we don't care about any broadcast up to here
 
2116
     * (most of which occur with STREAM_LOCK held, so could not have happened
 
2117
     * anyway).  We do care about e.g. a remove initiated broadcast as of this
 
2118
     * point.  Putting it here also makes this thread ignores any evt it raised
 
2119
     * itself (as is a usual WAIT semantic).
 
2120
     */
 
2121
    GST_COLLECT_PADS_EVT_INIT (cookie);
 
2122
 
 
2123
    /* pad could be removed and re-added */
 
2124
    unref_data (data);
 
2125
    GST_OBJECT_LOCK (pad);
 
2126
    if (G_UNLIKELY ((data = gst_pad_get_element_private (pad)) == NULL))
 
2127
      goto pad_removed;
 
2128
    ref_data (data);
 
2129
    GST_OBJECT_UNLOCK (pad);
 
2130
 
 
2131
    GST_DEBUG_OBJECT (pads, "Pad %s:%s has a buffer queued, waiting",
 
2132
        GST_DEBUG_PAD_NAME (pad));
 
2133
 
 
2134
    /* wait to be collected, this must happen from another thread triggered
 
2135
     * by the _chain function of another pad. We release the lock so we
 
2136
     * can get stopped or flushed as well. We can however not get EOS
 
2137
     * because we still hold the STREAM_LOCK.
 
2138
     */
 
2139
    GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
2140
    GST_COLLECT_PADS_EVT_WAIT (pads, cookie);
 
2141
    GST_COLLECT_PADS_STREAM_LOCK (pads);
 
2142
 
 
2143
    GST_DEBUG_OBJECT (pads, "Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
 
2144
 
 
2145
    /* after a signal, we could be stopped */
 
2146
    if (G_UNLIKELY (!pads->priv->started))
 
2147
      goto not_started;
 
2148
    /* check if this pad is flushing */
 
2149
    if (G_UNLIKELY (GST_COLLECT_PADS_STATE_IS_SET (data,
 
2150
                GST_COLLECT_PADS_STATE_FLUSHING)))
 
2151
      goto flushing;
 
2152
  }
 
2153
  while (data->buffer != NULL);
 
2154
 
 
2155
unlock_done:
 
2156
  GST_COLLECT_PADS_STREAM_UNLOCK (pads);
 
2157
  unref_data (data);
 
2158
  if (buffer)
 
2159
    gst_buffer_unref (buffer);
 
2160
  return ret;
 
2161
 
 
2162
pad_removed:
 
2163
  {
 
2164
    GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
 
2165
    GST_OBJECT_UNLOCK (pad);
 
2166
    ret = GST_FLOW_NOT_LINKED;
 
2167
    goto unlock_done;
 
2168
  }
 
2169
  /* ERRORS */
 
2170
no_data:
 
2171
  {
 
2172
    GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
 
2173
    GST_OBJECT_UNLOCK (pad);
 
2174
    gst_buffer_unref (buffer);
 
2175
    return GST_FLOW_NOT_LINKED;
 
2176
  }
 
2177
not_started:
 
2178
  {
 
2179
    GST_DEBUG ("not started");
 
2180
    gst_collect_pads_clear (pads, data);
 
2181
    ret = GST_FLOW_FLUSHING;
 
2182
    goto unlock_done;
 
2183
  }
 
2184
flushing:
 
2185
  {
 
2186
    GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
 
2187
    gst_collect_pads_clear (pads, data);
 
2188
    ret = GST_FLOW_FLUSHING;
 
2189
    goto unlock_done;
 
2190
  }
 
2191
eos:
 
2192
  {
 
2193
    /* we should not post an error for this, just inform upstream that
 
2194
     * we don't expect anything anymore */
 
2195
    GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
 
2196
    ret = GST_FLOW_EOS;
 
2197
    goto unlock_done;
 
2198
  }
 
2199
clipped:
 
2200
  {
 
2201
    GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
 
2202
    ret = GST_FLOW_OK;
 
2203
    goto unlock_done;
 
2204
  }
 
2205
error:
 
2206
  {
 
2207
    /* we print the error, the element should post a reasonable error
 
2208
     * message for fatal errors */
 
2209
    GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
 
2210
    gst_collect_pads_clear (pads, data);
 
2211
    goto unlock_done;
 
2212
  }
 
2213
}