2
* Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
4
* This library is free software; you can redistribute it and/or
5
* modify it under the terms of the GNU Library General Public
6
* License as published by the Free Software Foundation; either
7
* version 2 of the License, or (at your option) any later version.
9
* This library is distributed in the hope that it will be useful,
10
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12
* Library General Public License for more details.
14
* You should have received a copy of the GNU Library General Public
15
* License along with this library; if not, write to the
16
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17
* Boston, MA 02111-1307, USA.
21
* SECTION:element-curlsink
22
* @short_description: sink that uploads data to a server using libcurl
25
* This is a network sink that uses libcurl as a client to upload data to
26
* a server (e.g. a HTTP/FTP server).
29
* <title>Example launch line (upload a JPEG file to an HTTP server)</title>
31
* gst-launch filesrc filesrc location=image.jpg ! jpegparse ! curlsink \
32
* file-name=image.jpg \
33
* location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/ \
34
* user=test passwd=test \
35
* content-type=image/jpeg \
36
* use-content-length=false
45
#include <curl/curl.h>
49
#include <sys/socket.h>
50
#include <sys/types.h>
51
#include <netinet/in.h>
53
#include <netinet/ip.h>
54
#include <netinet/tcp.h>
58
#include "gstcurlsink.h"
61
#define GST_CAT_DEFAULT gst_curl_sink_debug
62
#define DEFAULT_URL "localhost:5555"
63
#define DEFAULT_TIMEOUT 30
64
#define DEFAULT_PROXY_PORT 3128
65
#define DEFAULT_QOS_DSCP 0
66
#define DEFAULT_ACCEPT_SELF_SIGNED FALSE
67
#define DEFAULT_USE_CONTENT_LENGTH FALSE
71
#define RESPONSE_100_CONTINUE 100
72
#define RESPONSE_CONNECT_PROXY 200
74
/* Plugin specific settings */
75
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
80
GST_DEBUG_CATEGORY_STATIC (gst_curl_sink_debug);
91
PROP_PROXY_USER_PASSWD,
95
PROP_ACCEPT_SELF_SIGNED,
96
PROP_USE_CONTENT_LENGTH,
99
static gboolean proxy_auth = FALSE;
100
static gboolean proxy_conn_established = FALSE;
102
/* Object class function declarations */
103
static void gst_curl_sink_finalize (GObject * gobject);
104
static void gst_curl_sink_set_property (GObject * object, guint prop_id,
105
const GValue * value, GParamSpec * pspec);
106
static void gst_curl_sink_get_property (GObject * object, guint prop_id,
107
GValue * value, GParamSpec * pspec);
109
/* BaseSink class function declarations */
110
static GstFlowReturn gst_curl_sink_render (GstBaseSink * bsink,
112
static gboolean gst_curl_sink_event (GstBaseSink * bsink, GstEvent * event);
113
static gboolean gst_curl_sink_start (GstBaseSink * bsink);
114
static gboolean gst_curl_sink_stop (GstBaseSink * bsink);
115
static gboolean gst_curl_sink_unlock (GstBaseSink * bsink);
116
static gboolean gst_curl_sink_unlock_stop (GstBaseSink * bsink);
118
/* private functions */
119
static gboolean gst_curl_sink_transfer_setup_unlocked (GstCurlSink * sink);
120
static gboolean gst_curl_sink_transfer_set_options_unlocked (GstCurlSink
122
static gboolean gst_curl_sink_transfer_start_unlocked (GstCurlSink * sink);
123
static void gst_curl_sink_transfer_cleanup (GstCurlSink * sink);
124
static size_t gst_curl_sink_transfer_read_cb (void *ptr, size_t size,
125
size_t nmemb, void *stream);
126
static size_t gst_curl_sink_transfer_write_cb (void *ptr, size_t size,
127
size_t nmemb, void *stream);
128
static GstFlowReturn gst_curl_sink_handle_transfer (GstCurlSink * sink);
129
static int gst_curl_sink_transfer_socket_cb (void *clientp,
130
curl_socket_t curlfd, curlsocktype purpose);
131
static gpointer gst_curl_sink_transfer_thread_func (gpointer data);
132
static CURLcode gst_curl_sink_transfer_check (GstCurlSink * sink);
133
static gint gst_curl_sink_setup_dscp_unlocked (GstCurlSink * sink);
135
static gboolean gst_curl_sink_wait_for_data_unlocked (GstCurlSink * sink);
136
static void gst_curl_sink_new_file_notify_unlocked (GstCurlSink * sink);
137
static void gst_curl_sink_transfer_thread_notify_unlocked (GstCurlSink * sink);
138
static void gst_curl_sink_transfer_thread_close_unlocked (GstCurlSink * sink);
139
static void gst_curl_sink_wait_for_transfer_thread_to_send_unlocked (GstCurlSink
141
static void gst_curl_sink_data_sent_notify_unlocked (GstCurlSink * sink);
144
_do_init (GType type)
146
GST_DEBUG_CATEGORY_INIT (gst_curl_sink_debug, "curlsink", 0,
147
"curl sink element");
150
GST_BOILERPLATE_FULL (GstCurlSink, gst_curl_sink, GstBaseSink,
151
GST_TYPE_BASE_SINK, _do_init);
154
gst_curl_sink_base_init (gpointer g_class)
156
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
158
gst_element_class_add_pad_template (element_class,
159
gst_static_pad_template_get (&sinktemplate));
160
gst_element_class_set_details_simple (element_class,
163
"Upload data over the network to a server using libcurl",
164
"Patricia Muscalu <patricia@axis.com>");
168
gst_curl_sink_class_init (GstCurlSinkClass * klass)
170
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
171
GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
173
GST_DEBUG_OBJECT (klass, "class_init");
175
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_sink_event);
176
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_sink_render);
177
gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_sink_start);
178
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_sink_stop);
179
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_sink_unlock);
180
gstbasesink_class->unlock_stop =
181
GST_DEBUG_FUNCPTR (gst_curl_sink_unlock_stop);
182
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_sink_finalize);
184
gobject_class->set_property = gst_curl_sink_set_property;
185
gobject_class->get_property = gst_curl_sink_get_property;
187
/* FIXME: check against souphttpsrc and use same names for same properties */
188
g_object_class_install_property (gobject_class, PROP_LOCATION,
189
g_param_spec_string ("location", "Location",
190
"URI location to write to", NULL,
191
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
192
g_object_class_install_property (gobject_class, PROP_USER_NAME,
193
g_param_spec_string ("user", "User name",
194
"User name to use for server authentication", NULL,
195
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196
g_object_class_install_property (gobject_class, PROP_USER_PASSWD,
197
g_param_spec_string ("passwd", "User password",
198
"User password to use for server authentication", NULL,
199
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
200
g_object_class_install_property (gobject_class, PROP_PROXY,
201
g_param_spec_string ("proxy", "Proxy", "HTTP proxy server URI", NULL,
202
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
203
g_object_class_install_property (gobject_class, PROP_PROXY_PORT,
204
g_param_spec_int ("proxy-port", "Proxy port",
205
"HTTP proxy server port", 0, G_MAXINT, DEFAULT_PROXY_PORT,
206
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
207
g_object_class_install_property (gobject_class, PROP_PROXY_USER_NAME,
208
g_param_spec_string ("proxy-user", "Proxy user name",
209
"Proxy user name to use for proxy authentication",
210
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
211
g_object_class_install_property (gobject_class, PROP_PROXY_USER_PASSWD,
212
g_param_spec_string ("proxy-passwd", "Proxy user password",
213
"Proxy user password to use for proxy authentication",
214
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
215
g_object_class_install_property (gobject_class, PROP_FILE_NAME,
216
g_param_spec_string ("file-name", "Base file name",
217
"The base file name for the uploaded images", NULL,
218
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
219
g_object_class_install_property (gobject_class, PROP_TIMEOUT,
220
g_param_spec_int ("timeout", "Timeout",
221
"Number of seconds waiting to write before timeout",
222
0, G_MAXINT, DEFAULT_TIMEOUT,
223
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
224
g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
225
g_param_spec_int ("qos-dscp",
226
"QoS diff srv code point",
227
"Quality of Service, differentiated services code point (0 default)",
228
DSCP_MIN, DSCP_MAX, DEFAULT_QOS_DSCP,
229
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
230
g_object_class_install_property (gobject_class, PROP_ACCEPT_SELF_SIGNED,
231
g_param_spec_boolean ("accept-self-signed",
232
"Accept self-signed certificates",
233
"Accept self-signed SSL/TLS certificates",
234
DEFAULT_ACCEPT_SELF_SIGNED,
235
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
236
g_object_class_install_property (gobject_class, PROP_USE_CONTENT_LENGTH,
237
g_param_spec_boolean ("use-content-length", "Use content length header",
238
"Use the Content-Length HTTP header instead of "
239
"Transfer-Encoding header", DEFAULT_USE_CONTENT_LENGTH,
240
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
241
g_object_class_install_property (gobject_class, PROP_CONTENT_TYPE,
242
g_param_spec_string ("content-type", "Content type",
243
"The mime type of the body of the request", NULL,
244
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
248
gst_curl_sink_init (GstCurlSink * sink, GstCurlSinkClass * klass)
250
sink->transfer_buf = g_malloc (sizeof (TransferBuffer));
251
sink->transfer_cond = g_malloc (sizeof (TransferCondition));
252
sink->transfer_cond->cond = g_cond_new ();
253
sink->transfer_cond->data_sent = FALSE;
254
sink->transfer_cond->data_available = FALSE;
255
sink->timeout = DEFAULT_TIMEOUT;
256
sink->proxy_port = DEFAULT_PROXY_PORT;
257
sink->qos_dscp = DEFAULT_QOS_DSCP;
258
sink->url = g_strdup (DEFAULT_URL);
259
sink->header_list = NULL;
260
sink->accept_self_signed = DEFAULT_ACCEPT_SELF_SIGNED;
261
sink->use_content_length = DEFAULT_USE_CONTENT_LENGTH;
262
sink->transfer_thread_close = FALSE;
263
sink->new_file = TRUE;
264
sink->proxy_headers_set = FALSE;
265
sink->content_type = NULL;
269
gst_curl_sink_finalize (GObject * gobject)
271
GstCurlSink *this = GST_CURL_SINK (gobject);
273
GST_DEBUG ("finalizing curlsink");
274
if (this->transfer_thread != NULL) {
275
g_thread_join (this->transfer_thread);
278
gst_curl_sink_transfer_cleanup (this);
279
g_cond_free (this->transfer_cond->cond);
280
g_free (this->transfer_cond);
282
g_free (this->transfer_buf);
286
g_free (this->passwd);
287
g_free (this->proxy);
288
g_free (this->proxy_user);
289
g_free (this->proxy_passwd);
290
g_free (this->file_name);
291
g_free (this->content_type);
293
if (this->header_list) {
294
curl_slist_free_all (this->header_list);
295
this->header_list = NULL;
298
if (this->fdset != NULL) {
299
gst_poll_free (this->fdset);
302
G_OBJECT_CLASS (parent_class)->finalize (gobject);
306
gst_curl_sink_render (GstBaseSink * bsink, GstBuffer * buf)
308
GstCurlSink *sink = GST_CURL_SINK (bsink);
313
GST_LOG ("enter render");
315
sink = GST_CURL_SINK (bsink);
316
data = GST_BUFFER_DATA (buf);
317
size = GST_BUFFER_SIZE (buf);
319
if (sink->content_type == NULL) {
321
GstStructure *structure;
322
const gchar *mime_type;
325
structure = gst_caps_get_structure (caps, 0);
326
mime_type = gst_structure_get_name (structure);
327
sink->content_type = g_strdup (mime_type);
330
GST_OBJECT_LOCK (sink);
332
/* check if the transfer thread has encountered problems while the
333
* pipeline thread was working elsewhere */
334
if (sink->flow_ret != GST_FLOW_OK) {
338
g_assert (sink->transfer_cond->data_available == FALSE);
340
/* if there is no transfer thread created, lets create one */
341
if (sink->transfer_thread == NULL) {
342
if (!gst_curl_sink_transfer_start_unlocked (sink)) {
343
sink->flow_ret = GST_FLOW_ERROR;
348
/* make data available for the transfer thread and notify */
349
sink->transfer_buf->ptr = data;
350
sink->transfer_buf->len = size;
351
sink->transfer_buf->offset = 0;
352
gst_curl_sink_transfer_thread_notify_unlocked (sink);
354
/* wait for the transfer thread to send the data. This will be notified
355
* either when transfer is completed by the curl read callback or by
356
* the thread function if an error has occured. */
357
gst_curl_sink_wait_for_transfer_thread_to_send_unlocked (sink);
360
ret = sink->flow_ret;
361
GST_OBJECT_UNLOCK (sink);
363
GST_LOG ("exit render");
369
gst_curl_sink_event (GstBaseSink * bsink, GstEvent * event)
371
GstCurlSink *sink = GST_CURL_SINK (bsink);
373
switch (event->type) {
375
GST_DEBUG_OBJECT (sink, "received EOS");
376
GST_OBJECT_LOCK (sink);
377
gst_curl_sink_transfer_thread_close_unlocked (sink);
378
GST_OBJECT_UNLOCK (sink);
379
if (sink->transfer_thread != NULL) {
380
g_thread_join (sink->transfer_thread);
381
sink->transfer_thread = NULL;
391
gst_curl_sink_start (GstBaseSink * bsink)
395
sink = GST_CURL_SINK (bsink);
397
if ((sink->fdset = gst_poll_new (TRUE)) == NULL) {
398
GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE,
399
("gst_poll_new failed: %s", g_strerror (errno)), (NULL));
407
gst_curl_sink_stop (GstBaseSink * bsink)
409
GstCurlSink *sink = GST_CURL_SINK (bsink);
411
GST_OBJECT_LOCK (sink);
412
gst_curl_sink_transfer_thread_close_unlocked (sink);
413
GST_OBJECT_UNLOCK (sink);
414
if (sink->fdset != NULL) {
415
gst_poll_free (sink->fdset);
423
gst_curl_sink_unlock (GstBaseSink * bsink)
427
sink = GST_CURL_SINK (bsink);
429
GST_LOG_OBJECT (sink, "Flushing");
430
gst_poll_set_flushing (sink->fdset, TRUE);
436
gst_curl_sink_unlock_stop (GstBaseSink * bsink)
440
sink = GST_CURL_SINK (bsink);
442
GST_LOG_OBJECT (sink, "No longer flushing");
443
gst_poll_set_flushing (sink->fdset, FALSE);
449
gst_curl_sink_set_property (GObject * object, guint prop_id,
450
const GValue * value, GParamSpec * pspec)
455
g_return_if_fail (GST_IS_CURL_SINK (object));
456
sink = GST_CURL_SINK (object);
458
gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0);
459
if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) {
460
GST_OBJECT_LOCK (sink);
465
sink->url = g_value_dup_string (value);
466
GST_DEBUG_OBJECT (sink, "url set to %s", sink->url);
470
sink->user = g_value_dup_string (value);
471
GST_DEBUG_OBJECT (sink, "user set to %s", sink->user);
473
case PROP_USER_PASSWD:
474
g_free (sink->passwd);
475
sink->passwd = g_value_dup_string (value);
476
GST_DEBUG_OBJECT (sink, "passwd set to %s", sink->passwd);
479
g_free (sink->proxy);
480
sink->proxy = g_value_dup_string (value);
481
GST_DEBUG_OBJECT (sink, "proxy set to %s", sink->proxy);
483
case PROP_PROXY_PORT:
484
sink->proxy_port = g_value_get_int (value);
485
GST_DEBUG_OBJECT (sink, "proxy port set to %d", sink->proxy_port);
487
case PROP_PROXY_USER_NAME:
488
g_free (sink->proxy_user);
489
sink->proxy_user = g_value_dup_string (value);
490
GST_DEBUG_OBJECT (sink, "proxy user set to %s", sink->proxy_user);
492
case PROP_PROXY_USER_PASSWD:
493
g_free (sink->proxy_passwd);
494
sink->proxy_passwd = g_value_dup_string (value);
495
GST_DEBUG_OBJECT (sink, "proxy password set to %s", sink->proxy_passwd);
498
g_free (sink->file_name);
499
sink->file_name = g_value_dup_string (value);
500
GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
503
sink->timeout = g_value_get_int (value);
504
GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
507
sink->qos_dscp = g_value_get_int (value);
508
gst_curl_sink_setup_dscp_unlocked (sink);
509
GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
511
case PROP_ACCEPT_SELF_SIGNED:
512
sink->accept_self_signed = g_value_get_boolean (value);
513
GST_DEBUG_OBJECT (sink, "accept_self_signed set to %d",
514
sink->accept_self_signed);
516
case PROP_USE_CONTENT_LENGTH:
517
sink->use_content_length = g_value_get_boolean (value);
518
GST_DEBUG_OBJECT (sink, "use_content_length set to %d",
519
sink->use_content_length);
521
case PROP_CONTENT_TYPE:
522
g_free (sink->content_type);
523
sink->content_type = g_value_dup_string (value);
524
GST_DEBUG_OBJECT (sink, "content type set to %s", sink->content_type);
527
GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id);
531
GST_OBJECT_UNLOCK (sink);
536
/* in PLAYING or PAUSED state */
537
GST_OBJECT_LOCK (sink);
541
g_free (sink->file_name);
542
sink->file_name = g_value_dup_string (value);
543
GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
544
gst_curl_sink_new_file_notify_unlocked (sink);
547
sink->timeout = g_value_get_int (value);
548
GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
551
sink->qos_dscp = g_value_get_int (value);
552
gst_curl_sink_setup_dscp_unlocked (sink);
553
GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
555
case PROP_CONTENT_TYPE:
556
g_free (sink->content_type);
557
sink->content_type = g_value_dup_string (value);
558
GST_DEBUG_OBJECT (sink, "content type set to %s", sink->content_type);
561
GST_WARNING_OBJECT (sink, "cannot set property when PLAYING");
565
GST_OBJECT_UNLOCK (sink);
569
gst_curl_sink_get_property (GObject * object, guint prop_id,
570
GValue * value, GParamSpec * pspec)
574
g_return_if_fail (GST_IS_CURL_SINK (object));
575
sink = GST_CURL_SINK (object);
579
g_value_set_string (value, sink->url);
582
g_value_set_string (value, sink->user);
584
case PROP_USER_PASSWD:
585
g_value_set_string (value, sink->passwd);
588
g_value_set_string (value, sink->proxy);
590
case PROP_PROXY_PORT:
591
g_value_set_int (value, sink->proxy_port);
593
case PROP_PROXY_USER_NAME:
594
g_value_set_string (value, sink->proxy_user);
596
case PROP_PROXY_USER_PASSWD:
597
g_value_set_string (value, sink->proxy_passwd);
600
g_value_set_string (value, sink->file_name);
603
g_value_set_int (value, sink->timeout);
606
g_value_set_int (value, sink->qos_dscp);
608
case PROP_ACCEPT_SELF_SIGNED:
609
g_value_set_boolean (value, sink->accept_self_signed);
611
case PROP_USE_CONTENT_LENGTH:
612
g_value_set_boolean (value, sink->use_content_length);
614
case PROP_CONTENT_TYPE:
615
g_value_set_string (value, sink->content_type);
618
GST_DEBUG_OBJECT (sink, "invalid property id");
624
gst_curl_sink_set_http_header_unlocked (GstCurlSink * sink)
628
if (sink->header_list) {
629
curl_slist_free_all (sink->header_list);
630
sink->header_list = NULL;
633
if (proxy_auth && !sink->proxy_headers_set && !proxy_conn_established) {
635
curl_slist_append (sink->header_list, "Content-Length: 0");
636
sink->proxy_headers_set = TRUE;
639
if (sink->use_content_length) {
640
/* if content length is used we assume that every buffer is one
641
* entire file, which is the case when uploading several jpegs */
642
tmp = g_strdup_printf ("Content-Length: %d", (int) sink->transfer_buf->len);
643
sink->header_list = curl_slist_append (sink->header_list, tmp);
646
/* when sending a POST request to a HTTP 1.1 server, you can send data
647
* without knowing the size before starting the POST if you use chunked
649
sink->header_list = curl_slist_append (sink->header_list,
650
"Transfer-Encoding: chunked");
653
tmp = g_strdup_printf ("Content-Type: %s", sink->content_type);
654
sink->header_list = curl_slist_append (sink->header_list, tmp);
659
tmp = g_strdup_printf ("Content-Disposition: attachment; filename="
660
"\"%s\"", sink->file_name);
661
sink->header_list = curl_slist_append (sink->header_list, tmp);
663
curl_easy_setopt (sink->curl, CURLOPT_HTTPHEADER, sink->header_list);
667
gst_curl_sink_transfer_set_options_unlocked (GstCurlSink * sink)
670
curl_easy_setopt (sink->curl, CURLOPT_VERBOSE, 1);
673
curl_easy_setopt (sink->curl, CURLOPT_URL, sink->url);
674
curl_easy_setopt (sink->curl, CURLOPT_CONNECTTIMEOUT, sink->timeout);
676
curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTDATA, sink);
677
curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTFUNCTION,
678
gst_curl_sink_transfer_socket_cb);
680
if (sink->user != NULL && strlen (sink->user)) {
681
curl_easy_setopt (sink->curl, CURLOPT_USERNAME, sink->user);
682
curl_easy_setopt (sink->curl, CURLOPT_PASSWORD, sink->passwd);
683
curl_easy_setopt (sink->curl, CURLOPT_HTTPAUTH, CURLAUTH_ANY);
686
if (sink->accept_self_signed && g_str_has_prefix (sink->url, "https")) {
687
/* TODO verify the authenticity of the peer's certificate */
688
curl_easy_setopt (sink->curl, CURLOPT_SSL_VERIFYPEER, 0L);
689
/* TODO check the servers's claimed identity */
690
curl_easy_setopt (sink->curl, CURLOPT_SSL_VERIFYHOST, 0L);
694
if (sink->proxy != NULL && strlen (sink->proxy)) {
695
if (curl_easy_setopt (sink->curl, CURLOPT_PROXY, sink->proxy)
699
if (curl_easy_setopt (sink->curl, CURLOPT_PROXYPORT, sink->proxy_port)
703
if (sink->proxy_user != NULL &&
704
strlen (sink->proxy_user) &&
705
sink->proxy_passwd != NULL && strlen (sink->proxy_passwd)) {
706
curl_easy_setopt (sink->curl, CURLOPT_PROXYUSERNAME, sink->proxy_user);
707
curl_easy_setopt (sink->curl, CURLOPT_PROXYPASSWORD, sink->proxy_passwd);
708
curl_easy_setopt (sink->curl, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
711
/* tunnel all operations through a given HTTP proxy */
712
if (curl_easy_setopt (sink->curl, CURLOPT_HTTPPROXYTUNNEL, 1L)
719
curl_easy_setopt (sink->curl, CURLOPT_POST, 1L);
721
curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION,
722
gst_curl_sink_transfer_read_cb);
723
curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink);
724
curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION,
725
gst_curl_sink_transfer_write_cb);
731
gst_curl_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
735
TransferBuffer *buffer;
736
size_t max_bytes_to_send;
739
sink = (GstCurlSink *) stream;
741
/* wait for data to come available, if new file or thread close is set
742
* then zero will be returned to indicate end of current transfer */
743
GST_OBJECT_LOCK (sink);
744
if (gst_curl_sink_wait_for_data_unlocked (sink) == FALSE) {
745
GST_LOG ("returning 0, no more data to send in this file");
746
GST_OBJECT_UNLOCK (sink);
749
GST_OBJECT_UNLOCK (sink);
752
max_bytes_to_send = size * nmemb;
753
buffer = sink->transfer_buf;
755
buf_len = buffer->len;
756
GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT,
757
buffer->len, buffer->offset);
759
/* more data in buffer */
760
if (buffer->len > 0) {
761
size_t bytes_to_send = MIN (max_bytes_to_send, buf_len);
763
memcpy ((guint8 *) curl_ptr, buffer->ptr + buffer->offset, bytes_to_send);
765
buffer->offset = buffer->offset + bytes_to_send;
766
buffer->len = buffer->len - bytes_to_send;
768
/* the last data chunk */
769
if (bytes_to_send == buf_len) {
773
GST_OBJECT_LOCK (sink);
774
gst_curl_sink_data_sent_notify_unlocked (sink);
775
GST_OBJECT_UNLOCK (sink);
778
GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send);
780
return bytes_to_send;
782
GST_WARNING ("got zero-length buffer");
788
gst_curl_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size,
789
size_t nmemb, void G_GNUC_UNUSED * stream)
791
size_t realsize = size * nmemb;
793
GST_DEBUG ("response %s", (gchar *) ptr);
798
gst_curl_sink_transfer_check (GstCurlSink * sink)
800
CURLcode code = CURLE_OK;
804
gchar *eff_url = NULL;
808
while ((msg = curl_multi_info_read (sink->multi_handle, &msgs_left))) {
809
if (msg->msg == CURLMSG_DONE) {
810
easy = msg->easy_handle;
811
code = msg->data.result;
816
curl_easy_getinfo (easy, CURLINFO_EFFECTIVE_URL, &eff_url);
817
GST_DEBUG ("transfer done %s (%s-%d)\n", eff_url,
818
curl_easy_strerror (code), code);
826
gst_curl_sink_handle_transfer (GstCurlSink * sink)
829
gint running_handles;
834
glong resp_proxy = -1;
836
GST_OBJECT_LOCK (sink);
837
timeout = sink->timeout;
838
GST_OBJECT_UNLOCK (sink);
840
/* Receiving CURLM_CALL_MULTI_PERFORM means that libcurl may have more data
841
available to send or receive - call simply curl_multi_perform before
842
poll() on more actions */
844
m_code = curl_multi_perform (sink->multi_handle, &running_handles);
845
} while (m_code == CURLM_CALL_MULTI_PERFORM);
847
while (running_handles && (m_code == CURLM_OK)) {
848
if (!proxy_conn_established && (resp_proxy != RESPONSE_CONNECT_PROXY)
850
curl_easy_getinfo (sink->curl, CURLINFO_HTTP_CONNECTCODE, &resp_proxy);
851
if ((resp_proxy == RESPONSE_CONNECT_PROXY)) {
852
GST_LOG ("received HTTP/1.0 200 Connection Established");
853
/* Workaround: redefine HTTP headers before connecting to HTTP server.
854
* When talking to proxy, the Content-Length: 0 is send with the request.
856
curl_multi_remove_handle (sink->multi_handle, sink->curl);
857
gst_curl_sink_set_http_header_unlocked (sink);
858
curl_multi_add_handle (sink->multi_handle, sink->curl);
859
proxy_conn_established = TRUE;
863
retval = gst_poll_wait (sink->fdset, timeout * GST_SECOND);
864
if (G_UNLIKELY (retval == -1)) {
865
if (errno == EAGAIN || errno == EINTR) {
866
GST_DEBUG_OBJECT (sink, "interrupted by signal");
867
} else if (errno == EBUSY) {
872
} else if (G_UNLIKELY (retval == 0)) {
873
GST_DEBUG ("timeout");
877
/* readable/writable sockets */
879
m_code = curl_multi_perform (sink->multi_handle, &running_handles);
880
} while (m_code == CURLM_CALL_MULTI_PERFORM);
882
if (resp != RESPONSE_100_CONTINUE) {
883
curl_easy_getinfo (sink->curl, CURLINFO_RESPONSE_CODE, &resp);
887
if (resp != RESPONSE_100_CONTINUE) {
888
/* No 100 Continue response received. Using POST with HTTP 1.1 implies
889
* the use of a "Expect: 100-continue" header. If the server doesn't
890
* send HTTP/1.1 100 Continue, libcurl will not call transfer_read_cb
891
* in order to send POST data.
893
goto no_100_continue_response;
896
if (m_code != CURLM_OK) {
897
goto curl_multi_error;
900
/* problems still might have occurred on individual transfers even when
901
* curl_multi_perform returns CURLM_OK */
902
if ((e_code = gst_curl_sink_transfer_check (sink)) != CURLE_OK) {
903
goto curl_easy_error;
906
/* check response code */
907
curl_easy_getinfo (sink->curl, CURLINFO_RESPONSE_CODE, &resp);
908
GST_DEBUG_OBJECT (sink, "response code: %ld", resp);
909
if (resp < 200 || resp >= 300) {
917
GST_DEBUG_OBJECT (sink, "poll failed: %s", g_strerror (errno));
918
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll failed"), (NULL));
919
return GST_FLOW_ERROR;
924
GST_DEBUG_OBJECT (sink, "poll stopped");
925
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll stopped"), (NULL));
926
return GST_FLOW_ERROR;
931
GST_DEBUG_OBJECT (sink, "poll timed out");
932
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("poll timed out"), (NULL));
933
return GST_FLOW_ERROR;
938
GST_DEBUG_OBJECT (sink, "curl multi error");
939
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s",
940
curl_multi_strerror (m_code)), (NULL));
941
return GST_FLOW_ERROR;
946
GST_DEBUG_OBJECT (sink, "curl easy error");
947
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s",
948
curl_easy_strerror (e_code)), (NULL));
949
return GST_FLOW_ERROR;
952
no_100_continue_response:
954
GST_DEBUG_OBJECT (sink, "100 continue response missing");
955
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("100 continue response missing"),
957
return GST_FLOW_ERROR;
962
GST_DEBUG_OBJECT (sink, "response error");
963
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("response error: %ld", resp),
965
return GST_FLOW_ERROR;
969
/* This function gets called by libcurl after the socket() call but before
970
* the connect() call. */
972
gst_curl_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
973
curlsocktype G_GNUC_UNUSED purpose)
978
sink = (GstCurlSink *) clientp;
983
/* signal an unrecoverable error to the library which will close the socket
984
and return CURLE_COULDNT_CONNECT
989
gst_poll_fd_init (&sink->fd);
990
sink->fd.fd = curlfd;
992
ret = ret && gst_poll_add_fd (sink->fdset, &sink->fd);
993
ret = ret && gst_poll_fd_ctl_write (sink->fdset, &sink->fd, TRUE);
994
ret = ret && gst_poll_fd_ctl_read (sink->fdset, &sink->fd, TRUE);
995
GST_DEBUG ("fd: %d", sink->fd.fd);
996
GST_OBJECT_LOCK (sink);
997
gst_curl_sink_setup_dscp_unlocked (sink);
998
GST_OBJECT_UNLOCK (sink);
1009
gst_curl_sink_transfer_start_unlocked (GstCurlSink * sink)
1011
GError *error = NULL;
1012
gboolean ret = TRUE;
1014
GST_LOG ("creating transfer thread");
1015
sink->transfer_thread_close = FALSE;
1016
sink->new_file = TRUE;
1017
sink->transfer_thread =
1018
g_thread_create ((GThreadFunc) gst_curl_sink_transfer_thread_func, sink,
1021
if (sink->transfer_thread == NULL || error != NULL) {
1024
GST_ERROR_OBJECT (sink, "could not create thread %s", error->message);
1025
g_error_free (error);
1027
GST_ERROR_OBJECT (sink, "could not create thread for unknown reason");
1035
gst_curl_sink_transfer_thread_func (gpointer data)
1037
GstCurlSink *sink = (GstCurlSink *) data;
1038
GstFlowReturn ret = GST_FLOW_OK;
1039
gboolean data_available;
1041
GST_LOG ("transfer thread started");
1042
GST_OBJECT_LOCK (sink);
1043
if (!gst_curl_sink_transfer_setup_unlocked (sink)) {
1044
GST_DEBUG_OBJECT (sink, "curl setup error");
1045
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("curl setup error"), (NULL));
1046
sink->flow_ret = GST_FLOW_ERROR;
1050
while (!sink->transfer_thread_close && sink->flow_ret == GST_FLOW_OK) {
1051
/* we are working on a new file, clearing flag and setting file
1052
* name in http header */
1053
sink->new_file = FALSE;
1055
/* wait for data to arrive for this new file, if we get a new file name
1056
* again before getting data we will simply skip transfering anything
1057
* for this file and go directly to the new file */
1058
data_available = gst_curl_sink_wait_for_data_unlocked (sink);
1059
if (data_available) {
1060
gst_curl_sink_set_http_header_unlocked (sink);
1063
/* stay unlocked while handling the actual transfer */
1064
GST_OBJECT_UNLOCK (sink);
1066
if (data_available) {
1067
curl_multi_add_handle (sink->multi_handle, sink->curl);
1069
/* Start driving the transfer. */
1070
ret = gst_curl_sink_handle_transfer (sink);
1072
/* easy handle will be possibly re-used for next transfer, thus it needs to
1073
* be removed from the multi stack and re-added again */
1074
curl_multi_remove_handle (sink->multi_handle, sink->curl);
1077
/* lock again before looping to check the thread closed flag */
1078
GST_OBJECT_LOCK (sink);
1080
/* if we have transfered data, then set the return code */
1081
if (data_available) {
1082
sink->flow_ret = ret;
1087
/* if there is a flow error, always notify the render function so it
1088
* can return the flow error up along the pipeline */
1089
if (sink->flow_ret != GST_FLOW_OK) {
1090
gst_curl_sink_data_sent_notify_unlocked (sink);
1093
GST_OBJECT_UNLOCK (sink);
1094
GST_DEBUG ("exit thread func - transfer thread close flag: %d",
1095
sink->transfer_thread_close);
1101
gst_curl_sink_transfer_setup_unlocked (GstCurlSink * sink)
1105
if (sink->curl == NULL) {
1106
/* curl_easy_init automatically calls curl_global_init(3) */
1107
if ((sink->curl = curl_easy_init ()) == NULL) {
1108
g_warning ("Failed to init easy handle");
1113
if (!gst_curl_sink_transfer_set_options_unlocked (sink)) {
1114
g_warning ("Failed to setup easy handle");
1115
GST_OBJECT_UNLOCK (sink);
1119
/* init a multi stack (non-blocking interface to liburl) */
1120
if (sink->multi_handle == NULL) {
1121
if ((sink->multi_handle = curl_multi_init ()) == NULL) {
1130
gst_curl_sink_transfer_cleanup (GstCurlSink * sink)
1132
if (sink->curl != NULL) {
1133
if (sink->multi_handle != NULL) {
1134
curl_multi_remove_handle (sink->multi_handle, sink->curl);
1136
curl_easy_cleanup (sink->curl);
1140
if (sink->multi_handle != NULL) {
1141
curl_multi_cleanup (sink->multi_handle);
1142
sink->multi_handle = NULL;
1147
gst_curl_sink_wait_for_data_unlocked (GstCurlSink * sink)
1149
gboolean data_available = FALSE;
1151
GST_LOG ("waiting for data");
1152
while (!sink->transfer_cond->data_available &&
1153
!sink->transfer_thread_close && !sink->new_file) {
1154
g_cond_wait (sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1157
if (sink->transfer_thread_close) {
1158
GST_LOG ("wait for data aborted due to thread close");
1159
} else if (sink->new_file) {
1160
GST_LOG ("wait for data aborted due to new file name");
1162
GST_LOG ("wait for data completed");
1163
data_available = TRUE;
1166
return data_available;
1170
gst_curl_sink_transfer_thread_notify_unlocked (GstCurlSink * sink)
1172
GST_LOG ("more data to send");
1173
sink->transfer_cond->data_available = TRUE;
1174
sink->transfer_cond->data_sent = FALSE;
1175
g_cond_signal (sink->transfer_cond->cond);
1179
gst_curl_sink_new_file_notify_unlocked (GstCurlSink * sink)
1181
GST_LOG ("new file name");
1182
sink->new_file = TRUE;
1183
g_cond_signal (sink->transfer_cond->cond);
1187
gst_curl_sink_transfer_thread_close_unlocked (GstCurlSink * sink)
1189
GST_LOG ("setting transfer thread close flag");
1190
sink->transfer_thread_close = TRUE;
1191
g_cond_signal (sink->transfer_cond->cond);
1195
gst_curl_sink_wait_for_transfer_thread_to_send_unlocked (GstCurlSink * sink)
1197
GST_LOG ("waiting for buffer send to complete");
1199
/* this function should not check if the transfer thread is set to be closed
1200
* since that flag only can be set by the EoS event (by the pipeline thread).
1201
* This can therefore never happen while this function is running since this
1202
* function also is called by the pipeline thread (in the render function) */
1203
while (!sink->transfer_cond->data_sent) {
1204
g_cond_wait (sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1206
GST_LOG ("buffer send completed");
1210
gst_curl_sink_data_sent_notify_unlocked (GstCurlSink * sink)
1212
GST_LOG ("transfer completed");
1213
sink->transfer_cond->data_available = FALSE;
1214
sink->transfer_cond->data_sent = TRUE;
1215
g_cond_signal (sink->transfer_cond->cond);
1219
gst_curl_sink_setup_dscp_unlocked (GstCurlSink * sink)
1227
struct sockaddr_in6 sa_in6;
1228
struct sockaddr_storage sa_stor;
1230
socklen_t slen = sizeof (sa);
1232
if (getsockname (sink->fd.fd, &sa.sa, &slen) < 0) {
1233
GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
1236
af = sa.sa.sa_family;
1238
/* if this is an IPv4-mapped address then do IPv4 QoS */
1239
if (af == AF_INET6) {
1240
GST_DEBUG_OBJECT (sink, "check IP6 socket");
1241
if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
1242
GST_DEBUG_OBJECT (sink, "mapped to IPV4");
1246
/* extract and shift 6 bits of the DSCP */
1247
tos = (sink->qos_dscp & 0x3f) << 2;
1251
ret = setsockopt (sink->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
1255
ret = setsockopt (sink->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos,
1260
GST_ERROR_OBJECT (sink, "unsupported AF");
1264
GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));