~kamalmostafa/ubuntu/lucid/pdp/fix-504941-ftbfs

« back to all changes in this revision

Viewing changes to modules/generic/pdp_rawout.c

  • Committer: Bazaar Package Importer
  • Author(s): Guenter Geiger (Debian/GNU)
  • Date: 2005-03-15 22:21:05 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20050315222105-1q287rsihmd9j1tb
Tags: 1:0.12.4-2
* fixed the hardcoded depends
* added 3dp library

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
 *   Pure Data Packet module. packet forth console
3
 
 *   Copyright (c) by Tom Schouten <pdp@zzz.kotnet.org>
4
 
 *
5
 
 *   This program is free software; you can redistribute it and/or modify
6
 
 *   it under the terms of the GNU General Public License as published by
7
 
 *   the Free Software Foundation; either version 2 of the License, or
8
 
 *   (at your option) any later version.
9
 
 *
10
 
 *   This program is distributed in the hope that it will be useful,
11
 
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 *   GNU General Public License for more details.
14
 
 *
15
 
 *   You should have received a copy of the GNU General Public License
16
 
 *   along with this program; if not, write to the Free Software
17
 
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
 
 *
19
 
 */
20
 
 
21
 
 
22
 
#include <pthread.h>
23
 
#include <stdio.h>
24
 
#include <unistd.h>
25
 
#include <stdlib.h>
26
 
#include <sys/types.h>
27
 
#include <sys/stat.h>
28
 
#include <sys/time.h>
29
 
#include <time.h>
30
 
#include <fcntl.h>
31
 
#include <errno.h>
32
 
#include <signal.h>
33
 
#include "pdp_pd.h"
34
 
#include "pdp_debug.h"
35
 
#include "pdp_list.h"
36
 
#include "pdp_comm.h"
37
 
#include "pdp_post.h"
38
 
#include "pdp_packet.h"
39
 
 
40
 
 
41
 
#define D if (1)
42
 
#define MAX_QUEUESIZE 4
43
 
#define PIPE_BLOCKSIZE 4096
44
 
 
45
 
 
46
 
 
47
 
 
48
 
/* raw input from a unix pipe */
49
 
 
50
 
typedef struct rawout_struct
51
 
{
52
 
    /* pd */
53
 
    t_object x_obj;
54
 
    //t_outlet *x_outlet;
55
 
    t_outlet *x_sync_outlet;
56
 
 
57
 
    /* comm */
58
 
    t_pdp_list *x_queue; // packet queue
59
 
 
60
 
    /* thread */
61
 
    pthread_mutex_t x_mut;
62
 
    pthread_attr_t x_attr;
63
 
    pthread_t x_thread;
64
 
 
65
 
    /* sync */
66
 
    int x_giveup;  // 1-> terminate writer thread
67
 
    int x_active;  // 1-> writer thread is launched
68
 
    int x_done;    // 1-> writer thread has exited
69
 
 
70
 
    /* config */
71
 
    t_symbol *x_pipe;
72
 
    t_pdp_symbol *x_type;
73
 
 
74
 
} t_rawout;
75
 
 
76
 
 
77
 
static inline void lock(t_rawout *x){pthread_mutex_lock(&x->x_mut);}
78
 
static inline void unlock(t_rawout *x){pthread_mutex_unlock(&x->x_mut);}
79
 
 
80
 
static void rawout_close(t_rawout *x);
81
 
static void pdp_in(t_rawout *x, t_symbol *s, t_float f)
82
 
{
83
 
    /* save packet to pdp queue, if size is smaller than maxsize */
84
 
    if (s == S_REGISTER_RO){
85
 
        if (x->x_queue->elements < MAX_QUEUESIZE){
86
 
            int p = (int)f;
87
 
            p = pdp_packet_copy_ro(p);
88
 
            if (p != -1){
89
 
                lock(x);
90
 
                pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p);
91
 
                unlock(x);
92
 
            }
93
 
        }
94
 
        else {
95
 
            pdp_post("pdp_rawout: dropping packet: (queue full)", MAX_QUEUESIZE);
96
 
        }
97
 
            
98
 
    }
