~ubuntu-branches/ubuntu/quantal/gst-plugins-bad0.10/quantal-proposed

« back to all changes in this revision

Viewing changes to ext/curl/gstcurlsink.c

  • Committer: Bazaar Package Importer
  • Author(s): Ken VanDine
  • Date: 2011-07-19 14:32:43 UTC
  • mfrom: (18.4.21 sid)
  • Revision ID: james.westby@ubuntu.com-20110719143243-p7pnkh45akfp0ihk
Tags: 0.10.22-2ubuntu1
* Rebased on debian unstable, remaining changes:
  - debian/gstreamer-plugins-bad.install
    * don't include dtmf, liveadder, rtpmux, autoconvert and shm, we include 
      them in -good

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* GStreamer
 
2
 * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
 
3
 *
 
4
 * This library is free software; you can redistribute it and/or
 
5
 * modify it under the terms of the GNU Library General Public
 
6
 * License as published by the Free Software Foundation; either
 
7
 * version 2 of the License, or (at your option) any later version.
 
8
 *
 
9
 * This library is distributed in the hope that it will be useful,
 
10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
12
 * Library General Public License for more details.
 
13
 *
 
14
 * You should have received a copy of the GNU Library General Public
 
15
 * License along with this library; if not, write to the
 
16
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 
17
 * Boston, MA 02111-1307, USA.
 
18
 */
 
19
 
 
20
/**
 
21
 * SECTION:element-curlsink
 
22
 * @short_description: sink that uploads data to a server using libcurl
 
23
 * @see_also:
 
24
 *
 
25
 * This is a network sink that uses libcurl as a client to upload data to
 
26
 * a server (e.g. a HTTP/FTP server).
 
27
 *
 
28
 * <refsect2>
 
29
 * <title>Example launch line (upload a JPEG file to an HTTP server)</title>
 
30
 * |[
 
31
 * gst-launch filesrc filesrc location=image.jpg ! jpegparse ! curlsink  \
 
32
 *     file-name=image.jpg  \
 
33
 *     location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/  \
 
34
 *     user=test passwd=test  \
 
35
 *     content-type=image/jpeg  \
 
36
 *     use-content-length=false
 
37
 * ]|
 
38
 * </refsect2>
 
39
 */
 
40
 
 
41
#ifdef HAVE_CONFIG_H
 
42
#include "config.h"
 
43
#endif
 
44
 
 
45
#include <curl/curl.h>
 
46
#include <string.h>
 
47
#include <stdio.h>
 
48
 
 
49
#include <sys/socket.h>
 
50
#include <sys/types.h>
 
51
#include <netinet/in.h>
 
52
#include <unistd.h>
 
53
#include <netinet/ip.h>
 
54
#include <netinet/tcp.h>
 
55
#include <sys/stat.h>
 
56
#include <fcntl.h>
 
57
 
 
58
#include "gstcurlsink.h"
 
59
 
 
60
/* Default values */
 
61
#define GST_CAT_DEFAULT                gst_curl_sink_debug
 
62
#define DEFAULT_URL                    "localhost:5555"
 
63
#define DEFAULT_TIMEOUT                30
 
64
#define DEFAULT_PROXY_PORT             3128
 
65
#define DEFAULT_QOS_DSCP               0
 
66
#define DEFAULT_ACCEPT_SELF_SIGNED     FALSE
 
67
#define DEFAULT_USE_CONTENT_LENGTH     FALSE
 
68
 
 
69
#define DSCP_MIN                       0
 
70
#define DSCP_MAX                       63
 
71
#define RESPONSE_100_CONTINUE          100
 
72
#define RESPONSE_CONNECT_PROXY         200
 
73
 
 
74
/* Plugin specific settings */
 
75
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
 
76
    GST_PAD_SINK,
 
77
    GST_PAD_ALWAYS,
 
78
    GST_STATIC_CAPS_ANY);
 
79
 
 
80
GST_DEBUG_CATEGORY_STATIC (gst_curl_sink_debug);
 
81
 
 
82
enum
 
83
{
 
84
  PROP_0,
 
85
  PROP_LOCATION,
 
86
  PROP_USER_NAME,
 
87
  PROP_USER_PASSWD,
 
88
  PROP_PROXY,
 
89
  PROP_PROXY_PORT,
 
90
  PROP_PROXY_USER_NAME,
 
91
  PROP_PROXY_USER_PASSWD,
 
92
  PROP_FILE_NAME,
 
93
  PROP_TIMEOUT,
 
94
  PROP_QOS_DSCP,
 
95
  PROP_ACCEPT_SELF_SIGNED,
 
96
  PROP_USE_CONTENT_LENGTH,
 
97
  PROP_CONTENT_TYPE
 
98
};
 
99
static gboolean proxy_auth = FALSE;
 
100
static gboolean proxy_conn_established = FALSE;
 
101
 
 
102
/* Object class function declarations */
 
103
static void gst_curl_sink_finalize (GObject * gobject);
 
104
static void gst_curl_sink_set_property (GObject * object, guint prop_id,
 
105
    const GValue * value, GParamSpec * pspec);
 
106
static void gst_curl_sink_get_property (GObject * object, guint prop_id,
 
107
    GValue * value, GParamSpec * pspec);
 
108
 
 
109
/* BaseSink class function declarations */
 
110
static GstFlowReturn gst_curl_sink_render (GstBaseSink * bsink,
 
111
    GstBuffer * buf);
 
112
static gboolean gst_curl_sink_event (GstBaseSink * bsink, GstEvent * event);
 
113
static gboolean gst_curl_sink_start (GstBaseSink * bsink);
 
114
static gboolean gst_curl_sink_stop (GstBaseSink * bsink);
 
115
static gboolean gst_curl_sink_unlock (GstBaseSink * bsink);
 
116
static gboolean gst_curl_sink_unlock_stop (GstBaseSink * bsink);
 
117
 
 
118
/* private functions */
 
119
static gboolean gst_curl_sink_transfer_setup_unlocked (GstCurlSink * sink);
 
120
static gboolean gst_curl_sink_transfer_set_options_unlocked (GstCurlSink
 
121
    * sink);
 
122
static gboolean gst_curl_sink_transfer_start_unlocked (GstCurlSink * sink);
 
123
static void gst_curl_sink_transfer_cleanup (GstCurlSink * sink);
 
124
static size_t gst_curl_sink_transfer_read_cb (void *ptr, size_t size,
 
125
    size_t nmemb, void *stream);
 
126
static size_t gst_curl_sink_transfer_write_cb (void *ptr, size_t size,
 
127
    size_t nmemb, void *stream);
 
128
static GstFlowReturn gst_curl_sink_handle_transfer (GstCurlSink * sink);
 
129
static int gst_curl_sink_transfer_socket_cb (void *clientp,
 
130
    curl_socket_t curlfd, curlsocktype purpose);
 
131
static gpointer gst_curl_sink_transfer_thread_func (gpointer data);
 
132
static CURLcode gst_curl_sink_transfer_check (GstCurlSink * sink);
 
133
static gint gst_curl_sink_setup_dscp_unlocked (GstCurlSink * sink);
 
134
 
 
135
static gboolean gst_curl_sink_wait_for_data_unlocked (GstCurlSink * sink);
 
