~ubuntu-branches/ubuntu/precise/zeromq/precise

« back to all changes in this revision

Viewing changes to src/signaler.cpp

  • Committer: Package Import Robot
  • Author(s): Martin Lucina
  • Date: 2011-09-08 16:38:46 UTC
  • mfrom: (7.2.2 sid)
  • Revision ID: package-import@ubuntu.com-20110908163846-wjo6zknbsxmwyokn
Tags: 2.1.9-1
New upstream version. 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
    Copyright (c) 2007-2011 iMatix Corporation
 
3
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
4
 
 
5
    This file is part of 0MQ.
 
6
 
 
7
    0MQ is free software; you can redistribute it and/or modify it under
 
8
    the terms of the GNU Lesser General Public License as published by
 
9
    the Free Software Foundation; either version 3 of the License, or
 
10
    (at your option) any later version.
 
11
 
 
12
    0MQ is distributed in the hope that it will be useful,
 
13
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
15
    GNU Lesser General Public License for more details.
 
16
 
 
17
    You should have received a copy of the GNU Lesser General Public License
 
18
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
19
*/
 
20
 
 
21
#include "platform.hpp"
 
22
 
 
23
#if defined ZMQ_FORCE_SELECT
 
24
#define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
 
25
#elif defined ZMQ_FORCE_POLL
 
26
#define ZMQ_SIGNALER_WAIT_BASED_ON_POLL
 
27
#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
 
28
    defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
 
29
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
 
30
    defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
 
31
    defined ZMQ_HAVE_NETBSD
 
32
#define ZMQ_SIGNALER_WAIT_BASED_ON_POLL
 
33
#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
 
34
#define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
 
35
#endif
 
36
 
 
37
//  On AIX, poll.h has to be included before zmq.h to get consistent
 
38
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
 
39
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
 
40
//  names to AIX-specific names).
 
41
#if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL
 
42
#include <poll.h>
 
43
#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
 
44
#if defined ZMQ_HAVE_WINDOWS
 
45
#include "windows.hpp"
 
46
#elif defined ZMQ_HAVE_HPUX
 
47
#include <sys/param.h>
 
48
#include <sys/types.h>
 
49
#include <sys/time.h>
 
50
#elif defined ZMQ_HAVE_OPENVMS
 
51
#include <sys/types.h>
 
52
#include <sys/time.h>
 
53
#else
 
54
#include <sys/select.h>
 
55
#endif
 
56
#endif
 
57
 
 
58
#include "signaler.hpp"
 
59
#include "likely.hpp"
 
60
#include "err.hpp"
 
61
#include "fd.hpp"
 
62
#include "ip.hpp"
 
63
 
 
64
#if defined ZMQ_HAVE_WINDOWS
 
65
#include "windows.hpp"
 
66
#else
 
67
#include <unistd.h>
 
68
#include <fcntl.h>
 
69
#include <limits.h>
 
70
#include <netinet/tcp.h>
 
71
#include <unistd.h>
 
72
#include <sys/types.h>
 
73
#include <sys/socket.h>
 
74
#endif
 
75
 
 
76
zmq::signaler_t::signaler_t ()
 
77
{
 
78
    //  Create the socketpair for signaling.
 
79
    int rc = make_fdpair (&r, &w);
 
80
    errno_assert (rc == 0);
 
81
 
 
82
    //  Set both fds to non-blocking mode.
 
83
#if defined ZMQ_HAVE_WINDOWS
 
84
    unsigned long argp = 1;
 
85
    rc = ioctlsocket (w, FIONBIO, &argp);
 
86
    wsa_assert (rc != SOCKET_ERROR);
 
87
    rc = ioctlsocket (r, FIONBIO, &argp);
 
88
    wsa_assert (rc != SOCKET_ERROR);
 
89
#else
 
90
    int flags = fcntl (w, F_GETFL, 0);
 
91
    errno_assert (flags >= 0);
 
92
    rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
 
93
    errno_assert (rc == 0);
 
94
    flags = fcntl (r, F_GETFL, 0);
 
95
    errno_assert (flags >= 0);
 
96
    rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
 
97
    errno_assert (rc == 0);
 
98
#endif
 
99
}
 
100
 
 
101
zmq::signaler_t::~signaler_t ()
 