99
 
 
100
 
    /* check if thread is done */
101
 
    if (x->x_done) rawout_close(x);
102
 
 
103
 
}
104
 
 
105
 
 
106
 
 
107
 
static void *rawout_thread(void *y)
108
 
{
109
 
    int pipe;
110
 
    int packet = -1;
111
 
    t_rawout *x = (t_rawout *)y;
112
 
    int period_sec;
113
 
    int period_usec;
114
 
    sigset_t sigvec; /* signal handling  */
115
 
 
116
 
    /* ignore pipe signal */
117
 
    sigemptyset(&sigvec);
118
 
    sigaddset(&sigvec,SIGPIPE);
119
 
    pthread_sigmask(SIG_BLOCK, &sigvec, 0);
120
 
 
121
 
    //D pdp_post("pipe: %s", x->x_pipe->s_name);
122
 
    //D pdp_post("type: %s", x->x_type->s_name);
123
 
 
124
 
    /* open pipe */
125
 
    if (-1 == (pipe = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK))){
126
 
        perror(x->x_pipe->s_name);
127
 
        goto exit;
128
 
    }
129
 
 
130
 
    /* main loop (packets) */
131
 
    while(1){
132
 
        void *data = 0;
133
 
        int left = -1;
134
 
 
135
 
        /* try again if queue is empty */
136
 
        if (!x->x_queue->elements){
137
 
            /* check if we need to stop */
138
 
            if (x->x_giveup){
139
 
                goto close;
140
 
            }
141
 
            else {
142
 
                usleep(1000.0f); // sleep before polling again
143
 
                continue;
144
 
            }
145
 
        }
146
 
        /* get packet from queue */
147
 
        lock(x);
148
 
        packet = pdp_list_pop(x->x_queue).w_packet;
149
 
        unlock(x);
150
 
        
151
 
        /* send packet */
152
 
        data = pdp_packet_data(packet);
153
 
        left = pdp_packet_data_size(packet);
154
 
 
155
 
        /* inner loop: pipe reads */
156
 
        while(left){
157
 
 
158
 
            fd_set outset;
159
 
            struct timeval tv = {0,10000};
160
 
 
161
 
            /* check if we need to stop */
162
 
            if (x->x_giveup){
163
 
                pdp_packet_mark_unused(packet);
164
 
                goto close;
165
 
            }
166
 
 
167
 
            /* select, with timeout */
168
 
            FD_ZERO(&outset);
169
 
            FD_SET(pipe, &outset);
170
 
            if (-1 == select(pipe+1, NULL, &outset, NULL, &tv)){
171
 
                pdp_post("select error");
172
 
                goto close;
173
 
            }
174
 
 
175
 
            /* if ready, read, else retry */
176
 
            if (FD_ISSET(pipe, &outset)){
177
 
                int bytes = write(pipe, data, left);
178
 
                /* handle errors */
179
 
                if (bytes <= 0){
180
 
                    perror(x->x_pipe->s_name);
181
 
                    if (bytes != EAGAIN) goto close;
182
 
                }
183
 
                /* or update pointers */
184
 
                else{
185
 
                    data += bytes;
186
 
                    left -= bytes;
187
 
                    //pdp_post("left %d", left);
188
 
                }
189
 
            }
190
 
            else {
191
 
                //pdp_post("retrying write");
192
 
            }
193
 
        }
194
 
                   
195
 
        /* discard packet */
196
 
        pdp_packet_mark_unused(packet);
197
 
 
198
 
    
199
 
    }
200
 
 
201
 
  close:
202
 
    /* close pipe */
203
 
    close(pipe);
204
 
        
205
 
        
206
 
  exit:
207
 
    x->x_done = 1;
208
 
    return 0;
209
 
}
210
 
 
211
 
 
212
 
 
213
 
