2
Copyright (c) 2007-2011 iMatix Corporation
3
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
5
This file is part of 0MQ.
7
0MQ is free software; you can redistribute it and/or modify it under
8
the terms of the GNU Lesser General Public License as published by
9
the Free Software Foundation; either version 3 of the License, or
10
(at your option) any later version.
12
0MQ is distributed in the hope that it will be useful,
13
but WITHOUT ANY WARRANTY; without even the implied warranty of
14
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
GNU Lesser General Public License for more details.
17
You should have received a copy of the GNU Lesser General Public License
18
along with this program. If not, see <http://www.gnu.org/licenses/>.
21
#include "platform.hpp"
23
// Choose the mechanism signaler_t::wait () blocks on. An explicit
// ZMQ_FORCE_SELECT or ZMQ_FORCE_POLL is honoured first; otherwise poll is
// used on the Unix-like platforms listed below and select on Windows and
// OpenVMS.
#if defined ZMQ_FORCE_SELECT
24
#define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
25
#elif defined ZMQ_FORCE_POLL
26
#define ZMQ_SIGNALER_WAIT_BASED_ON_POLL
27
#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
28
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
29
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
30
defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
31
defined ZMQ_HAVE_NETBSD
32
#define ZMQ_SIGNALER_WAIT_BASED_ON_POLL
33
#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
34
#define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
37
// On AIX, poll.h has to be included before zmq.h to get consistent
38
// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
39
// instead of 'events' and 'revents' and defines macros to map from POSIX-y
40
// names to AIX-specific names).
41
#if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL
43
#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
44
#if defined ZMQ_HAVE_WINDOWS
45
#include "windows.hpp"
46
#elif defined ZMQ_HAVE_HPUX
47
#include <sys/param.h>
48
#include <sys/types.h>
50
#elif defined ZMQ_HAVE_OPENVMS
51
#include <sys/types.h>
54
#include <sys/select.h>
58
#include "signaler.hpp"
64
#if defined ZMQ_HAVE_WINDOWS
65
#include "windows.hpp"
70
#include <netinet/tcp.h>
72
#include <sys/types.h>
73
#include <sys/socket.h>
76
// Constructs the signaler: obtains the descriptor pair (r is the read end,
// w is the write end) from make_fdpair () and switches both ends to
// non-blocking mode. Every system call result is checked with
// errno_assert / wsa_assert.
zmq::signaler_t::signaler_t ()
78
// Create the socketpair for signaling.
79
int rc = make_fdpair (&r, &w);
80
errno_assert (rc == 0);
82
// Set both fds to non-blocking mode.
83
#if defined ZMQ_HAVE_WINDOWS
84
// A non-zero FIONBIO argument enables non-blocking mode on the socket.
unsigned long argp = 1;
85
rc = ioctlsocket (w, FIONBIO, &argp);
86
wsa_assert (rc != SOCKET_ERROR);
87
rc = ioctlsocket (r, FIONBIO, &argp);
88
wsa_assert (rc != SOCKET_ERROR);
90
// Read the current status flags first so that only O_NONBLOCK is added
// and the remaining flags are preserved.
int flags = fcntl (w, F_GETFL, 0);
91
errno_assert (flags >= 0);
92
rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
93
errno_assert (rc == 0);
94
// Same procedure for the read end.
flags = fcntl (r, F_GETFL, 0);
95
errno_assert (flags >= 0);
96
rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
97
errno_assert (rc == 0);
101
// Destroys the signaler by closing both ends of the signaling pair.
zmq::signaler_t::~signaler_t ()
103
#if defined ZMQ_HAVE_WINDOWS
104
// On Windows both ends are sockets, so they are closed with closesocket.
int rc = closesocket (w);
105
wsa_assert (rc != SOCKET_ERROR);
106
rc = closesocket (r);
107
wsa_assert (rc != SOCKET_ERROR);
114
// Returns a descriptor (fd_t) belonging to this signaler.
// NOTE(review): presumably the read end 'r', so that callers can poll it
// for pending signals -- confirm.
zmq::fd_t zmq::signaler_t::get_fd ()
119
// Posts one signal by writing a single zero byte to the write end (w).
// The byte count is asserted to be exactly sizeof (dummy) in both branches.
void zmq::signaler_t::send ()
121
#if defined ZMQ_HAVE_WINDOWS
122
unsigned char dummy = 0;
123
// Winsock's send takes a char pointer, hence the cast.
int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0);
124
wsa_assert (nbytes != SOCKET_ERROR);
125
zmq_assert (nbytes == sizeof (dummy));
127
unsigned char dummy = 0;
129
ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
130
// Interruption by a signal (EINTR) is handled separately from other
// outcomes. NOTE(review): presumably the send is retried -- confirm.
if (unlikely (nbytes == -1 && errno == EINTR))
132
zmq_assert (nbytes == sizeof (dummy));
138
// Blocks until the signaling descriptor becomes readable or the timeout
// expires. timeout_ is in milliseconds: it is handed to poll unchanged and
// split into seconds/microseconds for select. In the select branches a
// negative timeout_ results in a NULL timeval, i.e. no timeout.
// NOTE(review): return convention presumably 0 when a signal is available
// and -1 with errno set (EINTR / EAGAIN) otherwise -- confirm.
int zmq::signaler_t::wait (int timeout_)
140
#ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
145
int rc = poll (&pfd, 1, timeout_);
146
if (unlikely (rc < 0)) {
147
// The only poll failure tolerated is interruption by a signal.
zmq_assert (errno == EINTR);
150
// poll returning 0 means the timeout elapsed with nothing to read.
else if (unlikely (rc == 0)) {
154
// Exactly one descriptor was polled, and it must be readable.
zmq_assert (rc == 1);
155
zmq_assert (pfd.revents & POLLIN);
158
#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
163
struct timeval timeout;
165
// Whole seconds, then the remaining milliseconds as microseconds.
timeout.tv_sec = timeout_ / 1000;
166
timeout.tv_usec = timeout_ % 1000 * 1000;
168
#ifdef ZMQ_HAVE_WINDOWS
169
// Winsock ignores the first argument of select, so 0 is passed.
int rc = select (0, &fds, NULL, NULL,
170
timeout_ >= 0 ? &timeout : NULL);
171
wsa_assert (rc != SOCKET_ERROR);
173
// POSIX select needs the highest descriptor number plus one.
int rc = select (r + 1, &fds, NULL, NULL,
174
timeout_ >= 0 ? &timeout : NULL);
175
if (unlikely (rc < 0)) {
176
// The only select failure tolerated is interruption by a signal.
zmq_assert (errno == EINTR);
180
// select returning 0 means the timeout elapsed with nothing to read.
if (unlikely (rc == 0)) {
184
zmq_assert (rc == 1);
192
// Consumes one signal: reads a single byte from the read end (r) and
// checks that exactly one byte arrived and that it is the zero byte
// written by send ().
void zmq::signaler_t::recv ()
194
// Attempt to read a signal.
196
#ifdef ZMQ_HAVE_WINDOWS
197
// Winsock's recv takes a char pointer, hence the cast.
int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
198
wsa_assert (nbytes != SOCKET_ERROR);
200
ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
201
errno_assert (nbytes >= 0);
203
// Common to both branches: validate the size and value of the signal byte.
zmq_assert (nbytes == sizeof (dummy));
204
zmq_assert (dummy == 0);
207
// Creates a connected pair of descriptors for signaling. On Windows and
// OpenVMS the pair is built by hand from a TCP connection over the loopback
// interface, with the reader stored in *r_ and the writer in *w_; all other
// platforms call socketpair ().
// NOTE(review): presumably returns 0 on success (the constructor asserts
// rc == 0) -- confirm.
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
209
#if defined ZMQ_HAVE_WINDOWS
211
// Windows has no 'socketpair' function. CreatePipe is not an option because
212
// pipe handles cannot be polled. Here we create the socketpair by hand.
213
*w_ = INVALID_SOCKET;
214
*r_ = INVALID_SOCKET;
216
// Create listening socket.
218
listener = socket (AF_INET, SOCK_STREAM, 0);
219
wsa_assert (listener != INVALID_SOCKET);
221
// Set SO_REUSEADDR and TCP_NODELAY on listening socket.
222
BOOL so_reuseaddr = 1;
223
int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
224
(char *)&so_reuseaddr, sizeof (so_reuseaddr));
225
wsa_assert (rc != SOCKET_ERROR);
226
BOOL tcp_nodelay = 1;
227
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
228
(char *)&tcp_nodelay, sizeof (tcp_nodelay));
229
wsa_assert (rc != SOCKET_ERROR);
231
// Bind listening socket to any free local port. The address is zeroed
// first, so the port stays 0 and the system picks one.
232
struct sockaddr_in addr;
233
memset (&addr, 0, sizeof (addr));
234
addr.sin_family = AF_INET;
235
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
237
rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
238
wsa_assert (rc != SOCKET_ERROR);
240
// Retrieve local port listener is bound to (into addr).
241
int addrlen = sizeof (addr);
242
rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
243
wsa_assert (rc != SOCKET_ERROR);
245
// Listen for incoming connections.
246
rc = listen (listener, 1);
247
wsa_assert (rc != SOCKET_ERROR);
249
// Create the writer socket.
250
*w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
251
wsa_assert (*w_ != INVALID_SOCKET);
253
// Set TCP_NODELAY on writer socket.
254
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
255
(char *)&tcp_nodelay, sizeof (tcp_nodelay));
256
wsa_assert (rc != SOCKET_ERROR);
258
// Connect writer to the listener.
259
rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
260
wsa_assert (rc != SOCKET_ERROR);
262
// Accept connection from writer.
263
*r_ = accept (listener, NULL, NULL);
264
wsa_assert (*r_ != INVALID_SOCKET);
266
// We don't need the listening socket anymore. Close it.
267
rc = closesocket (listener);
268
wsa_assert (rc != SOCKET_ERROR);
272
#elif defined ZMQ_HAVE_OPENVMS
274
// Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
275
// it does not set the socket options TCP_NODELAY and TCP_NODELACK which
276
// can lead to performance problems.
278
// The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
279
// create the socket pair manually.
281
// Loopback address with port 0, so the system chooses a free port.
memset (&lcladdr, 0, sizeof (lcladdr));
282
lcladdr.sin_family = AF_INET;
283
lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
284
lcladdr.sin_port = 0;
286
// Create the listening socket.
int listener = socket (AF_INET, SOCK_STREAM, 0);
287
errno_assert (listener != -1);
290
// Set TCP_NODELAY and TCP_NODELACK on the listening socket.
int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
291
errno_assert (rc != -1);
293
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
294
errno_assert (rc != -1);
296
// Bind the listener to the loopback address.
rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
297
errno_assert (rc != -1);
299
// Read the bound address back into lcladdr so the assigned port is known.
socklen_t lcladdr_len = sizeof (lcladdr);
301
rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
302
errno_assert (rc != -1);
304
// Listen for incoming connections.
rc = listen (listener, 1);
305
errno_assert (rc != -1);
307
// Create the writer socket.
*w_ = socket (AF_INET, SOCK_STREAM, 0);
308
errno_assert (*w_ != -1);
310
// Set TCP_NODELAY and TCP_NODELACK on the writer socket.
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
311
errno_assert (rc != -1);
313
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
314
errno_assert (rc != -1);
316
// Connect the writer to the listener.
rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
317
errno_assert (rc != -1);
319
// Accept the writer's connection; the accepted socket is the reader.
*r_ = accept (listener, NULL, NULL);
320
errno_assert (*r_ != -1);
326
#else // All other implementations support socketpair()
329
// A connected pair of AF_UNIX stream sockets is requested into sv.
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
330
errno_assert (rc == 0);
338
// Undefine the wait-strategy macros that were defined at the top of this
// file, whichever of the two is set.
#if defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
339
#undef ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
341
#if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL
342
#undef ZMQ_SIGNALER_WAIT_BASED_ON_POLL