2
* Pure Data Packet module. Raw packet output to a unix pipe.
3
* Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org>
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26
#include <sys/types.h>
34
#include "pdp_debug.h"
38
#include "pdp_packet.h"
42
#define MAX_QUEUESIZE 4
43
#define PIPE_BLOCKSIZE 4096
48
/* raw output to a unix pipe */
50
typedef struct rawout_struct
55
t_outlet *x_sync_outlet;
58
t_pdp_list *x_queue; // packet queue
61
pthread_mutex_t x_mut;
62
pthread_attr_t x_attr;
66
int x_giveup; // 1-> terminate writer thread
67
int x_active; // 1-> writer thread is launched
68
int x_done; // 1-> writer thread has exited
77
/* acquire the object's mutex (x_mut) */
static inline void lock(t_rawout *x)
{
    pthread_mutex_lock(&x->x_mut);
}
78
/* release the object's mutex (x_mut) */
static inline void unlock(t_rawout *x)
{
    pthread_mutex_unlock(&x->x_mut);
}
80
static void rawout_close(t_rawout *x);
81
static void pdp_in(t_rawout *x, t_symbol *s, t_float f)
83
/* save packet to pdp queue, if size is smaller than maxsize */
84
if (s == S_REGISTER_RO){
85
if (x->x_queue->elements < MAX_QUEUESIZE){
87
p = pdp_packet_copy_ro(p);
90
pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p);
95
/* the message has no conversion specifier, so no extra argument is passed */
pdp_post("pdp_rawout: dropping packet: (queue full)");
100
/* check if thread is done */
101
if (x->x_done) rawout_close(x);
107
static void *rawout_thread(void *y)
111
t_rawout *x = (t_rawout *)y;
114
sigset_t sigvec; /* signal handling */
116
/* ignore pipe signal */
117
sigemptyset(&sigvec);
118
sigaddset(&sigvec,SIGPIPE);
119
pthread_sigmask(SIG_BLOCK, &sigvec, 0);
121
//D pdp_post("pipe: %s", x->x_pipe->s_name);
122
//D pdp_post("type: %s", x->x_type->s_name);
125
if (-1 == (pipe = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK))){
126
perror(x->x_pipe->s_name);
130
/* main loop (packets) */
135
/* try again if queue is empty */
136
if (!x->x_queue->elements){
137
/* check if we need to stop */
142
usleep(1000); // sleep 1000 microseconds before polling again (usleep takes an integer count)
146
/* get packet from queue */
148
packet = pdp_list_pop(x->x_queue).w_packet;
152
data = pdp_packet_data(packet);
153
left = pdp_packet_data_size(packet);
155
/* inner loop: pipe writes */
159
struct timeval tv = {0,10000};
161
/* check if we need to stop */
163
pdp_packet_mark_unused(packet);
167
/* select, with timeout */
169
FD_SET(pipe, &outset);
170
if (-1 == select(pipe+1, NULL, &outset, NULL, &tv)){
171
pdp_post("select error");
175
/* if ready, write, else retry */
176
if (FD_ISSET(pipe, &outset)){
177
int bytes = write(pipe, data, left);
180
perror(x->x_pipe->s_name);
181
/* write() signals failure by returning -1; the reason is in errno, not in
   the return value. Only a would-block condition (EAGAIN) is retried.
   NOTE(review): the perror() call just before this may itself modify errno;
   consider saving errno immediately after write(). */
if (errno != EAGAIN) goto close;
183
/* or update pointers */
187
//pdp_post("left %d", left);
191
//pdp_post("retrying write");
196
pdp_packet_mark_unused(packet);
213
static void rawout_type(t_rawout *x, t_symbol *type)
215
x->x_type = pdp_gensym(type->s_name);
218
static void rawout_open(t_rawout *x, t_symbol *pipe)
220
/* save pipe name if not empty */
221
if (pipe->s_name[0]) {x->x_pipe = pipe;}
224
pdp_post("already open");
230
pthread_create(&x->x_thread, &x->x_attr, rawout_thread , x);
234
static void rawout_close(t_rawout *x)
237
if (!x->x_active) return;
239
/* stop thread: set giveup + wait */
241
pthread_join(x->x_thread, NULL);
245
outlet_bang(x->x_sync_outlet);
246
pdp_post("connection to %s closed", x->x_pipe->s_name);
254
static void rawout_free(t_rawout *x)
257
pdp_tree_strip_packets(x->x_queue);
258
pdp_tree_free(x->x_queue);
261
t_class *rawout_class;
264
static void *rawout_new(t_symbol *pipe, t_symbol *type)
268
pdp_post("%s %s", pipe->s_name, type->s_name);
270
/* allocate & init */
271
x = (t_rawout *)pd_new(rawout_class);
272
//x->x_outlet = outlet_new(&x->x_obj, &s_anything);
273
x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
274
x->x_queue = pdp_list_new(0);
278
x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
279
x->x_pipe = gensym("/tmp/pdpraw"); // default
280
pthread_attr_init(&x->x_attr);
281
pthread_mutex_init(&x->x_mut, NULL);
284
rawout_type(x, type);
285
if (pipe->s_name[0]) x->x_pipe = pipe;
299
void pdp_rawout_setup(void)
303
/* create a standard pd class: [pdp_rawout pipe type] */
304
rawout_class = class_new(gensym("pdp_rawout"), (t_newmethod)rawout_new,
305
(t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL);
307
/* add global message handler */
308
class_addmethod(rawout_class, (t_method)pdp_in,
309
gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL);
311
class_addmethod(rawout_class, (t_method)rawout_type, gensym("type"), A_SYMBOL, A_NULL);
312
class_addmethod(rawout_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL);
313
class_addmethod(rawout_class, (t_method)rawout_close, gensym("close"), A_NULL);