static void rawout_type(t_rawout *x, t_symbol *type)
214
 
{
215
 
    x->x_type = pdp_gensym(type->s_name);
216
 
}
217
 
 
218
 
static void rawout_open(t_rawout *x, t_symbol *pipe)
219
 
{
220
 
    /* save pipe name if not empty */
221
 
    if (pipe->s_name[0]) {x->x_pipe = pipe;}
222
 
 
223
 
    if (x->x_active) {
224
 
        pdp_post("already open");
225
 
        return;
226
 
    }
227
 
    /* start thread */
228
 
    x->x_giveup = 0;
229
 
    x->x_done = 0;
230
 
    pthread_create(&x->x_thread, &x->x_attr, rawout_thread , x);
231
 
    x->x_active = 1;
232
 
}
233
 
 
234
 
static void rawout_close(t_rawout *x)
235
 
{
236
 
 
237
 
    if (!x->x_active) return;
238
 
 
239
 
    /* stop thread: set giveup + wait */
240
 
    x->x_giveup = 1;
241
 
    pthread_join(x->x_thread, NULL);
242
 
    x->x_active = 0;
243
 
 
244
 
    /* notify */
245
 
    outlet_bang(x->x_sync_outlet);
246
 
    pdp_post("connection to %s closed", x->x_pipe->s_name);
247
 
 
248
 
    
249
 
 
250
 
 
251
 
    
252
 
}
253
 
 
254
 
static void rawout_free(t_rawout *x)
255
 
{
256
 
    rawout_close(x);
257
 
    pdp_tree_strip_packets(x->x_queue);
258
 
    pdp_tree_free(x->x_queue);
259
 
}
260
 
 
261
 
t_class *rawout_class;
262
 
 
263
 
 
264
 
static void *rawout_new(t_symbol *pipe, t_symbol *type)
265
 
{
266
 
    t_rawout *x;
267
 
 
268
 
    pdp_post("%s %s", pipe->s_name, type->s_name);
269
 
 
270
 
    /* allocate & init */
271
 
    x = (t_rawout *)pd_new(rawout_class);
272
 
    //x->x_outlet = outlet_new(&x->x_obj, &s_anything);
273
 
    x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
274
 
    x->x_queue = pdp_list_new(0);
275
 
    x->x_active = 0;
276
 
    x->x_giveup = 0;
277
 
    x->x_done = 0;
278
 
    x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
279
 
    x->x_pipe = gensym("/tmp/pdpraw"); // default
280
 
    pthread_attr_init(&x->x_attr);
281
 
    pthread_mutex_init(&x->x_mut, NULL);
282
 
 
283
 
    /* args */
284
 
    rawout_type(x, type);
285
 
    if (pipe->s_name[0]) x->x_pipe = pipe; 
286
 
 
287
 
    return (void *)x;
288
 
 
289
 
}
290
 
 
291
 
 
292
 
 
293
 
#ifdef __cplusplus
294
 
extern "C"
295
 
{
296
 
#endif
297
 
 
298
 
 
299
 
void pdp_rawout_setup(void)
300
 
{
301
 
    int i;
302
 
 
303
 
    /* create a standard pd class: [pdp_rawout pipe type] */
304
 
    rawout_class = class_new(gensym("pdp_rawout"), (t_newmethod)rawout_new,
305
 
        (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL);
306
 
 
307
 
    /* add global message handler */
308
 
    class_addmethod(rawout_class, (t_method)pdp_in, 
309
 
                    gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL);
310
 
 
311
 
    class_addmethod(rawout_class, (t_method)rawout_type, gensym("type"), A_SYMBOL, A_NULL);
312
 
    class_addmethod(rawout_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL);
313
 
    class_addmethod(rawout_class, (t_method)rawout_close, gensym("close"), A_NULL);
314
 
 
315
 
 
316
 
}
317
 
 
318
 
#ifdef __cplusplus
319
 
}
320
 
#endif