124
125
erts_smp_mtx_unlock(&(PS)->mtx)
126
127
#define ERTS_POLLSET_SET_POLLED_CHK(PS) \
127
((int) erts_smp_atomic_xchg(&(PS)->polled, (long) 1))
128
((int) erts_atomic32_xchg_nob(&(PS)->polled, (erts_aint32_t) 1))
128
129
#define ERTS_POLLSET_UNSET_POLLED(PS) \
129
erts_smp_atomic_set(&(PS)->polled, (long) 0)
130
erts_atomic32_set_nob(&(PS)->polled, (erts_aint32_t) 0)
130
131
#define ERTS_POLLSET_IS_POLLED(PS) \
131
((int) erts_smp_atomic_read(&(PS)->polled))
133
#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) set_poller_woken_chk((PS))
134
#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) \
136
ERTS_THR_MEMORY_BARRIER; \
137
erts_smp_atomic_set(&(PS)->woken, (long) 1); \
139
#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) \
141
erts_smp_atomic_set(&(PS)->woken, (long) 0); \
142
ERTS_THR_MEMORY_BARRIER; \
144
#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) \
145
((int) erts_smp_atomic_read(&(PS)->woken))
132
((int) erts_atomic32_read_nob(&(PS)->polled))
152
139
#define ERTS_POLLSET_UNSET_POLLED(PS)
153
140
#define ERTS_POLLSET_IS_POLLED(PS) 0
155
#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
158
* Ideally, the ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) operation would
159
* be atomic. This operation isn't, but we will do okay anyway. The
160
* "woken check" is only an optimization. The only requirement we have:
161
* If (PS)->woken is set to a value != 0 when interrupting, we have to
162
* write on the wakeup pipe at least once. Multiple writes are okay.
164
#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) ((PS)->woken++)
165
#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) ((PS)->woken = 1, (void) 0)
166
#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) ((PS)->woken = 0, (void) 0)
167
#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) ((PS)->woken)
171
#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) 1
172
#define ERTS_POLLSET_SET_POLLER_WOKEN(PS)
173
#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS)
174
#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) 1
180
144
#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE
181
145
#define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) \
182
erts_smp_atomic_set(&(PS)->have_update_requests, (long) 1)
146
erts_smp_atomic32_set_nob(&(PS)->have_update_requests, (erts_aint32_t) 1)
183
147
#define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) \
184
erts_smp_atomic_set(&(PS)->have_update_requests, (long) 0)
148
erts_smp_atomic32_set_nob(&(PS)->have_update_requests, (erts_aint32_t) 0)
185
149
#define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) \
186
((int) erts_smp_atomic_read(&(PS)->have_update_requests))
150
((int) erts_smp_atomic32_read_nob(&(PS)->have_update_requests))
188
152
#define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS)
189
153
#define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS)
190
154
#define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) 0
193
#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)
195
#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) unset_interrupted_chk((PS))
196
#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) ((PS)->interrupt = 0, (void) 0)
197
#define ERTS_POLLSET_SET_INTERRUPTED(PS) ((PS)->interrupt = 1, (void) 0)
198
#define ERTS_POLLSET_IS_INTERRUPTED(PS) ((PS)->interrupt)
202
#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) unset_interrupted_chk((PS))
203
#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) \
205
erts_smp_atomic_set(&(PS)->interrupt, (long) 0); \
206
ERTS_THR_MEMORY_BARRIER; \
208
#define ERTS_POLLSET_SET_INTERRUPTED(PS) \
210
ERTS_THR_MEMORY_BARRIER; \
211
erts_smp_atomic_set(&(PS)->interrupt, (long) 1); \
213
#define ERTS_POLLSET_IS_INTERRUPTED(PS) \
214
((int) erts_smp_atomic_read(&(PS)->interrupt))
218
157
#if ERTS_POLL_USE_FALLBACK
219
158
# if ERTS_POLL_USE_POLL
220
159
# define ERTS_POLL_NEED_FALLBACK(PS) ((PS)->no_poll_fds > 1)
318
257
#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE
319
258
ErtsPollSetUpdateRequestsBlock update_requests;
320
259
ErtsPollSetUpdateRequestsBlock *curr_upd_req_block;
321
erts_smp_atomic_t have_update_requests;
260
erts_smp_atomic32_t have_update_requests;
324
erts_smp_atomic_t polled;
325
erts_smp_atomic_t woken;
263
erts_atomic32_t polled;
326
264
erts_smp_mtx_t mtx;
327
#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
330
266
#if ERTS_POLL_USE_WAKEUP_PIPE
333
269
#if ERTS_POLL_USE_FALLBACK
334
270
int fallback_used;
336
#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)
337
volatile int interrupt;
339
erts_smp_atomic_t interrupt;
272
#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
273
erts_atomic32_t wakeup_state;
341
erts_smp_atomic_t timeout;
275
erts_smp_atomic32_t timeout;
342
276
#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
343
277
erts_smp_atomic_t no_avoided_wakeups;
344
278
erts_smp_atomic_t no_avoided_interrupts;
349
static ERTS_INLINE int
350
unset_interrupted_chk(ErtsPollSet ps)
353
#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)
354
/* This operation isn't atomic, but we have no need at all for an
355
atomic operation here... */
359
res = (int) erts_smp_atomic_xchg(&ps->interrupt, (long) 0);
360
ERTS_THR_MEMORY_BARRIER;
368
static ERTS_INLINE int
369
set_poller_woken_chk(ErtsPollSet ps)
371
ERTS_THR_MEMORY_BARRIER;
372
return (int) erts_smp_atomic_xchg(&ps->woken, (long) 1);
377
283
void erts_silence_warn_unused_result(long unused);
378
284
static void fatal_error(char *format, ...);
379
285
static void fatal_error_async_signal_safe(char *error_str);
430
336
static void print_misc_debug_info(void);
339
/*
 * Values stored in the pollset's 'wakeup_state' atomic:
 *   ERTS_POLL_NOT_WOKEN  - value written by reset_wakeup_state(); is_woken()
 *                          reports true for any other value.
 *   ERTS_POLL_WOKEN      - a woken state that is not an interrupt.
 *   ERTS_POLL_WOKEN_INTR - the value is_interrupted_reset() tests for when it
 *                          swaps the state back to ERTS_POLL_NOT_WOKEN.
 */
