~ubuntu-branches/ubuntu/trusty/mysql-5.6/trusty

« back to all changes in this revision

Viewing changes to storage/ndb/include/portlib/ndb_socket_poller.h

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-02-12 11:54:27 UTC
  • Revision ID: package-import@ubuntu.com-20140212115427-oq6tfsqxl1wuwehi
Tags: upstream-5.6.15
ImportĀ upstreamĀ versionĀ 5.6.15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
   Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
 
3
 
 
4
   This program is free software; you can redistribute it and/or modify
 
5
   it under the terms of the GNU General Public License as published by
 
6
   the Free Software Foundation; version 2 of the License.
 
7
 
 
8
   This program is distributed in the hope that it will be useful,
 
9
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
   GNU General Public License for more details.
 
12
 
 
13
   You should have received a copy of the GNU General Public License
 
14
   along with this program; if not, write to the Free Software
 
15
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
 
16
*/
 
17
 
 
18
#ifndef NDB_SOCKET_POLLER_H
 
19
#define NDB_SOCKET_POLLER_H
 
20
 
 
21
#include <portlib/NdbTick.h>
 
22
 
 
23
/*
 
24
  Portability layer used for waiting on socket events
 
25
*/
 
26
 
 
27
class ndb_socket_poller {
 
28
  // Max number of fds the list can hold, defaults to 1 and
 
29
  // can be dynamically expanded by calling 'set_max_count'
 
30
  unsigned m_max_count;
 
31
 
 
32
  // Current number of fds in the list
 
33
  unsigned m_count;
 
34
 
 
35
#ifdef HAVE_POLL
 
36
  // The list of pollfds, initial size is 1 and m_pfds will
 
37
  // then point at m_one_pfd. After dynamic expand points at
 
38
  // dynamic list of pollfds
 
39
  struct pollfd m_one_pfd;
 
40
  struct pollfd* m_pfds;
 
41
#else
 
42
#if defined(_WIN32)
 
43
  // Utility functions for dynamically expanding the fd_set
 
44
  // on Windows to get around the hardcoded FD_SETSIZE limit.
 
45
  static bool
 
46
  set_max_count(fd_set* set, fd_set* static_set, unsigned count) {
 
47
    void* ptr = malloc(sizeof(fd_set) + count-1*sizeof(SOCKET));
 
48
    if (!ptr)
 
49
      return false;
 
50
    if (set != static_set)
 
51
      free(set);
 
52
    set = (fd_set*)ptr;
 
53
    clear(set);
 
54
    return true;
 
55
  }
 
56
 
 
57
  static void
 
58
  set_fd(fd_set* set, SOCKET s) {
 
59
    // Avoid use of FD_SET since it silently drop
 
60
    // sockets when FD_SETSIZE fd_count is reached
 
61
    set->fd_array[set->fd_count++] = s;
 
62
  }
 
63
 
 
64
  static void
 
65
  clear(fd_set* set) {
 
66
    FD_ZERO(set);
 
67
  }
 
68
#endif
 
69
  fd_set m_one_read_set;
 
70
  fd_set m_one_write_set;
 
71
  fd_set m_one_excp_set;
 
72
  fd_set* m_read_set;
 
73
  fd_set* m_write_set;
 
74
  fd_set* m_excp_set;
 
75
 
 
76
  // Mapping from "index" to "fd"
 
77
  ndb_native_socket_t m_one_fd;
 
78
  ndb_native_socket_t* m_fds;
 
79
 
 
80
  int m_nfds; // Max fd number for 'select'
 
81
#endif
 
82
 
 
83
public:
 
84
 
 
85
  ndb_socket_poller(void) :
 
86
    m_max_count(1)
 
87
#ifdef HAVE_POLL
 
88
    , m_pfds(&m_one_pfd)
 
89
#else
 
90
    , m_read_set(&m_one_read_set)
 
91
    , m_write_set(&m_one_write_set)
 
92
    , m_excp_set(&m_one_excp_set)
 
93
    , m_fds(&m_one_fd)
 
94
#endif
 
95
  {
 
96
    clear();
 
97
  }
 
98
 
 
99
  void clear(void) {
 
100
    m_count = 0;
 
101
#ifndef HAVE_POLL
 
102
    FD_ZERO(m_read_set);
 
103
    FD_ZERO(m_write_set);
 
104
    FD_ZERO(m_excp_set);
 
105
    m_nfds = 0;
 
106
#endif
 
107
  }
 
108
 
 
109
  ~ndb_socket_poller() {
 
110
#ifdef HAVE_POLL
 
111
    if (m_pfds != &m_one_pfd)
 
112
      delete[] m_pfds;
 
113
#else
 
114
#ifdef _WIN32
 
115
    if (m_read_set != &m_one_read_set)
 
116
      free(m_read_set);
 
117
    if (m_write_set != &m_one_write_set)
 
118
      free(m_write_set);
 
119
    if (m_excp_set != &m_one_excp_set)
 
120
      free(m_excp_set);
 
121
#endif
 
122
    if (m_fds != &m_one_fd)
 
123
      delete[] m_fds;
 
124
#endif
 
125
    }
 
126
 
 
127
  bool set_max_count(unsigned count) {
 
128
    if (count <= m_max_count)
 
129
    {
 
130
      // Ignore decrease or setting same value
 
131
      return true;
 
132
    }
 
133
#ifdef HAVE_POLL
 
134
    struct pollfd* pfds = new struct pollfd[count];
 
135
    if (pfds == NULL)
 
136
      return false;
 
137
    if (m_pfds != &m_one_pfd)
 
138
      delete[] m_pfds;
 
139
    m_pfds = pfds;
 
140
#else
 
141
#if defined(_WIN32)
 
142
    if (count > FD_SETSIZE)
 
143
    {
 
144
      // Expand the arrays above the builtin FD_SETSIZE
 
145
      if (!set_max_count(m_read_set, &m_one_read_set, count) ||
 
146
          !set_max_count(m_write_set, &m_one_write_set, count) ||
 
147
          !set_max_count(m_excp_set, &m_one_excp_set, count))
 
148
        return false;
 
149
    }
 
150
#endif
 
151
    ndb_native_socket_t* fds = new ndb_native_socket_t[count];
 
152
    if (fds == NULL)
 
153
      return false;
 
154
    if (m_fds != &m_one_fd)
 
155
      delete[] m_fds;
 
156
    m_fds = fds;
 
157
#endif
 
158
    m_max_count = count;
 
159
    return true;
 
160
  }
 
161
 
 
162
  unsigned add(ndb_socket_t sock, bool read, bool write, bool error) {
 
163
    const unsigned index = m_count;
 
164
#ifdef HAVE_POLL
 
165
    assert(m_count < m_max_count);
 
166
    struct pollfd &pfd = m_pfds[m_count++];
 
167
    pfd.fd = ndb_socket_get_native(sock);
 
168
 
 
169
    short events = 0;
 
170
    if (read)
 
171
      events |= POLLIN;
 
172
    if (write)
 
173
      events |= POLLOUT;
 
174
    if (error)
 
175
      events |= POLLPRI;
 
176
    pfd.events = events;
 
177
 
 
178
    pfd.revents = 0;
 
179
#else
 
180
#if defined(_WIN32)
 
181
    if (read)
 
182
      set_fd(m_read_set, ndb_socket_get_native(sock));
 
183
    if (write)
 
184
      set_fd(m_write_set, ndb_socket_get_native(sock));
 
185
    if (error)
 
186
      set_fd(m_excp_set, ndb_socket_get_native(sock));
 
187
    // Not counting nfds on Windows since select ignores it anyway
 
188
    assert(m_nfds == 0);
 
189
#else
 
190
    int fd = ndb_socket_get_native(sock);
 
191
    if (fd < 0 || fd >= FD_SETSIZE)
 
192
    {
 
193
      fprintf(stderr, "Maximum value for FD_SETSIZE: %d exceeded when"
 
194
        "trying to add fd: %d", FD_SETSIZE, fd);
 
195
      fflush(stderr);
 
196
      abort();
 
197
    }
 
198
    if (read)
 
199
      FD_SET(fd, m_read_set);
 
200
    if (write)
 
201
      FD_SET(fd, m_write_set);
 
202
    if (error)
 
203
      FD_SET(fd, m_excp_set);
 
204
    if (fd > m_nfds)
 
205
      m_nfds = fd;
 
206
#endif
 
207
    // Maintain mapping from index to fd
 
208
    m_fds[m_count++] = ndb_socket_get_native(sock);
 
209
#endif
 
210
    assert(m_count > index);
 
211
    return index;
 
212
  }
 
213
 
 
214
  unsigned count(void) const {
 
215
    return m_count;
 
216
  }
 
217
 
 
218
  bool is_socket_equal(unsigned index, ndb_socket_t socket) const {
 
219
    assert(index < m_count);
 
220
    assert(m_count <= m_max_count);
 
221
#ifdef HAVE_POLL
 
222
    return (m_pfds[index].fd == ndb_socket_get_native(socket));
 
223
#else
 
224
    return (m_fds[index] == ndb_socket_get_native(socket));
 
225
#endif
 
226
  }
 
227
 
 
228
  bool has_read(unsigned index) const {
 
229
    assert(index < m_count);
 
230
    assert(m_count <= m_max_count);
 
231
#ifdef HAVE_POLL
 
232
    return (m_pfds[index].revents & POLLIN);
 
233
#else
 
234
    return FD_ISSET(m_fds[index], m_read_set);
 
235
#endif
 
236
  }
 
237
 
 
238
  bool has_write(unsigned index) const {
 
239
    assert(index < m_count);
 
240
    assert(m_count <= m_max_count);
 
241
#ifdef HAVE_POLL
 
242
    return (m_pfds[index].revents & POLLOUT);
 
243
#else
 
244
    return FD_ISSET(m_fds[index], m_write_set);
 
245
#endif
 
246
  }
 
247
 
 
248
  /*
 
249
    Wait for event(s) on socket(s) without retry of interrupted wait
 
250
  */
 
251
  int poll_unsafe(int timeout)
 
252
  {
 
253
#ifdef HAVE_POLL
 
254
    return ::poll(m_pfds, m_count, timeout);
 
255
#else
 
256
 
 
257
#ifdef _WIN32
 
258
    if (m_count == 0)
 
259
    {
 
260
      // Windows does not sleep on 'select' with 0 sockets
 
261
      Sleep(timeout);
 
262
      return 0; // Timeout occured
 
263
    }
 
264
#endif
 
265
 
 
266
    struct timeval tv;
 
267
    tv.tv_sec  = (timeout / 1000);
 
268
    tv.tv_usec = (timeout % 1000) * 1000;
 
269
 
 
270
    return select(m_nfds+1, m_read_set, m_write_set, m_excp_set,
 
271
                  timeout == -1 ? NULL : &tv);
 
272
#endif
 
273
  }
 
274
 
 
275
  /*
 
276
    Wait for event(s) on socket(s), retry interrupted wait
 
277
    if there is still time left
 
278
  */
 
279
  int poll(int timeout)
 
280
  {
 
281
    do
 
282
    {
 
283
      const NDB_TICKS start = NdbTick_CurrentMillisecond();
 
284
 
 
285
      const int res = poll_unsafe(timeout);
 
286
      if (likely(res >= 0))
 
287
        return res; // Default return path
 
288
 
 
289
      const int error = my_socket_errno();
 
290
      if (res == -1 &&
 
291
          (error == EINTR || error == EAGAIN))
 
292
      {
 
293
        // Retry if any time left of timeout
 
294
 
 
295
        // Subtract function call time from remaining timeout
 
296
        timeout -= (int)(NdbTick_CurrentMillisecond() - start);
 
297
 
 
298
        if (timeout <= 0)
 
299
          return 0; // Timeout occured
 
300
 
 
301
        //fprintf(stderr, "Got interrupted, retrying... timeout left: %d\n",
 
302
        //        timeout_millis);
 
303
 
 
304
        continue; // Retry interrupted poll
 
305
      }
 
306
 
 
307
      // Unhandled error code, return it
 
308
      return res;
 
309
 
 
310
    } while (true);
 
311
 
 
312
    abort(); // Never reached
 
313
  }
 
314
 
 
315
};
 
316
 
 
317
 
 
318
/*
 
319
  ndb_poll
 
320
  - Utility function for waiting on events on one socket
 
321
    with retry of interrupted wait
 
322
*/
 
323
 
 
324
static inline
 
325
int
 
326
ndb_poll(ndb_socket_t sock,
 
327
         bool read, bool write, bool error, int timeout_millis)
 
328
{
 
329
  ndb_socket_poller poller;
 
330
  (void)poller.add(sock, read, write, error);
 
331
 
 
332
  const int res = poller.poll(timeout_millis);
 
333
  if (res <= 0)
 
334
    return res;
 
335
 
 
336
  assert(res >= 1);
 
337
 
 
338
  return res;
 
339
}
 
340
 
 
341
#endif