136
static void gst_curl_sink_new_file_notify_unlocked (GstCurlSink * sink);
 
137
static void gst_curl_sink_transfer_thread_notify_unlocked (GstCurlSink * sink);
 
138
static void gst_curl_sink_transfer_thread_close_unlocked (GstCurlSink * sink);
 
139
static void gst_curl_sink_wait_for_transfer_thread_to_send_unlocked (GstCurlSink
 
140
    * sink);
 
141
static void gst_curl_sink_data_sent_notify_unlocked (GstCurlSink * sink);
 
142
 
 
143
static void
 
144
_do_init (GType type)
 
145
{
 
146
  GST_DEBUG_CATEGORY_INIT (gst_curl_sink_debug, "curlsink", 0,
 
147
      "curl sink element");
 
148
}
 
149
 
 
150
GST_BOILERPLATE_FULL (GstCurlSink, gst_curl_sink, GstBaseSink,
 
151
    GST_TYPE_BASE_SINK, _do_init);
 
152
 
 
153
static void
 
154
gst_curl_sink_base_init (gpointer g_class)
 
155
{
 
156
  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
 
157
 
 
158
  gst_element_class_add_pad_template (element_class,
 
159
      gst_static_pad_template_get (&sinktemplate));
 
160
  gst_element_class_set_details_simple (element_class,
 
161
      "Curl sink",
 
162
      "Sink/Network",
 
163
      "Upload data over the network to a server using libcurl",
 
164
      "Patricia Muscalu <patricia@axis.com>");
 
165
}
 
166
 
 
167
static void
 
168
gst_curl_sink_class_init (GstCurlSinkClass * klass)
 
169
{
 
170
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
 
171
  GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
 
172
 
 
173
  GST_DEBUG_OBJECT (klass, "class_init");
 
174
 
 
175
  gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_sink_event);
 
176
  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_sink_render);
 
177
  gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_sink_start);
 
178
  gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_sink_stop);
 
179
  gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_sink_unlock);
 
180
  gstbasesink_class->unlock_stop =
 
181
      GST_DEBUG_FUNCPTR (gst_curl_sink_unlock_stop);
 
182
  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_sink_finalize);
 
183
 
 
184
  gobject_class->set_property = gst_curl_sink_set_property;
 
185
  gobject_class->get_property = gst_curl_sink_get_property;
 
186
 
 
187
  /* FIXME: check against souphttpsrc and use same names for same properties */
 
