260
259
+ DEFAULT_WAIT_FOR_CONNECTION,
261
260
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
262
+ g_object_class_install_property (gobject_class, PROP_BUFFER_TIME,
263
+ g_param_spec_uint64 ("buffer-time",
264
+ "Buffer Time of the shm buffer",
265
+ "Maximum Size of the shm buffer in nanoseconds (-1 to disable)",
266
+ 0, G_MAXUINT64, GST_CLOCK_TIME_NONE,
267
+ G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
263
269
+ signals[SIGNAL_CLIENT_CONNECTED] = g_signal_new ("client-connected",
264
270
+ GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL,
265
271
+ g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
848
907
+ const GValue * value, GParamSpec * pspec);
849
908
+static void gst_shm_src_get_property (GObject * object, guint prop_id,
850
909
+ GValue * value, GParamSpec * pspec);
910
+static void gst_shm_src_finalize (GObject * object);
851
911
+static gboolean gst_shm_src_start (GstBaseSrc * bsrc);
852
912
+static gboolean gst_shm_src_stop (GstBaseSrc * bsrc);
853
913
+static GstFlowReturn gst_shm_src_create (GstPushSrc * psrc,
854
914
+ GstBuffer ** outbuf);
855
915
+static gboolean gst_shm_src_unlock (GstBaseSrc * bsrc);
856
916
+static gboolean gst_shm_src_unlock_stop (GstBaseSrc * bsrc);
917
+static GstStateChangeReturn gst_shm_src_change_state (GstElement * element,
918
+ GstStateChange transition);
858
920
+static void gst_shm_pipe_inc (GstShmPipe * pipe);
859
921
+static void gst_shm_pipe_dec (GstShmPipe * pipe);
867
929
+ GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
869
+ gst_element_class_add_pad_template (element_class,
870
+ gst_static_pad_template_get (&srctemplate));
931
+ gst_element_class_add_static_pad_template (element_class, &srctemplate);
872
933
+ gst_element_class_set_details_simple (element_class,
873
934
+ "Shared Memory Source",
875
936
+ "Receive data from the sharem memory sink",
876
+ "Olivier Crete <olivier.crete@collabora.co.uk");
937
+ "Olivier Crete <olivier.crete@collabora.co.uk>");
880
941
+gst_shm_src_class_init (GstShmSrcClass * klass)
882
943
+ GObjectClass *gobject_class;
944
+ GstElementClass *gstelement_class;
883
945
+ GstBaseSrcClass *gstbasesrc_class;
884
946
+ GstPushSrcClass *gstpush_src_class;
886
948
+ gobject_class = (GObjectClass *) klass;
949
+ gstelement_class = (GstElementClass *) klass;
887
950
+ gstbasesrc_class = (GstBaseSrcClass *) klass;
888
951
+ gstpush_src_class = (GstPushSrcClass *) klass;
890
953
+ gobject_class->set_property = gst_shm_src_set_property;
891
954
+ gobject_class->get_property = gst_shm_src_get_property;
955
+ gobject_class->finalize = gst_shm_src_finalize;
957
+ gstelement_class->change_state = gst_shm_src_change_state;
893
959
+ gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_shm_src_start);
894
960
+ gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_shm_src_stop);
969
1047
+static gboolean
970
+gst_shm_src_start (GstBaseSrc * bsrc)
1048
+gst_shm_src_start_reading (GstShmSrc * self)
972
+ GstShmSrc *self = GST_SHM_SRC (bsrc);
973
1050
+ GstShmPipe *gstpipe = g_slice_new0 (GstShmPipe);
975
1052
+ gstpipe->use_count = 1;
976
1053
+ gstpipe->src = gst_object_ref (self);
978
1055
+ if (!self->socket_path) {
979
+ GST_ELEMENT_ERROR (bsrc, RESOURCE, NOT_FOUND,
1056
+ GST_ELEMENT_ERROR (self, RESOURCE, NOT_FOUND,
980
1057
+ ("No path specified for socket."), (NULL));
984
+ GST_DEBUG ("Opening socket %s", self->socket_path);
1061
+ GST_DEBUG_OBJECT (self, "Opening socket %s", self->socket_path);
986
1063
+ GST_OBJECT_LOCK (self);
987
1064
+ gstpipe->pipe = sp_client_open (self->socket_path);
988
1065
+ GST_OBJECT_UNLOCK (self);
990
1067
+ if (!gstpipe->pipe) {
991
+ GST_ELEMENT_ERROR (bsrc, RESOURCE, OPEN_READ_WRITE,
1068
+ GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
992
1069
+ ("Could not open socket %s: %d %s", self->socket_path, errno,
993
1070
+ strerror (errno)), (NULL));
994
1071
+ gst_shm_pipe_dec (gstpipe);
1018
1094
+ self->pipe = NULL;
1021
+ gst_poll_free (self->poll);
1022
+ self->poll = NULL;
1097
+ gst_poll_remove_fd (self->poll, &self->pollfd);
1098
+ gst_poll_fd_init (&self->pollfd);
1100
+ gst_poll_set_flushing (self->poll, TRUE);
1104
+gst_shm_src_start (GstBaseSrc * bsrc)
1106
+ if (gst_base_src_is_live (bsrc))
1109
+ return gst_shm_src_start_reading (GST_SHM_SRC (bsrc));
1113
+gst_shm_src_stop (GstBaseSrc * bsrc)
1115
+ if (!gst_base_src_is_live (bsrc))
1116
+ gst_shm_src_stop_reading (GST_SHM_SRC (bsrc));
1106
1200
+ return GST_FLOW_OK;
1203
+static GstStateChangeReturn
1204
+gst_shm_src_change_state (GstElement * element, GstStateChange transition)
1206
+ GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1207
+ GstShmSrc *self = GST_SHM_SRC (element);
1209
+ switch (transition) {
1210
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1211
+ if (gst_base_src_is_live (GST_BASE_SRC (element)))
1212
+ if (!gst_shm_src_start_reading (self))
1213
+ return GST_STATE_CHANGE_FAILURE;
1218
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1219
+ if (ret == GST_STATE_CHANGE_FAILURE)
1222
+ switch (transition) {
1223
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1224
+ if (gst_base_src_is_live (GST_BASE_SRC (element)))
1225
+ gst_shm_src_stop_reading (self);
1109
1233
+static gboolean
1110
1234
+gst_shm_src_unlock (GstBaseSrc * bsrc)
1112
1236
+ GstShmSrc *self = GST_SHM_SRC (bsrc);
1114
1238
+ self->unlocked = TRUE;
1117
+ gst_poll_set_flushing (self->poll, TRUE);
1239
+ gst_poll_set_flushing (self->poll, TRUE);
1730
1854
+ if (fcntl (self->main_socket, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC) < 0)
1731
1855
+ RETURN_ERROR ("fcntl(F_SETFL) failed (%d): %s\n", errno, strerror (errno));
1733
+ sun.sun_family = AF_UNIX;
1734
+ strncpy (sun.sun_path, path, sizeof (sun.sun_path) - 1);
1857
+ sock_un.sun_family = AF_UNIX;
1858
+ strncpy (sock_un.sun_path, path, sizeof (sock_un.sun_path) - 1);
1736
+ while (bind (self->main_socket, (struct sockaddr *) &sun,
1860
+ while (bind (self->main_socket, (struct sockaddr *) &sock_un,
1737
1861
+ sizeof (struct sockaddr_un)) < 0) {
1738
1862
+ if (errno != EADDRINUSE)
1739
1863
+ RETURN_ERROR ("bind() failed (%d): %s\n", errno, strerror (errno));
1742
1866
+ RETURN_ERROR ("Could not find a free socket name for %s", path);
1744
+ snprintf (sun.sun_path, sizeof (sun.sun_path), "%s.%d", path, i);
1868
+ snprintf (sock_un.sun_path, sizeof (sock_un.sun_path), "%s.%d", path, i);
1748
+ self->socket_path = strdup (sun.sun_path);
1872
+ self->socket_path = strdup (sock_un.sun_path);
1750
1874
+ if (listen (self->main_socket, LISTEN_BACKLOG) < 0)
1751
1875
+ RETURN_ERROR ("listen() failed (%d): %s\n", errno, strerror (errno));
2013
2167
+ shm_alloc_space_alloc_block_get_offset (block->ablock);
2171
+sp_writer_block_get_pipe (ShmBlock * block)
2173
+ return block->pipe;
2017
2177
+sp_writer_free_block (ShmBlock * block)
2019
2179
+ shm_alloc_space_block_dec (block->ablock);
2020
2180
+ sp_shm_area_dec (block->pipe, block->area);
2181
+ sp_dec (block->pipe);
2021
2182
+ spalloc_free (ShmBlock, block);
2024
2185
+/* Returns the number of client this has successfully been sent to */
2027
+sp_writer_send_buf (ShmPipe * self, char *buf, size_t size)
2188
+sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, uint64_t tag)
2029
2190
+ ShmArea *area = NULL;
2030
2191
+ unsigned long offset = 0;
2226
2383
+sp_client_open (const char *path)
2228
2385
+ ShmPipe *self = spalloc_new (ShmPipe);
2229
+ struct sockaddr_un sun;
2386
+ struct sockaddr_un sock_un;
2231
2388
+ memset (self, 0, sizeof (ShmPipe));
2233
2390
+ self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0);
2391
+ self->use_count = 1;
2234
2393
+ if (self->main_socket < 0)
2237
+ sun.sun_family = AF_UNIX;
2238
+ strncpy (sun.sun_path, path, sizeof (sun.sun_path) - 1);
2396
+ sock_un.sun_family = AF_UNIX;
2397
+ strncpy (sock_un.sun_path, path, sizeof (sock_un.sun_path) - 1);
2240
+ if (connect (self->main_socket, (struct sockaddr *) &sun,
2399
+ if (connect (self->main_socket, (struct sockaddr *) &sock_un,
2241
2400
+ sizeof (struct sockaddr_un)) < 0)
2378
2537
+ return pipe->socket_path;
2541
+sp_writer_get_pending_buffers (ShmPipe * self)
2543
+ return self->buffers;
2547
+sp_writer_get_next_buffer (ShmBuffer * buffer)
2549
+ return buffer->next;
2553
+sp_writer_buf_get_tag (ShmBuffer * buffer)
2555
+ return buffer->tag;
2380
2557
Index: gst-plugins-good0.10/sys/shm/shmpipe.h
2381
2558
===================================================================
2382
2559
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
2383
+++ gst-plugins-good0.10/sys/shm/shmpipe.h 2011-03-18 12:22:38.195521299 -0400
2560
+++ gst-plugins-good0.10/sys/shm/shmpipe.h 2012-02-09 13:57:29.572435080 +0200
2386
2563
+ * Copyright (C) <2009> Collabora Ltd
2387
2564
+ * @author: Olivier Crete <olivier.crete@collabora.co.uk
2410
+ * None of this code is thread safe, if you want to use it in a multi-threaded
2411
+ * context, please protect it with a mutex.
2413
+ * First, create a writer with sp_writer_create()
2414
+ * And selectes() on the socket from sp_get_fd()
2415
+ * If the socket is closed or there are errors from any function, the app
2416
+ * should call sp_close() and assume the writer is dead
2417
+ * The server calls sp_writer_accept_client() when there is something to read
2418
+ * from the server fd
2419
+ * It then needs to select() on the socket from sp_writer_get_client_fd()
2420
+ * If it gets an error on that socket, it call sp_writer_close_client().
2421
+ * If there is something to read, it calls sp_writer_recv().
2423
+ * The writer allocates buffers with sp_writer_alloc_block(),
2424
+ * writes something in the buffer (retrieved with sp_writer_block_get_buf(),
2425
+ * then calls sp_writer_send_buf() to send the buffer or a subsection to
2426
+ * the other side. When it is done with the block, it calls
2427
+ * sp_writer_free_block().
2428
+ * If alloc fails, then the server must wait for events from the clients before
2432
+ * The clients connect with sp_client_open()
2433
+ * And select() on the fd from sp_get_fd() until there is something to read.
2434
+ * Then they must read using sp_client_recv() which will return > 0 if there
2435
+ * is a valid buffer (which is read only). It will return 0 if it is an internal
2436
+ * message and <0 if there was an error. If there was an error, one must close
2437
+ * it with sp_close(). If was valid buffer was received, the client must release
2438
+ * it with sp_client_recv_finish() when it is done reading from it.
2587
+ * None of this code is thread safe, if you want to use it in a
2588
+ * multi-threaded context, please protect it with a mutex.
2590
+ * First, create a writer with sp_writer_create(), then select() on
2591
+ * the socket returned by sp_get_fd(). If the socket is closed or any
2592
+ * function returns an error, the app should call sp_close() and
2593
+ * assume the other side is dead. The writer calls
2594
+ * sp_writer_accept_client() when there is something to read from the
2595
+ * main server fd. This returns a new ShmClient (representing a client
2596
+ * connection), the writer needs to do a select() on the socket
2597
+ * returned by sp_writer_get_client_fd(). If it gets an error on that
2598
+ * socket, it calls sp_writer_close_client(). If there is something to
2599
+ * read, it calls sp_writer_recv().
2601
+ * The writer allocates a block containing a free buffer with
2602
+ * sp_writer_alloc_block(), then writes something in the buffer
2603
+ * (retrieved with sp_writer_block_get_buf(), then calls
2604
+ * sp_writer_send_buf() to send the buffer or a subsection to the
2605
+ * other side. When it is done with the block, it calls
2606
+ * sp_writer_free_block(). If alloc fails, then the server must wait
2607
+ * for events on the client fd (the ones where sp_writer_recv() is
2608
+ * called), and then try to re-alloc.
2610
+ * The reader (client) connect to the writer with sp_client_open() And
2611
+ * select()s on the fd from sp_get_fd() until there is something to
2612
+ * read. Then they must read using sp_client_recv() which will return
2613
+ * the size of the buffer (positive) if there is a valid buffer (which
2614
+ * is read only). It will return 0 if it is an internal message and a
2615
+ * negative number if there was an error. If there was an error, the
2616
+ * application must close the pipe with sp_close() and assume that all
2617
+ * buffers are no longer valid. If was valid buffer was received, the
2618
+ * client must release it with sp_client_recv_finish() when it is done
2619
+ * reading from it.
2455
2637
+typedef struct _ShmClient ShmClient;
2456
2638
+typedef struct _ShmPipe ShmPipe;
2457
2639
+typedef struct _ShmBlock ShmBlock;
2640
+typedef struct _ShmBuffer ShmBuffer;
2459
2642
+ShmPipe *sp_writer_create (const char *path, size_t size, mode_t perms);
2460
2643
+const char *sp_writer_get_path (ShmPipe *pipe);
2461
2644
+void sp_close (ShmPipe * self);
2645
+void *sp_get_data (ShmPipe * self);
2646
+void sp_set_data (ShmPipe * self, void *data);
2463
2648
+int sp_writer_setperms_shm (ShmPipe * self, mode_t perms);
2464
2649
+int sp_writer_resize (ShmPipe * self, size_t size);
2469
2654
+ShmBlock *sp_writer_alloc_block (ShmPipe * self, size_t size);
2470
2655
+void sp_writer_free_block (ShmBlock *block);
2471
+int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size);
2656
+int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, uint64_t tag);
2472
2657
+char *sp_writer_block_get_buf (ShmBlock *block);
2658
+ShmPipe *sp_writer_block_get_pipe (ShmBlock *block);
2474
2660
+ShmClient * sp_writer_accept_client (ShmPipe * self);
2475
2661
+void sp_writer_close_client (ShmPipe *self, ShmClient * client);