/*
   Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
18
#ifndef NDB_SOCKET_POLLER_H
19
#define NDB_SOCKET_POLLER_H
21
#include <portlib/NdbTick.h>
/*
  Portability layer used for waiting on socket events
*/
class ndb_socket_poller {
28
// Max number of fds the list can hold, defaults to 1 and
29
// can be dynamically expanded by calling 'set_max_count'
32
// Current number of fds in the list
36
// The list of pollfds, initial size is 1 and m_pfds will
37
// then point at m_one_pfd. After dynamic expand points at
38
// dynamic list of pollfds
39
struct pollfd m_one_pfd;
40
struct pollfd* m_pfds;
43
// Utility functions for dynamically expanding the fd_set
44
// on Windows to get around the hardcoded FD_SETSIZE limit.
46
set_max_count(fd_set* set, fd_set* static_set, unsigned count) {
47
void* ptr = malloc(sizeof(fd_set) + count-1*sizeof(SOCKET));
50
if (set != static_set)
58
set_fd(fd_set* set, SOCKET s) {
59
// Avoid use of FD_SET since it silently drop
60
// sockets when FD_SETSIZE fd_count is reached
61
set->fd_array[set->fd_count++] = s;
69
fd_set m_one_read_set;
70
fd_set m_one_write_set;
71
fd_set m_one_excp_set;
76
// Mapping from "index" to "fd"
77
ndb_native_socket_t m_one_fd;
78
ndb_native_socket_t* m_fds;
80
int m_nfds; // Max fd number for 'select'
85
ndb_socket_poller(void) :
90
, m_read_set(&m_one_read_set)
91
, m_write_set(&m_one_write_set)
92
, m_excp_set(&m_one_excp_set)
103
FD_ZERO(m_write_set);
109
~ndb_socket_poller() {
111
if (m_pfds != &m_one_pfd)
115
if (m_read_set != &m_one_read_set)
117
if (m_write_set != &m_one_write_set)
119
if (m_excp_set != &m_one_excp_set)
122
if (m_fds != &m_one_fd)
127
bool set_max_count(unsigned count) {
128
if (count <= m_max_count)
130
// Ignore decrease or setting same value
134
struct pollfd* pfds = new struct pollfd[count];
137
if (m_pfds != &m_one_pfd)
142
if (count > FD_SETSIZE)
144
// Expand the arrays above the builtin FD_SETSIZE
145
if (!set_max_count(m_read_set, &m_one_read_set, count) ||
146
!set_max_count(m_write_set, &m_one_write_set, count) ||
147
!set_max_count(m_excp_set, &m_one_excp_set, count))
151
ndb_native_socket_t* fds = new ndb_native_socket_t[count];
154
if (m_fds != &m_one_fd)
162
unsigned add(ndb_socket_t sock, bool read, bool write, bool error) {
163
const unsigned index = m_count;
165
assert(m_count < m_max_count);
166
struct pollfd &pfd = m_pfds[m_count++];
167
pfd.fd = ndb_socket_get_native(sock);
182
set_fd(m_read_set, ndb_socket_get_native(sock));
184
set_fd(m_write_set, ndb_socket_get_native(sock));
186
set_fd(m_excp_set, ndb_socket_get_native(sock));
187
// Not counting nfds on Windows since select ignores it anyway
190
int fd = ndb_socket_get_native(sock);
191
if (fd < 0 || fd >= FD_SETSIZE)
193
fprintf(stderr, "Maximum value for FD_SETSIZE: %d exceeded when"
194
"trying to add fd: %d", FD_SETSIZE, fd);
199
FD_SET(fd, m_read_set);
201
FD_SET(fd, m_write_set);
203
FD_SET(fd, m_excp_set);
207
// Maintain mapping from index to fd
208
m_fds[m_count++] = ndb_socket_get_native(sock);
210
assert(m_count > index);
214
unsigned count(void) const {
218
bool is_socket_equal(unsigned index, ndb_socket_t socket) const {
219
assert(index < m_count);
220
assert(m_count <= m_max_count);
222
return (m_pfds[index].fd == ndb_socket_get_native(socket));
224
return (m_fds[index] == ndb_socket_get_native(socket));
228
bool has_read(unsigned index) const {
229
assert(index < m_count);
230
assert(m_count <= m_max_count);
232
return (m_pfds[index].revents & POLLIN);
234
return FD_ISSET(m_fds[index], m_read_set);
238
bool has_write(unsigned index) const {
239
assert(index < m_count);
240
assert(m_count <= m_max_count);
242
return (m_pfds[index].revents & POLLOUT);
244
return FD_ISSET(m_fds[index], m_write_set);
249
Wait for event(s) on socket(s) without retry of interrupted wait
251
int poll_unsafe(int timeout)
254
return ::poll(m_pfds, m_count, timeout);
260
// Windows does not sleep on 'select' with 0 sockets
262
return 0; // Timeout occured
267
tv.tv_sec = (timeout / 1000);
268
tv.tv_usec = (timeout % 1000) * 1000;
270
return select(m_nfds+1, m_read_set, m_write_set, m_excp_set,
271
timeout == -1 ? NULL : &tv);
276
Wait for event(s) on socket(s), retry interrupted wait
277
if there is still time left
279
int poll(int timeout)
283
const NDB_TICKS start = NdbTick_CurrentMillisecond();
285
const int res = poll_unsafe(timeout);
286
if (likely(res >= 0))
287
return res; // Default return path
289
const int error = my_socket_errno();
291
(error == EINTR || error == EAGAIN))
293
// Retry if any time left of timeout
295
// Subtract function call time from remaining timeout
296
timeout -= (int)(NdbTick_CurrentMillisecond() - start);
299
return 0; // Timeout occured
301
//fprintf(stderr, "Got interrupted, retrying... timeout left: %d\n",
304
continue; // Retry interrupted poll
307
// Unhandled error code, return it
312
abort(); // Never reached
320
- Utility function for waiting on events on one socket
321
with retry of interrupted wait
326
ndb_poll(ndb_socket_t sock,
327
bool read, bool write, bool error, int timeout_millis)
329
ndb_socket_poller poller;
330
(void)poller.add(sock, read, write, error);
332
const int res = poller.poll(timeout_millis);