#define ERTS_POLL_NOT_WOKEN 0
340
#define ERTS_POLL_WOKEN -1
341
#define ERTS_POLL_WOKEN_INTR 1
343
static ERTS_INLINE void
344
reset_wakeup_state(ErtsPollSet ps)
346
#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
347
erts_atomic32_set_mb(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN);
351
static ERTS_INLINE int
352
is_woken(ErtsPollSet ps)
354
#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
355
return erts_atomic32_read_acqb(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN;
361
static ERTS_INLINE int
362
is_interrupted_reset(ErtsPollSet ps)
364
#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
365
return (erts_atomic32_xchg_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN)
366
== ERTS_POLL_WOKEN_INTR);
372
static ERTS_INLINE void
373
woke_up(ErtsPollSet ps)
375
#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
376
erts_aint32_t wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
377
if (wakeup_state == ERTS_POLL_NOT_WOKEN)
378
(void) erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
380
ERTS_POLL_NOT_WOKEN);
381
ASSERT(erts_atomic32_read_nob(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN);
434
386
* --- Wakeup pipe -----------------------------------------------------------
437
389
#if ERTS_POLL_USE_WAKEUP_PIPE
439
391
static ERTS_INLINE void
440
wake_poller(ErtsPollSet ps)
392
wake_poller(ErtsPollSet ps, int interrupted, int async_signal_safe)
395
if (async_signal_safe)
398
erts_aint32_t wakeup_state;
400
wakeup_state = erts_atomic32_cmpxchg_relb(&ps->wakeup_state,
402
ERTS_POLL_NOT_WOKEN);
405
* We might unnecessarily write to the pipe, however,
406
* that isn't problematic.
408
wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
409
erts_atomic32_set_relb(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR);
411
wake = wakeup_state == ERTS_POLL_NOT_WOKEN;
443
414
* NOTE: This function might be called from signal handlers in the
444
415
* non-smp case; therefore, it has to be async-signal safe in
445
416
* the non-smp case.
447
if (!ERTS_POLLSET_SET_POLLER_WOKEN_CHK(ps)) {
449
420
if (ps->wake_fds[1] < 0)
450
421
return; /* Not initialized yet */
453
424
res = write(ps->wake_fds[1], "!", 1);
454
425
} while (res < 0 && errno == EINTR);
455
426
if (res <= 0 && errno != ERRNO_BLOCK) {
456
fatal_error_async_signal_safe(__FILE__
457
":XXX:wake_poller(): "
458
"Failed to write on wakeup pipe\n");
427
if (async_signal_safe)
428
fatal_error_async_signal_safe(__FILE__
429
":XXX:wake_poller(): "
430
"Failed to write on wakeup pipe\n");
432
fatal_error("%s:%d:wake_poller(): "
433
"Failed to write to wakeup pipe fd=%d: "
437
erl_errno_id(errno), errno);
463
442
static ERTS_INLINE void
464
443
cleanup_wakeup_pipe(ErtsPollSet ps)
445
#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
466
448
int fd = ps->wake_fds[0];
470
452
res = read(fd, buf, sizeof(buf));
453
#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
471
457
} while (res > 0 || (res < 0 && errno == EINTR));
472
458
if (res < 0 && errno != ERRNO_BLOCK) {
473
459
fatal_error("%s:%d:cleanup_wakeup_pipe(): "
852
843
ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_USEFLBCK;
853
844
ASSERT(ps->fds_status[fd].used_events);
854
845
ps->fds_status[fd].used_events = 0;
855
erts_smp_atomic_dec(&ps->no_of_user_fds);
846
erts_smp_atomic_dec_nob(&ps->no_of_user_fds);
856
847
update_fallback_pollset(ps, fd);
857
848
ASSERT(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK);
902
893
events = ERTS_POLL_EV_E2N(ps->fds_status[fd].events);
904
895
buf[buf_len].events = POLLREMOVE;
905
erts_smp_atomic_dec(&ps->no_of_user_fds);
896
erts_smp_atomic_dec_nob(&ps->no_of_user_fds);
907
898
else if (!ps->fds_status[fd].used_events) {
908
899
buf[buf_len].events = events;
909
erts_smp_atomic_inc(&ps->no_of_user_fds);
900
erts_smp_atomic_inc_nob(&ps->no_of_user_fds);
912
903
if ((ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST)
1387
1378
#endif /* ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE */
1389
1380
static ERTS_INLINE ErtsPollEvents
1390
poll_control(ErtsPollSet ps, int fd, ErtsPollEvents events, int on,
1391
int *have_set_have_update_requests,
1381
poll_control(ErtsPollSet ps, int fd, ErtsPollEvents events, int on, int *do_wake)
1394
1383
ErtsPollEvents new_events;
1526
1514
int* do_wake) /* In: Wake up polling thread */
1527
1515
/* Out: Poller is woken */
1530
1517
ErtsPollEvents res;
1532
1519
ERTS_POLLSET_LOCK(ps);
1534
res = poll_control(ps, fd, events, on, &hshur, do_wake);
1521
res = poll_control(ps, fd, events, on, do_wake);
1523
ERTS_POLLSET_UNLOCK(ps);
1536
1525
#ifdef ERTS_SMP
1537
1526
if (*do_wake) {
1527
wake_poller(ps, 0, 0);
1540
1529
#endif /* ERTS_SMP */
1542
ERTS_POLLSET_UNLOCK(ps);
1911
1899
static ERTS_INLINE int
1912
check_fd_events(ErtsPollSet ps, SysTimeval *tv, int max_res, int *ps_locked)
1900
check_fd_events(ErtsPollSet ps, SysTimeval *tv, int max_res)
1914
ASSERT(!*ps_locked);
1915
if (erts_smp_atomic_read(&ps->no_of_user_fds) == 0
1903
if (erts_smp_atomic_read_nob(&ps->no_of_user_fds) == 0
1916
1904
&& tv->tv_usec == 0 && tv->tv_sec == 0) {
1917
1905
/* Nothing to poll and zero timeout; done... */
1921
1909
long timeout = tv->tv_sec*1000 + tv->tv_usec/1000;
1910
if (timeout > ERTS_AINT32_T_MAX)
1911
timeout = ERTS_AINT32_T_MAX;
1922
1912
ASSERT(timeout >= 0);
1923
erts_smp_atomic_set(&ps->timeout, timeout);
1913
erts_smp_atomic32_set_relb(&ps->timeout, (erts_aint32_t) timeout);
1924
1914
#if ERTS_POLL_USE_FALLBACK
1925
1915
if (!(ps->fallback_used = ERTS_POLL_NEED_FALLBACK(ps))) {
1929
1919
timeout = INT_MAX;
1930
1920
if (max_res > ps->res_events_len)
1931
1921
grow_res_events(ps, max_res);
1932
return epoll_wait(ps->kp_fd, ps->res_events, max_res, (int)timeout);
1924
erts_thr_progress_prepare_wait(NULL);
1926
res = epoll_wait(ps->kp_fd, ps->res_events, max_res, (int)timeout);
1933
1927
#elif ERTS_POLL_USE_KQUEUE /* --- kqueue ------------------------------ */
1934
1928
struct timespec ts;
1929
if (max_res > ps->res_events_len)
1930
grow_res_events(ps, max_res);
1933
erts_thr_progress_prepare_wait(NULL);
1935
1935
ts.tv_sec = tv->tv_sec;
1936
1936
ts.tv_nsec = tv->tv_usec*1000;
1937
if (max_res > ps->res_events_len)
1938
grow_res_events(ps, max_res);
1939
return kevent(ps->kp_fd, NULL, 0, ps->res_events, max_res, &ts);
1937
res = kevent(ps->kp_fd, NULL, 0, ps->res_events, max_res, &ts);
1940
1938
#endif /* ----------------------------------------- */
1943
1940
else /* use fallback (i.e. poll() or select()) */
1944
1941
#endif /* ERTS_POLL_USE_FALLBACK */
1961
1958
if (poll_res.dp_nfds > ps->res_events_len)
1962
1959
grow_res_events(ps, poll_res.dp_nfds);
1963
1960
poll_res.dp_fds = ps->res_events;
1963
erts_thr_progress_prepare_wait(NULL);
1964
1965
poll_res.dp_timeout = (int) timeout;
1965
return ioctl(ps->kp_fd, DP_POLL, &poll_res);
1966
res = ioctl(ps->kp_fd, DP_POLL, &poll_res);
1966
1967
#elif ERTS_POLL_USE_POLL /* --- poll -------------------------------- */
1967
1968
if (timeout > INT_MAX)
1968
1969
timeout = INT_MAX;
1969
return poll(ps->poll_fds, ps->no_poll_fds, (int) timeout);
1972
erts_thr_progress_prepare_wait(NULL);
1974
res = poll(ps->poll_fds, ps->no_poll_fds, (int) timeout);
1970
1975
#elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */
1976
SysTimeval to = *tv;
1972
1978
ps->res_input_fds = ps->input_fds;
1973
1979
ps->res_output_fds = ps->output_fds;
1982
if (to.tv_sec || to.tv_usec)
1983
erts_thr_progress_prepare_wait(NULL);
1974
1985
res = select(ps->max_fd + 1,
1975
1986
&ps->res_input_fds,
1976
1987
&ps->res_output_fds,
1979
1990
#ifdef ERTS_SMP
1991
if (to.tv_sec || to.tv_usec)
1992
erts_thr_progress_finalize_wait(NULL);
1981
1994
&& errno == EBADF
1982
1995
&& ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) {
2129
2144
ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet ps, int set)
2146
#if defined(USE_THREADS)
2148
reset_wakeup_state(ps);
2150
wake_poller(ps, 1, 0);
2154
#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
2156
ERTS_POLL_EXPORT(erts_poll_async_sig_interrupt)(ErtsPollSet ps)
2132
* NOTE: This function might be called from signal handlers in the
2133
* non-smp case; therefore, it has to be async-signal safe in
2159
* NOTE: This function is called from signal handlers;
2160
* therefore, it has to be async-signal safe.
2137
ERTS_POLLSET_SET_INTERRUPTED(ps);
2138
#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP)
2162
wake_poller(ps, 1, 1);
2143
ERTS_POLLSET_UNSET_INTERRUPTED(ps);
2148
2167
* erts_poll_interrupt_timed():
2150
2169
* is not guaranteed that it will timeout before 'msec' milli seconds.
2153
ERTS_POLL_EXPORT(erts_poll_interrupt_timed)(ErtsPollSet ps, int set, long msec)
2172
ERTS_POLL_EXPORT(erts_poll_interrupt_timed)(ErtsPollSet ps,
2174
erts_short_time_t msec)
2156
if (erts_smp_atomic_read(&ps->timeout) > msec) {
2157
ERTS_POLLSET_SET_INTERRUPTED(ps);
2158
2176
#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP)
2178
reset_wakeup_state(ps);
2180
if (erts_smp_atomic32_read_acqb(&ps->timeout) > (erts_aint32_t) msec)
2181
wake_poller(ps, 1, 0);
2162
2182
#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
2164
2184
if (ERTS_POLLSET_IS_POLLED(ps))
2165
erts_smp_atomic_inc(&ps->no_avoided_wakeups);
2166
erts_smp_atomic_inc(&ps->no_avoided_interrupts);
2185
erts_smp_atomic_inc_nob(&ps->no_avoided_wakeups);
2186
erts_smp_atomic_inc_nob(&ps->no_avoided_interrupts);
2168
erts_smp_atomic_inc(&ps->no_interrupt_timed);
2172
ERTS_POLLSET_UNSET_INTERRUPTED(ps);
2188
erts_smp_atomic_inc_nob(&ps->no_interrupt_timed);
2281
2299
ps->update_requests.next = NULL;
2282
2300
ps->update_requests.len = 0;
2283
2301
ps->curr_upd_req_block = &ps->update_requests;
2284
erts_smp_atomic_init(&ps->have_update_requests, 0);
2302
erts_smp_atomic32_init_nob(&ps->have_update_requests, 0);
2286
2304
#ifdef ERTS_SMP
2287
erts_smp_atomic_init(&ps->polled, 0);
2288
erts_smp_atomic_init(&ps->woken, 0);
2305
erts_atomic32_init_nob(&ps->polled, 0);
2289
2306
erts_smp_mtx_init(&ps->mtx, "pollset");
2290
#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
2308
#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
2309
erts_atomic32_init_nob(&ps->wakeup_state, (erts_aint32_t) 0);
2293
2311
#if ERTS_POLL_USE_WAKEUP_PIPE
2294
2312
create_wakeup_pipe(ps);
2310
2328
ps->internal_fd_limit = kp_fd + 1;
2311
2329
ps->kp_fd = kp_fd;
2313
#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)
2316
erts_smp_atomic_init(&ps->interrupt, 0);
2318
erts_smp_atomic_init(&ps->timeout, LONG_MAX);
2331
erts_smp_atomic32_init_nob(&ps->timeout, ERTS_AINT32_T_MAX);
2319
2332
#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
2320
erts_smp_atomic_init(&ps->no_avoided_wakeups, 0);
2321
erts_smp_atomic_init(&ps->no_avoided_interrupts, 0);
2322
erts_smp_atomic_init(&ps->no_interrupt_timed, 0);
2333
erts_smp_atomic_init_nob(&ps->no_avoided_wakeups, 0);
2334
erts_smp_atomic_init_nob(&ps->no_avoided_interrupts, 0);
2335
erts_smp_atomic_init_nob(&ps->no_interrupt_timed, 0);
2324
2337
#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE
2325
2338
handle_update_requests(ps);
2531
2544
pip->max_fds = max_fds;
2533
2546
#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
2534
pip->no_avoided_wakeups = erts_smp_atomic_read(&ps->no_avoided_wakeups);
2535
pip->no_avoided_interrupts = erts_smp_atomic_read(&ps->no_avoided_interrupts);
2536
pip->no_interrupt_timed = erts_smp_atomic_read(&ps->no_interrupt_timed);
2547
pip->no_avoided_wakeups = erts_smp_atomic_read_nob(&ps->no_avoided_wakeups);
2548
pip->no_avoided_interrupts = erts_smp_atomic_read_nob(&ps->no_avoided_interrupts);
2549
pip->no_interrupt_timed = erts_smp_atomic_read_nob(&ps->no_interrupt_timed);
2539
2552
ERTS_POLLSET_UNLOCK(ps);