3
/* Discipline to invoke UNIX processes as data filters.
4
** These processes must be able to fit in pipelines.
6
** Written by Kiem-Phong Vo, kpv@research.att.com, 03/18/1998.
9
#if !defined(FNDELAY) && defined(O_NDELAY)
10
#define FNDELAY O_NDELAY
13
typedef struct _filter_s
14
{ Sfdisc_t disc; /* discipline structure */
15
Sfio_t* filter; /* the filter stream */
16
char raw[1024]; /* raw data buffer */
17
char* next; /* remainder of data unwritten to pipe */
18
char* endb; /* end of data */
21
/* read data from the filter */
23
static ssize_t filterread(Sfio_t* f, Void_t* buf, size_t n, Sfdisc_t* disc)
25
static ssize_t filterread(f, buf, n, disc)
26
Sfio_t* f; /* stream reading from */
27
Void_t* buf; /* buffer to read into */
28
size_t n; /* number of bytes requested */
29
Sfdisc_t* disc; /* discipline */
38
fi->next = fi->endb = fi->raw;
40
{ /* try to get data from filter, if any */
42
if((r = sfread(fi->filter,buf,n)) > 0)
44
if(errno != EWOULDBLOCK)
48
/* get some raw data to stuff down the pipe */
49
if(fi->next >= fi->endb)
50
{ if((r = sfrd(f,fi->raw,sizeof(fi->raw),disc)) > 0)
55
{ /* eof, close write end of pipes */
56
sfset(fi->filter,SF_READ,0);
57
close(sffileno(fi->filter));
58
sfset(fi->filter,SF_READ,1);
62
if((w = fi->endb - fi->next) > 0)
64
if((w = sfwrite(fi->filter,fi->next,w)) > 0)
66
else if(errno != EWOULDBLOCK)
68
/* pipe is full, sleep for a while, then continue */
75
static ssize_t filterwrite(Sfio_t* f, const Void_t* buf, size_t n, Sfdisc_t* disc)
77
static ssize_t filterwrite(f, buf, n, disc)
78
Sfio_t* f; /* stream reading from */
79
Void_t* buf; /* buffer to read into */
80
size_t n; /* number of bytes requested */
81
Sfdisc_t* disc; /* discipline */
87
/* for the duration of this discipline, the stream is unseekable */
89
static Sfoff_t filterseek(Sfio_t* f, Sfoff_t addr, int offset, Sfdisc_t* disc)
91
static Sfoff_t filterseek(f, addr, offset, disc)
100
disc = NIL(Sfdisc_t*);
101
return (Sfoff_t)(-1);
104
/* on close, remove the discipline */
106
static filterexcept(Sfio_t* f, int type, Void_t* data, Sfdisc_t* disc)
108
static filterexcept(f,type,data,disc)
115
if(type == SF_FINAL || type == SF_DPOP)
116
{ sfclose(((Filter_t*)disc)->filter);
124
int sfdcfilter(Sfio_t* f, const char* cmd)
126
int sfdcfilter(f, cmd)
127
Sfio_t* f; /* stream to filter data */
128
char* cmd; /* program to run as a filter */
134
/* open filter for read&write */
135
if(!(filter = sfpopen(NIL(Sfio_t*),cmd,"r+")) )
138
/* unbuffered so that write data will get to the pipe right away */
139
sfsetbuf(filter,NIL(Void_t*),0);
141
/* make the write descriptor nonblocking */
142
sfset(filter,SF_READ,0);
143
fcntl(sffileno(filter),F_SETFL,FNDELAY);
144
sfset(filter,SF_READ,1);
146
/* same for the read descriptor */
147
sfset(filter,SF_WRITE,0);
148
fcntl(sffileno(filter),F_SETFL,FNDELAY);
149
sfset(filter,SF_WRITE,1);
151
if(!(fi = (Filter_t*)malloc(sizeof(Filter_t))) )
156
fi->disc.readf = filterread;
157
fi->disc.writef = filterwrite;
158
fi->disc.seekf = filterseek;
159
fi->disc.exceptf = filterexcept;
161
fi->next = fi->endb = NIL(char*);
163
if(sfdisc(f,(Sfdisc_t*)fi) != (Sfdisc_t*)fi)