102
{
 
103
#if defined ZMQ_HAVE_WINDOWS
 
104
    int rc = closesocket (w);
 
105
    wsa_assert (rc != SOCKET_ERROR);
 
106
    rc = closesocket (r);
 
107
    wsa_assert (rc != SOCKET_ERROR);
 
108
#else
 
109
    close (w);
 
110
    close (r);
 
111
#endif
 
112
}
 
113
 
 
114
zmq::fd_t zmq::signaler_t::get_fd ()
 
115
{
 
116
    return r;
 
117
}
 
118
 
 
119
void zmq::signaler_t::send ()
 
120
{
 
121
#if defined ZMQ_HAVE_WINDOWS
 
122
    unsigned char dummy = 0;
 
123
    int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0);
 
124
    wsa_assert (nbytes != SOCKET_ERROR);
 
125
    zmq_assert (nbytes == sizeof (dummy));
 
126
#else
 
127
    unsigned char dummy = 0;
 
128
    while (true) {
 
129
        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
 
130
        if (unlikely (nbytes == -1 && errno == EINTR))
 
131
            continue;
 
132
        zmq_assert (nbytes == sizeof (dummy));
 
133
        break;
 
134
    }
 
135
#endif
 
136
}
 
137
 
 
138
int zmq::signaler_t::wait (int timeout_)
 
139
{
 
140
#ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
 
141
 
 
142
    struct pollfd pfd;
 
143
    pfd.fd = r;
 
144
    pfd.events = POLLIN;
 
145
    int rc = poll (&pfd, 1, timeout_);
 
146
    if (unlikely (rc < 0)) {
 
147
        zmq_assert (errno == EINTR);
 
148
        return -1;
 
149
    }
 
150
    else if (unlikely (rc == 0)) {
 
151
        errno = EAGAIN;
 
152
        return -1;
 
153
    }
 
154
    zmq_assert (rc == 1);
 
155
    zmq_assert (pfd.revents & POLLIN);
 
156
    return 0;
 
157
 
 
158
#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
 
159
 
 
160
    fd_set fds;
 
161
    FD_ZERO (&fds);
 
162
    FD_SET (r, &fds);
 
163
    struct timeval timeout;
 
164
    if (timeout_ >= 0) {
 
165
        timeout.tv_sec = timeout_ / 1000;
 
166
        timeout.tv_usec = timeout_ % 1000 * 1000;
 
167
    }
 
168
#ifdef ZMQ_HAVE_WINDOWS
 
169
    int rc = select (0, &fds, NULL, NULL,
 
170
        timeout_ >= 0 ? &timeout : NULL);
 
171
    wsa_assert (rc != SOCKET_ERROR);
 
172
#else
 
173
    int rc = select (r + 1, &fds, NULL, NULL,
 
174
        timeout_ >= 0 ? &timeout : NULL);
 
175
    if (unlikely (rc < 0)) {
 
176
        zmq_assert (errno == EINTR);
 
177
        return -1;
 
178
    }
 
179
#endif
 
180
    if (unlikely (rc == 0)) {
 
181
        errno = EAGAIN;
 
182
        return -1;
 
183
    }
 
184
    zmq_assert (rc == 1);
 
185
    return 0;
 
186
 
 
187
#else
 
188
#error
 
189
#endif
 
190
}
 
191
 
 
192
void zmq::signaler_t::recv ()
 
193
{
 
194
    //  Attempt to read a signal.
 
195
    unsigned char dummy;
 
196
#ifdef ZMQ_HAVE_WINDOWS
 
197
    int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
 
198
    wsa_assert (nbytes != SOCKET_ERROR);
 
199
#else
 
200
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
 
201
    errno_assert (nbytes >= 0);
 
202
#endif
 
203
    zmq_assert (nbytes == sizeof (dummy));
 
204
    zmq_assert (dummy == 0);
 
205
}
 
206
 
 
207
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
 
208
{
 
209
#if defined ZMQ_HAVE_WINDOWS
 
210
 
 
211
    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
 
212
    //  handles cannot be polled on. Here we create the socketpair by hand.
 
213
    *w_ = INVALID_SOCKET;
 
214
    *r_ = INVALID_SOCKET;
 
215
 
 
216
    //  Create listening socket.
 
217
    SOCKET listener;
 
218
    listener = socket (AF_INET, SOCK_STREAM, 0);
 
219
    wsa_assert (listener != INVALID_SOCKET);
 
220
 
 
221
    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
 
222
    BOOL so_reuseaddr = 1;
 
223
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
 
224
        (char *)&so_reuseaddr, sizeof (so_reuseaddr));
 
