4
4
* This file is licensed by the GPL version 2. Works owned by the
5
5
* Transmission project are granted a special exemption to clause 2(b)
6
* so that the bulk of its code can remain under the MIT license.
6
* so that the bulk of its code can remain under the MIT license.
7
7
* This exemption does not extend to derived works not owned by
8
8
* the Transmission project.
10
* $Id: trevent.c 6253 2008-06-25 11:34:35Z charles $
10
* $Id: trevent.c 6961 2008-10-26 15:39:04Z charles $
13
13
#include <assert.h>
19
19
#include <signal.h>
23
#define pipe(f) _pipe(f, 1000, _O_BINARY)
26
pgpipe( int handles[2] )
29
struct sockaddr_in serv_addr;
30
int len = sizeof( serv_addr );
32
handles[0] = handles[1] = INVALID_SOCKET;
34
if ( ( s = socket( AF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
36
/* ereport(LOG, (errmsg_internal("pgpipe failed to create socket: %ui", WSAGetLastError()))); */
40
memset( &serv_addr, 0, sizeof( serv_addr ) );
41
serv_addr.sin_family = AF_INET;
42
serv_addr.sin_port = htons(0);
43
serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
44
if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
46
/* ereport(LOG, (errmsg_internal("pgpipe failed to bind: %ui", WSAGetLastError()))); */
50
if (listen(s, 1) == SOCKET_ERROR)
52
/* ereport(LOG, (errmsg_internal("pgpipe failed to listen: %ui", WSAGetLastError()))); */
56
if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
58
/* ereport(LOG, (errmsg_internal("pgpipe failed to getsockname: %ui", WSAGetLastError()))); */
62
if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
64
/* ereport(LOG, (errmsg_internal("pgpipe failed to create socket 2: %ui", WSAGetLastError()))); */
69
if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
71
/* ereport(LOG, (errmsg_internal("pgpipe failed to connect socket: %ui", WSAGetLastError()))); */
75
if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
77
/* ereport(LOG, (errmsg_internal("pgpipe failed to accept socket: %ui", WSAGetLastError()))); */
78
closesocket(handles[1]);
79
handles[1] = INVALID_SOCKET;
88
piperead( int s, char *buf, int len )
90
int ret = recv(s, buf, len, 0);
92
if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
93
/* EOF on the pipe! (win32 socket based implementation) */
98
#define pipe(a) pgpipe(a)
99
#define pipewrite(a,b,c) send(a,(char*)b,c,0)
102
#define piperead(a,b,c) read(a,b,c)
103
#define pipewrite(a,b,c) write(a,b,c)
28
108
#include <event.h>
30
110
#include "transmission.h"
39
119
typedef struct tr_event_handle
46
126
struct event_base * base;
47
127
struct event pipeEvent;
51
typedef int timer_func(void*);
131
typedef int timer_func ( void* );
59
struct tr_event_handle * eh;
135
unsigned int inCallback : 1;
138
struct tr_event_handle * eh;
63
143
struct tr_run_data
65
void (*func)( void * );
145
void ( *func )( void * );
69
#define dbgmsg(fmt...) tr_deepLog( __FILE__, __LINE__, "event", ##fmt )
149
#define dbgmsg( ... ) \
151
if( tr_deepLoggingIsActive( ) ) \
152
tr_deepLog( __FILE__, __LINE__, "event", __VA_ARGS__ ); \
72
readFromPipe( int fd, short eventType, void * veh )
156
readFromPipe( int fd,
76
162
tr_event_handle * eh = veh;
77
164
dbgmsg( "readFromPipe: eventType is %hd", eventType );
79
166
/* read the command type */
82
ret = read( fd, &ch, 1 );
83
} while( !eh->die && ret<0 && errno==EAGAIN );
170
ret = piperead( fd, &ch, 1 );
172
while( !eh->die && ret < 0 && errno == EAGAIN );
84
174
dbgmsg( "command is [%c], ret is %d, errno is %d", ch, ret, (int)errno );
88
178
case 'r': /* run in libevent thread */
90
180
struct tr_run_data data;
91
const size_t nwant = sizeof( data );
92
const ssize_t ngot = read( fd, &data, nwant );
93
if( !eh->die && ( ngot == (ssize_t)nwant ) ) {
181
const size_t nwant = sizeof( data );
182
const ssize_t ngot = piperead( fd, &data, nwant );
183
if( !eh->die && ( ngot == (ssize_t)nwant ) )
94
185
dbgmsg( "invoking function in libevent thread" );
95
(data.func)( data.user_data );
186
( data.func )( data.user_data );
99
191
case 't': /* create timer */
102
const size_t nwant = sizeof( timer );
103
const ssize_t ngot = read( fd, &timer, nwant );
104
if( !eh->die && ( ngot == (ssize_t)nwant ) ) {
194
const size_t nwant = sizeof( timer );
195
const ssize_t ngot = piperead( fd, &timer, nwant );
196
if( !eh->die && ( ngot == (ssize_t)nwant ) )
105
198
dbgmsg( "adding timer in libevent thread" );
106
199
evtimer_add( &timer->event, &timer->tv );
110
204
case '\0': /* eof */
112
206
dbgmsg( "pipe eof reached... removing event listener" );
113
207
event_del( &eh->pipeEvent );
118
213
assert( 0 && "unhandled command type!" );
125
logFunc( int severity, const char * message )
220
logFunc( int severity,
221
const char * message )
127
223
if( severity >= _EVENT_LOG_ERR )
128
tr_nerr( "%s", message );
224
tr_err( "%s", message );
130
tr_ndbg( "%s", message );
226
tr_dbg( "%s", message );
134
230
libeventThreadFunc( void * veh )
136
tr_event_handle * eh = (tr_event_handle *) veh;
232
tr_event_handle * eh = veh;
137
234
tr_dbg( "Starting libevent thread" );
141
238
signal( SIGPIPE, SIG_IGN );
144
eh->base = event_init( );
145
241
eh->h->events = eh;
146
event_set_log_callback( logFunc );
148
243
/* listen to the pipe's read fd */
149
event_set( &eh->pipeEvent, eh->fds[0], EV_READ|EV_PERSIST, readFromPipe, veh );
244
event_set( &eh->pipeEvent, eh->fds[0], EV_READ | EV_PERSIST,
150
247
event_add( &eh->pipeEvent, NULL );
248
event_set_log_callback( logFunc );
152
249
event_dispatch( );
154
251
tr_lockFree( eh->lock );
231
334
tr_timerNew( struct tr_handle * handle,
234
337
uint64_t interval_milliseconds )
236
tr_timer * timer = tr_new0( tr_timer, 1 );
237
timer->tv = tr_timevalMsec( interval_milliseconds );
342
assert( handle->events );
344
timer = tr_new0( tr_timer, 1 );
345
tr_timevalMsec( interval_milliseconds, &timer->tv );
238
346
timer->func = func;
239
347
timer->user_data = user_data;
240
348
timer->eh = handle->events;
249
357
const char ch = 't';
250
int fd = handle->events->fds[1];
251
tr_lock * lock = handle->events->lock;
358
int fd = handle->events->fds[1];
359
tr_lock * lock = handle->events->lock;
253
361
tr_lockLock( lock );
255
write( fd, &timer, sizeof(timer) );
362
pipewrite( fd, &ch, 1 );
363
pipewrite( fd, &timer, sizeof( timer ) );
256
364
tr_lockUnlock( lock );
263
tr_runInEventThread( struct tr_handle * handle,
371
tr_runInEventThread( struct tr_handle * handle,
264
372
void func( void* ),
376
assert( handle->events );
267
378
if( tr_amInThread( handle->events->thread ) )
269
380
(func)( user_data );
274
int fd = handle->events->fds[1];
275
tr_lock * lock = handle->events->lock;
385
int fd = handle->events->fds[1];
386
tr_lock * lock = handle->events->lock;
276
387
struct tr_run_data data;
278
389
tr_lockLock( lock );
390
pipewrite( fd, &ch, 1 );
280
391
data.func = func;
281
392
data.user_data = user_data;
282
write( fd, &data, sizeof(data) );
393
pipewrite( fd, &data, sizeof( data ) );
283
394
tr_lockUnlock( lock );
399
tr_eventGetBase( tr_session * session )
401
return session->events->base;