188
  g_object_class_install_property (gobject_class, PROP_LOCATION,
 
189
      g_param_spec_string ("location", "Location",
 
190
          "URI location to write to", NULL,
 
191
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
192
  g_object_class_install_property (gobject_class, PROP_USER_NAME,
 
193
      g_param_spec_string ("user", "User name",
 
194
          "User name to use for server authentication", NULL,
 
195
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
196
  g_object_class_install_property (gobject_class, PROP_USER_PASSWD,
 
197
      g_param_spec_string ("passwd", "User password",
 
198
          "User password to use for server authentication", NULL,
 
199
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
200
  g_object_class_install_property (gobject_class, PROP_PROXY,
 
201
      g_param_spec_string ("proxy", "Proxy", "HTTP proxy server URI", NULL,
 
202
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
203
  g_object_class_install_property (gobject_class, PROP_PROXY_PORT,
 
204
      g_param_spec_int ("proxy-port", "Proxy port",
 
205
          "HTTP proxy server port", 0, G_MAXINT, DEFAULT_PROXY_PORT,
 
206
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
207
  g_object_class_install_property (gobject_class, PROP_PROXY_USER_NAME,
 
208
      g_param_spec_string ("proxy-user", "Proxy user name",
 
209
          "Proxy user name to use for proxy authentication",
 
210
          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
211
  g_object_class_install_property (gobject_class, PROP_PROXY_USER_PASSWD,
 
212
      g_param_spec_string ("proxy-passwd", "Proxy user password",
 
213
          "Proxy user password to use for proxy authentication",
 
214
          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
215
  g_object_class_install_property (gobject_class, PROP_FILE_NAME,
 
216
      g_param_spec_string ("file-name", "Base file name",
 
217
          "The base file name for the uploaded images", NULL,
 
218
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
219
  g_object_class_install_property (gobject_class, PROP_TIMEOUT,
 
220
      g_param_spec_int ("timeout", "Timeout",
 
221
          "Number of seconds waiting to write before timeout",
 
222
          0, G_MAXINT, DEFAULT_TIMEOUT,
 
223
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
224
  g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
 
225
      g_param_spec_int ("qos-dscp",
 
226
          "QoS diff srv code point",
 
227
          "Quality of Service, differentiated services code point (0 default)",
 
228
          DSCP_MIN, DSCP_MAX, DEFAULT_QOS_DSCP,
 
229
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
230
  g_object_class_install_property (gobject_class, PROP_ACCEPT_SELF_SIGNED,
 
231
      g_param_spec_boolean ("accept-self-signed",
 
232
          "Accept self-signed certificates",
 
233
          "Accept self-signed SSL/TLS certificates",
 
234
          DEFAULT_ACCEPT_SELF_SIGNED,
 
235
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
236
  g_object_class_install_property (gobject_class, PROP_USE_CONTENT_LENGTH,
 
237
      g_param_spec_boolean ("use-content-length", "Use content length header",
 
238
          "Use the Content-Length HTTP header instead of "
 
239
          "Transfer-Encoding header", DEFAULT_USE_CONTENT_LENGTH,
 
240
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
241
  g_object_class_install_property (gobject_class, PROP_CONTENT_TYPE,
 
242
      g_param_spec_string ("content-type", "Content type",
 
243
          "The mime type of the body of the request", NULL,
 
244
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
245
}
 
246
 
 
247
static void
 
248
gst_curl_sink_init (GstCurlSink * sink, GstCurlSinkClass * klass)
 
249
{
 
250
  sink->transfer_buf = g_malloc (sizeof (TransferBuffer));
 
251
  sink->transfer_cond = g_malloc (sizeof (TransferCondition));
 
252
  sink->transfer_cond->cond = g_cond_new ();
 
253
  sink->transfer_cond->data_sent = FALSE;
 
254
  sink->transfer_cond->data_available = FALSE;
 
255
  sink->timeout = DEFAULT_TIMEOUT;
 
256
  sink->proxy_port = DEFAULT_PROXY_PORT;
 
257
  sink->qos_dscp = DEFAULT_QOS_DSCP;
 
258
  sink->url = g_strdup (DEFAULT_URL);
 
259
  sink->header_list = NULL;
 
260
  sink->accept_self_signed = DEFAULT_ACCEPT_SELF_SIGNED;
 
261
  sink->use_content_length = DEFAULT_USE_CONTENT_LENGTH;
 
262
  sink->transfer_thread_close = FALSE;
 
263
  sink->new_file = TRUE;
 
264
  sink->proxy_headers_set = FALSE;
 
265
  sink->content_type = NULL;
 
266
}
 
267
 
 
268
static void
 
269
gst_curl_sink_finalize (GObject * gobject)
 
270
{
 
271
  GstCurlSink *this = GST_CURL_SINK (gobject);
 
272
 
 
273
  GST_DEBUG ("finalizing curlsink");
 
274
  if (this->transfer_thread != NULL) {
 
275
    g_thread_join (this->transfer_thread);
 
276
  }
 
277
 
 
278
  gst_curl_sink_transfer_cleanup (this);
 
279
  g_cond_free (this->transfer_cond->cond);
 
280
  g_free (this->transfer_cond);
 
281
 
 
282
  g_free (this->transfer_buf);
 
283
 
 
284
  g_free (this->url);
 
285
  g_free (this->user);
 
286
  g_free (this->passwd);
 
287
  g_free (this->proxy);
 
288
  g_free (this->proxy_user);
 
289
  g_free (this->proxy_passwd);
 
290
  g_free (this->file_name);
 
291
  g_free (this->content_type);
 
292
 
 
293
  if (this->header_list) {
 
294
    curl_slist_free_all (this->header_list);
 
295
    this->header_list = NULL;
 
296
  }
 
297
 
 
298
  if (this->fdset != NULL) {
 
299
    gst_poll_free (this->fdset);
 
300
    this->fdset = NULL;
 
301
  }
 
302
  G_OBJECT_CLASS (parent_class)->finalize (gobject);
 
303
}
 
304
 
 
305
static GstFlowReturn
 
306
gst_curl_sink_render (GstBaseSink * bsink, GstBuffer * buf)
 
307
{
 
308
  GstCurlSink *sink = GST_CURL_SINK (bsink);
 
309
  guint8 *data;
 
310
  size_t size;
 
311
  GstFlowReturn ret;
 
312
 
 
313
  GST_LOG ("enter render");
 
314
 
 
315
  sink = GST_CURL_SINK (bsink);
 
316
  data = GST_BUFFER_DATA (buf);
 
317
  size = GST_BUFFER_SIZE (buf);
 
318
 
 
319
  if (sink->content_type == NULL) {
 
320
    GstCaps *caps;
 
321
    GstStructure *structure;
 
322
    const gchar *mime_type;
 
323
 
 
324
    caps = buf->caps;
 
325
    structure = gst_caps_get_structure (caps, 0);
 
326
    mime_type = gst_structure_get_name (structure);
 
327
    sink->content_type = g_strdup (mime_type);
 
328
  }
 
329
 
 
330
  GST_OBJECT_LOCK (sink);
 
331
 
 
332
  /* check if the transfer thread has encountered problems while the
 
333
   * pipeline thread was working elsewhere */
 
334
  if (sink->flow_ret != GST_FLOW_OK) {
 
335
    goto done;
 
336
  }
 
337
 
 
338
  g_assert (sink->transfer_cond->data_available == FALSE);
 
339
 
 
340
  /* if there is no transfer thread created, lets create one */
 
341
  if (sink->transfer_thread == NULL) {
 
342
    if (!gst_curl_sink_transfer_start_unlocked (sink)) {
 
343
      sink->flow_ret = GST_FLOW_ERROR;
 
344
      goto done;
 
345
    }
 
346
  }
 
347
 
 
348
  /* make data available for the transfer thread and notify */
 
349
  sink->transfer_buf->ptr = data;
 
350
  sink->transfer_buf->len = size;
 
351
  sink->transfer_buf->offset = 0;
 
352
  gst_curl_sink_transfer_thread_notify_unlocked (sink);
 
353
 
 
354
  /* wait for the transfer thread to send the data. This will be notified
 
355
   * either when transfer is completed by the curl read callback or by
 
356
   * the thread function if an error has occured. */
 
357
  gst_curl_sink_wait_for_transfer_thread_to_send_unlocked (sink);
 
358
 
 
359
done:
 
360
  ret = sink->flow_ret;
 
361
  GST_OBJECT_UNLOCK (sink);
 
362
 
 
363
  GST_LOG ("exit render");
 
364
 
 
365
  return ret;
 
366
}
 
367
 
 
368
static gboolean
 
369
gst_curl_sink_event (GstBaseSink * bsink, GstEvent * event)
 
370
{
 
371
  GstCurlSink *sink = GST_CURL_SINK (bsink);
 
372
 
 
373
  switch (event->type) {
 
374
    case GST_EVENT_EOS:
 
375
      GST_DEBUG_OBJECT (sink, "received EOS");
 
376
      GST_OBJECT_LOCK (sink);
 
377
      gst_curl_sink_transfer_thread_close_unlocked (sink);
 
378
      GST_OBJECT_UNLOCK (sink);
 
379
      if (sink->transfer_thread != NULL) {
 
380
        g_thread_join (sink->transfer_thread);
 
381
        sink->transfer_thread = NULL;
 
382
      }
 
383
      break;
 
384
    default:
 
385
      break;
 
386
  }
 
387
  return TRUE;
 
388
}
 
389
 
 
390
static gboolean
 
391
gst_curl_sink_start (GstBaseSink * bsink)
 
392
{
 
393
  GstCurlSink *sink;
 
394
 
 
395
  sink = GST_CURL_SINK (bsink);
 
396
 
 
397
  if ((sink->fdset = gst_poll_new (TRUE)) == NULL) {
 
398
    GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE,
 
399
        ("gst_poll_new failed: %s", g_strerror (errno)), (NULL));
 
400
    return FALSE;
 
401
  }
 
402
 
 
403
  return TRUE;
 
404
}
 
405
 
 
406
static gboolean
 
407
gst_curl_sink_stop (GstBaseSink * bsink)
 
408
{
 
409
  GstCurlSink *sink = GST_CURL_SINK (bsink);
 
410
 
 
411
  GST_OBJECT_LOCK (sink);
 
412
  gst_curl_sink_transfer_thread_close_unlocked (sink);
 
413
  GST_OBJECT_UNLOCK (sink);
 
414
  if (sink->fdset != NULL) {
 
415
    gst_poll_free (sink->fdset);
 
416
    sink->fdset = NULL;
 
417
  }
 
418
 
 
419
  return TRUE;
 
420
}
 
421
 
 
422
static gboolean
 
423
gst_curl_sink_unlock (GstBaseSink * bsink)
 
424
{
 
425
  GstCurlSink *sink;
 
426
 
 
427
  sink = GST_CURL_SINK (bsink);
 
428
 
 
429
  GST_LOG_OBJECT (sink, "Flushing");
 
430
  gst_poll_set_flushing (sink->fdset, TRUE);
 
431
 
 
432
  return TRUE;
 
433
}
 
434
 
 
435
static gboolean
 
436
gst_curl_sink_unlock_stop (GstBaseSink * bsink)
 
437
{
 
438
  GstCurlSink *sink;
 
439
 
 
440
  sink = GST_CURL_SINK (bsink);
 
441
 
 
442
  GST_LOG_OBJECT (sink, "No longer flushing");
 
443
  gst_poll_set_flushing (sink->fdset, FALSE);
 
444
 
 
445
  return TRUE;
 
446
}
 
447
 
 
448
static void
 
449
gst_curl_sink_set_property (GObject * object, guint prop_id,
 
450
    const GValue * value, GParamSpec * pspec)
 
451
{
 
452
  GstCurlSink *sink;
 
453
  GstState cur_state;
 
454
 
 
455
  g_return_if_fail (GST_IS_CURL_SINK (object));
 
456
  sink = GST_CURL_SINK (object);
 
457
 
 
458
  gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0);
 
459
  if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) {
 
460
    GST_OBJECT_LOCK (sink);
 
461
 
 
462
    switch (prop_id) {
 
463
      case PROP_LOCATION:
 
464
        g_free (sink->url);
 
465
        sink->url = g_value_dup_string (value);
 
466
        GST_DEBUG_OBJECT (sink, "url set to %s", sink->url);
 
467
        break;
 
468
      case PROP_USER_NAME:
 
469
        g_free (sink->user);
 
470
        sink->user = g_value_dup_string (value);
 
471
        GST_DEBUG_OBJECT (sink, "user set to %s", sink->user);
 
472
        break;
 
473
      case PROP_USER_PASSWD:
 
474
        g_free (sink->passwd);
 
475
        sink->passwd = g_value_dup_string (value);
 
476
        GST_DEBUG_OBJECT (sink, "passwd set to %s", sink->passwd);
 
477
        break;
 
478
      case PROP_PROXY:
 
479
        g_free (sink->proxy);
 
480
        sink->proxy = g_value_dup_string (value);
 
481
        GST_DEBUG_OBJECT (sink, "proxy set to %s", sink->proxy);
 
482
        break;
 
483
      case PROP_PROXY_PORT:
 
484
        sink->proxy_port = g_value_get_int (value);
 
485
        GST_DEBUG_OBJECT (sink, "proxy port set to %d", sink->proxy_port);
 
486
        break;
 
487
      case PROP_PROXY_USER_NAME:
 
488
        g_free (sink->proxy_user);
 
489
        sink->proxy_user = g_value_dup_string (value);
 
490
        GST_DEBUG_OBJECT (sink, "proxy user set to %s", sink->proxy_user);
 
491
        break;
 
492
      case PROP_PROXY_USER_PASSWD:
 
493
        g_free (sink->proxy_passwd);
 
494
        sink->proxy_passwd = g_value_dup_string (value);
 
495
        GST_DEBUG_OBJECT (sink, "proxy password set to %s", sink->proxy_passwd);
 
496
        break;
 
497
      case PROP_FILE_NAME:
 
498
        g_free (sink->file_name);
 
499
        sink->file_name = g_value_dup_string (value);
 
500
        GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
 
501
        break;
 
502
      case PROP_TIMEOUT:
 
503
        sink->timeout = g_value_get_int (value);
 
504
        GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
 
505
        break;
 
506
      case PROP_QOS_DSCP:
 
507
        sink->qos_dscp = g_value_get_int (value);
 
508
        gst_curl_sink_setup_dscp_unlocked (sink);
 
509
        GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
 
510
        break;
 
511
      case PROP_ACCEPT_SELF_SIGNED:
 
512
        sink->accept_self_signed = g_value_get_boolean (value);
 
513
        GST_DEBUG_OBJECT (sink, "accept_self_signed set to %d",
 
514
            sink->accept_self_signed);
 
515
        break;
 
516
      case PROP_USE_CONTENT_LENGTH:
 
517
        sink->use_content_length = g_value_get_boolean (value);
 
518
        GST_DEBUG_OBJECT (sink, "use_content_length set to %d",
 
519
            sink->use_content_length);
 
520
        break;
 
521
      case PROP_CONTENT_TYPE:
 
522
        g_free (sink->content_type);
 
523
        sink->content_type = g_value_dup_string (value);
 
524
        GST_DEBUG_OBJECT (sink, "content type set to %s", sink->content_type);
 
525
        break;
 
526
      default:
 
527
        GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id);
 
528
        break;
 
529
    }
 
530
 
 
531
    GST_OBJECT_UNLOCK (sink);
 
532
 
 
533
    return;
 
534
  }
 
535
 
 
536
  /* in PLAYING or PAUSED state */
 
537
  GST_OBJECT_LOCK (sink);
 
538
 
 
539
  switch (prop_id) {
 
540
    case PROP_FILE_NAME:
 
541
      g_free (sink->file_name);
 
542
      sink->file_name = g_value_dup_string (value);
 
543
      GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
 
544
      gst_curl_sink_new_file_notify_unlocked (sink);
 
545
      break;
 
546
    case PROP_TIMEOUT:
 
547
      sink->timeout = g_value_get_int (value);
 
548
      GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
 
549
      break;
 
550
    case PROP_QOS_DSCP:
 
551
      sink->qos_dscp = g_value_get_int (value);
 
552
      gst_curl_sink_setup_dscp_unlocked (sink);
 
553
      GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
 
554
      break;
 
555
    case PROP_CONTENT_TYPE:
 
556
      g_free (sink->content_type);
 
557
      sink->content_type = g_value_dup_string (value);
 
558
      GST_DEBUG_OBJECT (sink, "content type set to %s", sink->content_type);
 
559
      break;
 
560
    default:
 
561
      GST_WARNING_OBJECT (sink, "cannot set property when PLAYING");
 
562
      break;
 
563
  }
 
564
 
 
565
  GST_OBJECT_UNLOCK (sink);
 
566
}
 
567
 
 
568
static void
 
569
gst_curl_sink_get_property (GObject * object, guint prop_id,
 
570
    GValue * value, GParamSpec * pspec)
 
571
{
 
572
  GstCurlSink *sink;
 
573
 
 
574
  g_return_if_fail (GST_IS_CURL_SINK (object));
 
575
  sink = GST_CURL_SINK (object);
 
576
 
 
577
  switch (prop_id) {
 
578
    case PROP_LOCATION:
 
579
      g_value_set_string (value, sink->url);
 
580
      break;
 
581
    case PROP_USER_NAME:
 
582
      g_value_set_string (value, sink->user);
 
583
      break;
 
584
    case PROP_USER_PASSWD:
 
585
      g_value_set_string (value, sink->passwd);
 
586
      break;
 
587
    case PROP_PROXY:
 
588
      g_value_set_string (value, sink->proxy);
 
589
      break;
 
590
    case PROP_PROXY_PORT:
 
591
      g_value_set_int (value, sink->proxy_port);
 
592
      break;
 
593
    case PROP_PROXY_USER_NAME:
 
594
      g_value_set_string (value, sink->proxy_user);
 
595
      break;
 
596
    case PROP_PROXY_USER_PASSWD:
 
597
      g_value_set_string (value, sink->proxy_passwd);
 
598
      break;
 
599
    case PROP_FILE_NAME:
 
600
      g_value_set_string (value, sink->file_name);
 
601
      break;
 
602
    case PROP_TIMEOUT:
 
603
      g_value_set_int (value, sink->timeout);
 
604
      break;
 
605
    case PROP_QOS_DSCP:
 
606
      g_value_set_int (value, sink->qos_dscp);
 
607
      break;
 
608
    case PROP_ACCEPT_SELF_SIGNED:
 
609
      g_value_set_boolean (value, sink->accept_self_signed);
 
610
      break;
 
611
    case PROP_USE_CONTENT_LENGTH:
 
612
      g_value_set_boolean (value, sink->use_content_length);
 
613
      break;
 
614
    case PROP_CONTENT_TYPE:
 
615
      g_value_set_string (value, sink->content_type);
 
616
      break;
 
617
    default:
 
618
      GST_DEBUG_OBJECT (sink, "invalid property id");
 
619
      break;
 
620
  }
 
621
}
 
622
 
 
623
static void
 
624
gst_curl_sink_set_http_header_unlocked (GstCurlSink * sink)
 
625
{
 
626
  gchar *tmp;
 
627
 
 
628
  if (sink->header_list) {
 
629
    curl_slist_free_all (sink->header_list);
 
630
    sink->header_list = NULL;
 
631
  }
 
632
 
 
633
  if (proxy_auth && !sink->proxy_headers_set && !proxy_conn_established) {
 
634
    sink->header_list =
 
635
        curl_slist_append (sink->header_list, "Content-Length: 0");
 
636
    sink->proxy_headers_set = TRUE;
 
637
    goto set_headers;
 
638
  }
 
639
  if (sink->use_content_length) {
 
640
    /* if content length is used we assume that every buffer is one
 
641
     * entire file, which is the case when uploading several jpegs */
 
642
    tmp = g_strdup_printf ("Content-Length: %d", (int) sink->transfer_buf->len);
 
643
    sink->header_list = curl_slist_append (sink->header_list, tmp);
 
644
    g_free (tmp);
 
645
  } else {
 
646
    /* when sending a POST request to a HTTP 1.1 server, you can send data
 
647
     * without knowing the size before starting the POST if you use chunked
 
648
     * encoding */
 
649
    sink->header_list = curl_slist_append (sink->header_list,
 
650
        "Transfer-Encoding: chunked");
 
651
  }
 
652
 
 
653
  tmp = g_strdup_printf ("Content-Type: %s", sink->content_type);
 
654
  sink->header_list = curl_slist_append (sink->header_list, tmp);
 
655
  g_free (tmp);
 
656
 
 
657
set_headers:
 
658
 
 
659
  tmp = g_strdup_printf ("Content-Disposition: attachment; filename="
 
660
      "\"%s\"", sink->file_name);
 
661
  sink->header_list = curl_slist_append (sink->header_list, tmp);
 
662
  g_free (tmp);
 
663
  curl_easy_setopt (sink->curl, CURLOPT_HTTPHEADER, sink->header_list);
 
664
}
 
665
 
 
666
static gboolean
 
667
gst_curl_sink_transfer_set_options_unlocked (GstCurlSink * sink)
 
668
{
 
669
#ifdef DEBUG
 
670
  curl_easy_setopt (sink->curl, CURLOPT_VERBOSE, 1);
 
671
#endif
 
672
 
 
673
  curl_easy_setopt (sink->curl, CURLOPT_URL, sink->url);
 
674
  curl_easy_setopt (sink->curl, CURLOPT_CONNECTTIMEOUT, sink->timeout);
 
675
 
 
676
  curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTDATA, sink);
 
677
  curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTFUNCTION,
 
678
      gst_curl_sink_transfer_socket_cb);
 
679
 
 
680
  if (sink->user != NULL && strlen (sink->user)) {
 
681
    curl_easy_setopt (sink->curl, CURLOPT_USERNAME, sink->user);
 
682
    curl_easy_setopt (sink->curl, CURLOPT_PASSWORD, sink->passwd);
 
683
    curl_easy_setopt (sink->curl, CURLOPT_HTTPAUTH, CURLAUTH_ANY);
 
684
  }
 
685
 
 
686
  if (sink->accept_self_signed && g_str_has_prefix (sink->url, "https")) {
 
687
    /* TODO verify the authenticity of the peer's certificate */
 
688
    curl_easy_setopt (sink->curl, CURLOPT_SSL_VERIFYPEER, 0L);
 
689
    /* TODO check the servers's claimed identity */
 
690
    curl_easy_setopt (sink->curl, CURLOPT_SSL_VERIFYHOST, 0L);
 
691
  }
 
692
 
 
693
  /* proxy settings */
 
694
  if (sink->proxy != NULL && strlen (sink->proxy)) {
 
695
    if (curl_easy_setopt (sink->curl, CURLOPT_PROXY, sink->proxy)
 
696
        != CURLE_OK) {
 
697
      return FALSE;
 
698
    }
 
699
    if (curl_easy_setopt (sink->curl, CURLOPT_PROXYPORT, sink->proxy_port)
 
700
        != CURLE_OK) {
 
701
      return FALSE;
 
702
    }
 
703
    if (sink->proxy_user != NULL &&
 
704
        strlen (sink->proxy_user) &&
 
705
        sink->proxy_passwd != NULL && strlen (sink->proxy_passwd)) {
 
706
      curl_easy_setopt (sink->curl, CURLOPT_PROXYUSERNAME, sink->proxy_user);
 
707
      curl_easy_setopt (sink->curl, CURLOPT_PROXYPASSWORD, sink->proxy_passwd);
 
708
      curl_easy_setopt (sink->curl, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
 
709
      proxy_auth = TRUE;
 
710
    }
 
711
    /* tunnel all operations through a given HTTP proxy */
 
712
    if (curl_easy_setopt (sink->curl, CURLOPT_HTTPPROXYTUNNEL, 1L)
 
713
        != CURLE_OK) {
 
714
      return FALSE;
 
715
    }
 
716
  }
 
717
 
 
718
  /* POST options */
 
719
  curl_easy_setopt (sink->curl, CURLOPT_POST, 1L);
 
720
 
 
721
  curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION,
 
722
      gst_curl_sink_transfer_read_cb);
 
723
  curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink);
 
724
  curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION,
 
725
      gst_curl_sink_transfer_write_cb);
 
726
 
 
727
  return TRUE;
 
728
}
 
729
 
 
730
static size_t
 
731
gst_curl_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
 
732
    void *stream)
 
733
{
 
734
  GstCurlSink *sink;
 
735
  TransferBuffer *buffer;
 
736
  size_t max_bytes_to_send;
 
737
  guint buf_len;
 
738
 
 
739
  sink = (GstCurlSink *) stream;
 
740
 
 
741
  /* wait for data to come available, if new file or thread close is set
 
742
   * then zero will be returned to indicate end of current transfer */
 
743
  GST_OBJECT_LOCK (sink);
 
744
  if (gst_curl_sink_wait_for_data_unlocked (sink) == FALSE) {
 
745
    GST_LOG ("returning 0, no more data to send in this file");
 
746
    GST_OBJECT_UNLOCK (sink);
 
747
    return 0;
 
748
  }
 
749
  GST_OBJECT_UNLOCK (sink);
 
750
 
 
751
 
 
752
  max_bytes_to_send = size * nmemb;
 
753
  buffer = sink->transfer_buf;
 
754
 
 
755
  buf_len = buffer->len;
 
756
  GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT,
 
757
      buffer->len, buffer->offset);
 
758
 
 
759
  /* more data in buffer */
 
760
  if (buffer->len > 0) {
 
761
    size_t bytes_to_send = MIN (max_bytes_to_send, buf_len);
 
762
 
 
763
    memcpy ((guint8 *) curl_ptr, buffer->ptr + buffer->offset, bytes_to_send);
 
764
 
 
765
    buffer->offset = buffer->offset + bytes_to_send;
 
766
    buffer->len = buffer->len - bytes_to_send;
 
767
 
 
768
    /* the last data chunk */
 
769
    if (bytes_to_send == buf_len) {
 
770
      buffer->ptr = NULL;
 
771
      buffer->offset = 0;
 
772
      buffer->len = 0;
 
773
      GST_OBJECT_LOCK (sink);
 
774
      gst_curl_sink_data_sent_notify_unlocked (sink);
 
775
      GST_OBJECT_UNLOCK (sink);
 
776
    }
 
777
 
 
778
    GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send);
 
779
 
 
780
    return bytes_to_send;
 
781
  } else {
 
782
    GST_WARNING ("got zero-length buffer");
 
783
    return 0;
 
784
  }
 
785
}
 
786
 
 
787
static size_t
 
788
gst_curl_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size,
 
789
    size_t nmemb, void G_GNUC_UNUSED * stream)
 
790
{
 
791
  size_t realsize = size * nmemb;
 
792
 
 
793
  GST_DEBUG ("response %s", (gchar *) ptr);
 
794
  return realsize;
 
795
}
 
796
 
 
797
static CURLcode
 
798
gst_curl_sink_transfer_check (GstCurlSink * sink)
 
799
{
 
800
  CURLcode code = CURLE_OK;
 
801
  CURL *easy;
 
802
  CURLMsg *msg;
 
803
  gint msgs_left;
 
804
  gchar *eff_url = NULL;
 
805
 
 
806
  do {
 
807
    easy = NULL;
 
808
    while ((msg = curl_multi_info_read (sink->multi_handle, &msgs_left))) {
 
809
      if (msg->msg == CURLMSG_DONE) {
 
810
        easy = msg->easy_handle;
 
811
        code = msg->data.result;
 
812
        break;
 
813
      }
 
814
    }
 
815
    if (easy) {
 
816
      curl_easy_getinfo (easy, CURLINFO_EFFECTIVE_URL, &eff_url);
 
817
      GST_DEBUG ("transfer done %s (%s-%d)\n", eff_url,
 
818
          curl_easy_strerror (code), code);
 
819
    }
 
820
  } while (easy);
 
821
 
 
822
  return code;
 
823
}
 
824
 
 
825
static GstFlowReturn
 
826
gst_curl_sink_handle_transfer (GstCurlSink * sink)
 
827
{
 
828
  gint retval;
 
829
  gint running_handles;
 
830
  gint timeout;
 
831
  CURLMcode m_code;
 
832
  CURLcode e_code;
 
833
  glong resp = -1;
 
834
  glong resp_proxy = -1;
 
835
 
 
836
  GST_OBJECT_LOCK (sink);
 
837
  timeout = sink->timeout;
 
838
  GST_OBJECT_UNLOCK (sink);
 
839
 
 
840
  /* Receiving CURLM_CALL_MULTI_PERFORM means that libcurl may have more data
 
841
     available to send or receive - call simply curl_multi_perform before
 
842
     poll() on more actions */
 
843
  do {
 
844
    m_code = curl_multi_perform (sink->multi_handle, &running_handles);
 
845
  } while (m_code == CURLM_CALL_MULTI_PERFORM);
 
846
 
 
847
  while (running_handles && (m_code == CURLM_OK)) {
 
848
    if (!proxy_conn_established && (resp_proxy != RESPONSE_CONNECT_PROXY)
 
849
        && proxy_auth) {
 
850
      curl_easy_getinfo (sink->curl, CURLINFO_HTTP_CONNECTCODE, &resp_proxy);
 
851
      if ((resp_proxy == RESPONSE_CONNECT_PROXY)) {
 
852
        GST_LOG ("received HTTP/1.0 200 Connection Established");
 
853
        /* Workaround: redefine HTTP headers before connecting to HTTP server.
 
854
         * When talking to proxy, the Content-Length: 0 is send with the request.
 
855
         */
 
856
        curl_multi_remove_handle (sink->multi_handle, sink->curl);
 
857
        gst_curl_sink_set_http_header_unlocked (sink);
 
858
        curl_multi_add_handle (sink->multi_handle, sink->curl);
 
859
        proxy_conn_established = TRUE;
 
860
      }
 
861
    }
 
862
 
 
863
    retval = gst_poll_wait (sink->fdset, timeout * GST_SECOND);
 
864
    if (G_UNLIKELY (retval == -1)) {
 
865
      if (errno == EAGAIN || errno == EINTR) {
 
866
        GST_DEBUG_OBJECT (sink, "interrupted by signal");
 
867
      } else if (errno == EBUSY) {
 
868
        goto poll_stopped;
 
869
      } else {
 
870
        goto poll_error;
 
871
      }
 
872
    } else if (G_UNLIKELY (retval == 0)) {
 
873
      GST_DEBUG ("timeout");
 
874
      goto poll_timeout;
 
875
    }
 
876
 
 
877
    /* readable/writable sockets */
 
878
    do {
 
879
      m_code = curl_multi_perform (sink->multi_handle, &running_handles);
 
880
    } while (m_code == CURLM_CALL_MULTI_PERFORM);
 
881
 
 
882
    if (resp != RESPONSE_100_CONTINUE) {
 
883
      curl_easy_getinfo (sink->curl, CURLINFO_RESPONSE_CODE, &resp);
 
884
    }
 
885
  }
 
886
 
 
887
  if (resp != RESPONSE_100_CONTINUE) {
 
888
    /* No 100 Continue response received. Using POST with HTTP 1.1 implies
 
889
     * the use of a "Expect: 100-continue" header. If the server doesn't
 
890
     * send HTTP/1.1 100 Continue, libcurl will not call transfer_read_cb
 
891
     * in order to send POST data.
 
892
     */
 
893
    goto no_100_continue_response;
 
894
  }
 
895
 
 
896
  if (m_code != CURLM_OK) {
 
897
    goto curl_multi_error;
 
898
  }
 
899
 
 
900
  /* problems still might have occurred on individual transfers even when
 
901
   * curl_multi_perform returns CURLM_OK */
 
902
  if ((e_code = gst_curl_sink_transfer_check (sink)) != CURLE_OK) {
 
903
    goto curl_easy_error;
 
904
  }
 
905
 
 
906
  /* check response code */
 
907
  curl_easy_getinfo (sink->curl, CURLINFO_RESPONSE_CODE, &resp);
 
908
  GST_DEBUG_OBJECT (sink, "response code: %ld", resp);
 
909
  if (resp < 200 || resp >= 300) {
 
910
    goto response_error;
 
911
  }
 
912
 
 
913
  return GST_FLOW_OK;
 
914
 
 
915
poll_error:
 
916
  {
 
917
    GST_DEBUG_OBJECT (sink, "poll failed: %s", g_strerror (errno));
 
918
    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll failed"), (NULL));
 
919
    return GST_FLOW_ERROR;
 
920
  }
 
921
 
 
922
poll_stopped:
 
923
  {
 
924
    GST_DEBUG_OBJECT (sink, "poll stopped");
 
925
    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll stopped"), (NULL));
 
926
    return GST_FLOW_ERROR;
 
927
  }
 
928
 
 
929
poll_timeout:
 
930
  {
 
931
    GST_DEBUG_OBJECT (sink, "poll timed out");
 
932
    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll timed out"), (NULL));
 
933
    return GST_FLOW_ERROR;
 
934
  }
 
935
 
 
936
curl_multi_error:
 
937
  {
 
938
    GST_DEBUG_OBJECT (sink, "curl multi error");
 
939
    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s",
 
940
            curl_multi_strerror (m_code)), (NULL));
 
941
    return GST_FLOW_ERROR;
 
942
  }
 
943
 
 
944
curl_easy_error:
 
945
  {
 
946
    GST_DEBUG_OBJECT (sink, "curl easy error");
 
947
    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s",
 
948
            curl_easy_strerror (e_code)), (NULL));
 
949
    return GST_FLOW_ERROR;
 
950
  }
 
951
 
 
952
no_100_continue_response:
 
953
  {
 
954
    GST_DEBUG_OBJECT (sink, "100 continue response missing");
 
955
    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("100 continue response missing"),
 
956
        (NULL));
 
957
    return GST_FLOW_ERROR;
 
958
  }
 
959
 
 
960
response_error:
 
961
  {
 
962
    GST_DEBUG_OBJECT (sink, "response error");
 
963
    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("response error: %ld", resp),
 
964
        (NULL));
 
965
    return GST_FLOW_ERROR;
 
966
  }
 
967
}
 
968
 
 
969
/* This function gets called by libcurl after the socket() call but before
 
970
 * the connect() call. */
 
971
static int
 
972
gst_curl_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
 
973
    curlsocktype G_GNUC_UNUSED purpose)
 
974
{
 
975
  GstCurlSink *sink;
 
976
  gboolean ret = TRUE;
 
977
 
 
978
  sink = (GstCurlSink *) clientp;
 
979
 
 
980
  g_assert (sink);
 
981
 
 
982
  if (curlfd < 0) {
 
983
    /* signal an unrecoverable error to the library which will close the socket
 
984
       and return CURLE_COULDNT_CONNECT
 
985
     */
 
986
    return 1;
 
987
  }
 
988
 
 
989
  gst_poll_fd_init (&sink->fd);
 
990
  sink->fd.fd = curlfd;
 
991
 
 
992
  ret = ret && gst_poll_add_fd (sink->fdset, &sink->fd);
 
993
  ret = ret && gst_poll_fd_ctl_write (sink->fdset, &sink->fd, TRUE);
 
994
  ret = ret && gst_poll_fd_ctl_read (sink->fdset, &sink->fd, TRUE);
 
995
  GST_DEBUG ("fd: %d", sink->fd.fd);
 
996
  GST_OBJECT_LOCK (sink);
 
997
  gst_curl_sink_setup_dscp_unlocked (sink);
 
998
  GST_OBJECT_UNLOCK (sink);
 
999
 
 
1000
  /* success */
 
1001
  if (ret) {
 
1002
    return 0;
 
1003
  } else {
 
1004
    return 1;
 
1005
  }
 
1006
}
 
1007
 
 
1008
static gboolean
 
1009
gst_curl_sink_transfer_start_unlocked (GstCurlSink * sink)
 
1010
{
 
1011
  GError *error = NULL;
 
1012
  gboolean ret = TRUE;
 
1013
 
 
1014
  GST_LOG ("creating transfer thread");
 
1015
  sink->transfer_thread_close = FALSE;
 
1016
  sink->new_file = TRUE;
 
1017
  sink->transfer_thread =
 
1018
      g_thread_create ((GThreadFunc) gst_curl_sink_transfer_thread_func, sink,
 
1019
      TRUE, &error);
 
1020
 
 
1021
  if (sink->transfer_thread == NULL || error != NULL) {
 
1022
    ret = FALSE;
 
1023
    if (error) {
 
1024
      GST_ERROR_OBJECT (sink, "could not create thread %s", error->message);
 
1025
      g_error_free (error);
 
1026
    } else {
 
1027
      GST_ERROR_OBJECT (sink, "could not create thread for unknown reason");
 
1028
    }
 
1029
  }
 
1030
 
 
1031
  return ret;
 
1032
}
 
1033
 
 
1034
static gpointer
 
1035
gst_curl_sink_transfer_thread_func (gpointer data)
 
1036
{
 
1037
  GstCurlSink *sink = (GstCurlSink *) data;
 
1038
  GstFlowReturn ret = GST_FLOW_OK;
 
1039
  gboolean data_available;
 
1040
 
 
1041
  GST_LOG ("transfer thread started");
 
1042
  GST_OBJECT_LOCK (sink);
 
1043
  if (!gst_curl_sink_transfer_setup_unlocked (sink)) {
 
1044
    GST_DEBUG_OBJECT (sink, "curl setup error");
 
1045
    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("curl setup error"), (NULL));
 
1046
    sink->flow_ret = GST_FLOW_ERROR;
 
1047
    goto done;
 
1048
  }
 
1049
 
 
1050
  while (!sink->transfer_thread_close && sink->flow_ret == GST_FLOW_OK) {
 
1051
    /* we are working on a new file, clearing flag and setting file
 
1052
     * name in http header */
 
1053
    sink->new_file = FALSE;
 
1054
 
 
1055
    /* wait for data to arrive for this new file, if we get a new file name
 
1056
     * again before getting data we will simply skip transfering anything
 
1057
     * for this file and go directly to the new file */
 
1058
    data_available = gst_curl_sink_wait_for_data_unlocked (sink);
 
1059
    if (data_available) {
 
1060
      gst_curl_sink_set_http_header_unlocked (sink);
 
1061
    }
 
1062
 
 
1063
    /* stay unlocked while handling the actual transfer */
 
1064
    GST_OBJECT_UNLOCK (sink);
 
1065
 
 
1066
    if (data_available) {
 
1067
      curl_multi_add_handle (sink->multi_handle, sink->curl);
 
1068
 
 
1069
      /* Start driving the transfer. */
 
1070
      ret = gst_curl_sink_handle_transfer (sink);
 
1071
 
 
1072
      /* easy handle will be possibly re-used for next transfer, thus it needs to
 
1073
       * be removed from the multi stack and re-added again */
 
1074
      curl_multi_remove_handle (sink->multi_handle, sink->curl);
 
1075
    }
 
1076
 
 
1077
    /* lock again before looping to check the thread closed flag */
 
1078
    GST_OBJECT_LOCK (sink);
 
1079
 
 
1080
    /* if we have transfered data, then set the return code */
 
1081
    if (data_available) {
 
1082
      sink->flow_ret = ret;
 
1083
    }
 
1084
  }
 
1085
 
 
1086
done:
 
1087
  /* if there is a flow error, always notify the render function so it
 
1088
   * can return the flow error up along the pipeline */
 
1089
  if (sink->flow_ret != GST_FLOW_OK) {
 
1090
    gst_curl_sink_data_sent_notify_unlocked (sink);
 
1091
  }
 
1092
 
 
1093
  GST_OBJECT_UNLOCK (sink);
 
1094
  GST_DEBUG ("exit thread func - transfer thread close flag: %d",
 
1095
      sink->transfer_thread_close);
 
1096
 
 
1097
  return NULL;
 
1098
}
 
1099
 
 
1100
static gboolean
 
1101
gst_curl_sink_transfer_setup_unlocked (GstCurlSink * sink)
 
1102
{
 
1103
  g_assert (sink);
 
1104
 
 
1105
  if (sink->curl == NULL) {
 
1106
    /* curl_easy_init automatically calls curl_global_init(3) */
 
1107
    if ((sink->curl = curl_easy_init ()) == NULL) {
 
1108
      g_warning ("Failed to init easy handle");
 
1109
      return FALSE;
 
1110
    }
 
1111
  }
 
1112
 
 
1113
  if (!gst_curl_sink_transfer_set_options_unlocked (sink)) {
 
1114
    g_warning ("Failed to setup easy handle");
 
1115
    GST_OBJECT_UNLOCK (sink);
 
1116
    return FALSE;
 
1117
  }
 
1118
 
 
1119
  /* init a multi stack (non-blocking interface to liburl) */
 
1120
  if (sink->multi_handle == NULL) {
 
1121
    if ((sink->multi_handle = curl_multi_init ()) == NULL) {
 
1122
      return FALSE;
 
1123
    }
 
1124
  }
 
1125
 
 
1126
  return TRUE;
 
1127
}
 
1128
 
 
1129
static void
 
1130
gst_curl_sink_transfer_cleanup (GstCurlSink * sink)
 
1131
{
 
1132
  if (sink->curl != NULL) {
 
1133
    if (sink->multi_handle != NULL) {
 
1134
      curl_multi_remove_handle (sink->multi_handle, sink->curl);
 
1135
    }
 
1136
    curl_easy_cleanup (sink->curl);
 
1137
    sink->curl = NULL;
 
1138
  }
 
1139
 
 
1140
  if (sink->multi_handle != NULL) {
 
1141
    curl_multi_cleanup (sink->multi_handle);
 
1142
    sink->multi_handle = NULL;
 
1143
  }
 
1144
}
 
1145
 
 
1146
static gboolean
 
1147
gst_curl_sink_wait_for_data_unlocked (GstCurlSink * sink)
 
1148
{
 
1149
  gboolean data_available = FALSE;
 
1150
 
 
1151
  GST_LOG ("waiting for data");
 
1152
  while (!sink->transfer_cond->data_available &&
 
1153
      !sink->transfer_thread_close && !sink->new_file) {
 
1154
    g_cond_wait (sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
 
1155
  }
 
1156
 
 
1157
  if (sink->transfer_thread_close) {
 
1158
    GST_LOG ("wait for data aborted due to thread close");
 
1159
  } else if (sink->new_file) {
 
1160
    GST_LOG ("wait for data aborted due to new file name");
 
1161
  } else {
 
1162
    GST_LOG ("wait for data completed");
 
1163
    data_available = TRUE;
 
1164
  }
 
1165
 
 
1166
  return data_available;
 
1167
}
 
1168
 
 
1169
static void
 
1170
gst_curl_sink_transfer_thread_notify_unlocked (GstCurlSink * sink)
 
1171
{
 
1172
  GST_LOG ("more data to send");
 
1173
  sink->transfer_cond->data_available = TRUE;
 
1174
  sink->transfer_cond->data_sent = FALSE;
 
1175
  g_cond_signal (sink->transfer_cond->cond);
 
1176
}
 
1177
 
 
1178
static void
 
1179
gst_curl_sink_new_file_notify_unlocked (GstCurlSink * sink)
 
1180
{
 
1181
  GST_LOG ("new file name");
 
1182
  sink->new_file = TRUE;
 
1183
  g_cond_signal (sink->transfer_cond->cond);
 
1184
}
 
1185
 
 
1186
static void
 
1187
gst_curl_sink_transfer_thread_close_unlocked (GstCurlSink * sink)
 
1188
{
 
1189
  GST_LOG ("setting transfer thread close flag");
 
1190
  sink->transfer_thread_close = TRUE;
 
1191
  g_cond_signal (sink->transfer_cond->cond);
 
1192
}
 
1193
 
 
1194
static void
 
1195
gst_curl_sink_wait_for_transfer_thread_to_send_unlocked (GstCurlSink * sink)
 
1196
{
 
1197
  GST_LOG ("waiting for buffer send to complete");
 
1198
 
 
1199
  /* this function should not check if the transfer thread is set to be closed
 
1200
   * since that flag only can be set by the EoS event (by the pipeline thread).
 
1201
   * This can therefore never happen while this function is running since this
 
1202
   * function also is called by the pipeline thread (in the render function) */
 
1203
  while (!sink->transfer_cond->data_sent) {
 
1204
    g_cond_wait (sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
 
1205
  }
 
1206
  GST_LOG ("buffer send completed");
 
1207
}
 
1208
 
 
1209
static void
 
1210
gst_curl_sink_data_sent_notify_unlocked (GstCurlSink * sink)
 
1211
{
 
1212
  GST_LOG ("transfer completed");
 
1213
  sink->transfer_cond->data_available = FALSE;
 
1214
  sink->transfer_cond->data_sent = TRUE;
 
1215
  g_cond_signal (sink->transfer_cond->cond);
 
1216
}
 
1217
 
 
1218
static gint
 
1219
gst_curl_sink_setup_dscp_unlocked (GstCurlSink * sink)
 
1220
{
 
1221
  gint tos;
 
1222
  gint af;
 
1223
  gint ret = -1;
 
1224
  union
 
1225
  {
 
1226
    struct sockaddr sa;
 
1227
    struct sockaddr_in6 sa_in6;
 
1228
    struct sockaddr_storage sa_stor;
 
1229
  } sa;
 
1230
  socklen_t slen = sizeof (sa);
 
1231
 
 
1232
  if (getsockname (sink->fd.fd, &sa.sa, &slen) < 0) {
 
1233
    GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
 
1234
    return ret;
 
1235
  }
 
1236
  af = sa.sa.sa_family;
 
1237
 
 
1238
  /* if this is an IPv4-mapped address then do IPv4 QoS */
 
1239
  if (af == AF_INET6) {
 
1240
    GST_DEBUG_OBJECT (sink, "check IP6 socket");
 
1241
    if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
 
1242
      GST_DEBUG_OBJECT (sink, "mapped to IPV4");
 
1243
      af = AF_INET;
 
1244
    }
 
1245
  }
 
1246
  /* extract and shift 6 bits of the DSCP */
 
1247
  tos = (sink->qos_dscp & 0x3f) << 2;
 
1248
 
 
1249
  switch (af) {
 
1250
    case AF_INET:
 
1251
      ret = setsockopt (sink->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
 
1252
      break;
 
1253
    case AF_INET6:
 
1254
#ifdef IPV6_TCLASS
 
1255
      ret = setsockopt (sink->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos,
 
1256
          sizeof (tos));
 
1257
      break;
 
1258
#endif
 
1259
    default:
 
1260
      GST_ERROR_OBJECT (sink, "unsupported AF");
 
1261
      break;
 
1262
  }
 
1263
  if (ret) {
 
1264
    GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
 
1265
  }
 
1266
 
 
1267
  return ret;
 
1268
}