225
    wsa_assert (rc != SOCKET_ERROR);
 
226
    BOOL tcp_nodelay = 1;
 
227
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
 
228
        (char *)&tcp_nodelay, sizeof (tcp_nodelay));
 
229
    wsa_assert (rc != SOCKET_ERROR);
 
230
 
 
231
    //  Bind listening socket to any free local port.
 
232
    struct sockaddr_in addr;
 
233
    memset (&addr, 0, sizeof (addr));
 
234
    addr.sin_family = AF_INET;
 
235
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
 
236
    addr.sin_port = 0;
 
237
    rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
 
238
    wsa_assert (rc != SOCKET_ERROR);
 
239
 
 
240
    //  Retrieve local port listener is bound to (into addr).
 
241
    int addrlen = sizeof (addr);
 
242
    rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
 
243
    wsa_assert (rc != SOCKET_ERROR);
 
244
 
 
245
    //  Listen for incomming connections.
 
246
    rc = listen (listener, 1);
 
247
    wsa_assert (rc != SOCKET_ERROR);
 
248
 
 
249
    //  Create the writer socket.
 
250
    *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0,  0);
 
251
    wsa_assert (*w_ != INVALID_SOCKET);
 
252
 
 
253
    //  Set TCP_NODELAY on writer socket.
 
254
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
 
255
        (char *)&tcp_nodelay, sizeof (tcp_nodelay));
 
256
    wsa_assert (rc != SOCKET_ERROR);
 
257
 
 
258
    //  Connect writer to the listener.
 
259
    rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
 
260
    wsa_assert (rc != SOCKET_ERROR);
 
261
 
 
262
    //  Accept connection from writer.
 
263
    *r_ = accept (listener, NULL, NULL);
 
264
    wsa_assert (*r_ != INVALID_SOCKET);
 
265
 
 
266
    //  We don't need the listening socket anymore. Close it.
 
267
    rc = closesocket (listener);
 
268
    wsa_assert (rc != SOCKET_ERROR);
 
269
 
 
270
    return 0;
 
271
 
 
272
#elif defined ZMQ_HAVE_OPENVMS
 
273
 
 
274
    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
 
275
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
 
276
    //  can lead to performance problems.
 
277
    //
 
278
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
 
279
    //  create the socket pair manually.
 
280
    sockaddr_in lcladdr;
 
281
    memset (&lcladdr, 0, sizeof (lcladdr));
 
282
    lcladdr.sin_family = AF_INET;
 
283
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
 
284
    lcladdr.sin_port = 0;
 
285
 
 
286
    int listener = socket (AF_INET, SOCK_STREAM, 0);
 
287
    errno_assert (listener != -1);
 
288
 
 
289
    int on = 1;
 
290
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
 
291
    errno_assert (rc != -1);
 
292
 
 
293
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
 
294
    errno_assert (rc != -1);
 
295
 
 
296
    rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
 
297
    errno_assert (rc != -1);
 
298
 
 
299
    socklen_t lcladdr_len = sizeof (lcladdr);
 
300
 
 
301
    rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
 
302
    errno_assert (rc != -1);
 
303
 
 
304
    rc = listen (listener, 1);
 
305
    errno_assert (rc != -1);
 
306
 
 
307
    *w_ = socket (AF_INET, SOCK_STREAM, 0);
 
308
    errno_assert (*w_ != -1);
 
309
 
 
310
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
 
311
    errno_assert (rc != -1);
 
312
 
 
313
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
 
314
    errno_assert (rc != -1);
 
315
 
 
316
    rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
 
317
    errno_assert (rc != -1);
 
318
 
 
319
    *r_ = accept (listener, NULL, NULL);
 
320
    errno_assert (*r_ != -1);
 
321
 
 
322
    close (listener);
 
323
 
 
324
    return 0;
 
325
 
 
326
#else // All other implementations support socketpair()
 
327
 
 
328
    int sv [2];
 
329
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
 
330
    errno_assert (rc == 0);
 
331
    *w_ = sv [0];
 
332
    *r_ = sv [1];
 
333
    return 0;
 
334
 
 
335
#endif
 
336
}
 
337
 
 
338
#if defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
 
339
#undef ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
 
340
#endif
 
341
#if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL
 
342
#undef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
 
343
#endif
 
344