~ubuntu-branches/ubuntu/saucy/zeromq3/saucy

« back to all changes in this revision

Viewing changes to src/tcp_listener.cpp

  • Committer: Package Import Robot
  • Author(s): Alessandro Ghedini
  • Date: 2012-06-04 21:21:09 UTC
  • Revision ID: package-import@ubuntu.com-20120604212109-b7b3m0rn21o8oo2q
Tags: upstream-3.1.0~beta+dfsg
ImportĀ upstreamĀ versionĀ 3.1.0~beta+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
    Copyright (c) 2009-2011 250bpm s.r.o.
 
3
    Copyright (c) 2007-2010 iMatix Corporation
 
4
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
5
 
 
6
    This file is part of 0MQ.
 
7
 
 
8
    0MQ is free software; you can redistribute it and/or modify it under
 
9
    the terms of the GNU Lesser General Public License as published by
 
10
    the Free Software Foundation; either version 3 of the License, or
 
11
    (at your option) any later version.
 
12
 
 
13
    0MQ is distributed in the hope that it will be useful,
 
14
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
16
    GNU Lesser General Public License for more details.
 
17
 
 
18
    You should have received a copy of the GNU Lesser General Public License
 
19
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
20
*/
 
21
 
 
22
#include <new>
 
23
 
 
24
#include <string.h>
 
25
 
 
26
#include "platform.hpp"
 
27
#include "tcp_listener.hpp"
 
28
#include "stream_engine.hpp"
 
29
#include "io_thread.hpp"
 
30
#include "session_base.hpp"
 
31
#include "config.hpp"
 
32
#include "err.hpp"
 
33
#include "ip.hpp"
 
34
 
 
35
#ifdef ZMQ_HAVE_WINDOWS
 
36
#include "windows.hpp"
 
37
#else
 
38
#include <unistd.h>
 
39
#include <sys/socket.h>
 
40
#include <arpa/inet.h>
 
41
#include <netinet/tcp.h>
 
42
#include <netinet/in.h>
 
43
#include <netdb.h>
 
44
#include <fcntl.h>
 
45
#endif
 
46
 
 
47
#ifdef ZMQ_HAVE_OPENVMS
 
48
#include <ioctl.h>
 
49
#endif
 
50
 
 
51
zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
 
52
      socket_base_t *socket_, const options_t &options_) :
 
53
    own_t (io_thread_, options_),
 
54
    io_object_t (io_thread_),
 
55
    has_file (false),
 
56
    s (retired_fd),
 
57
    socket (socket_)
 
58
{
 
59
}
 
60
 
 
61
zmq::tcp_listener_t::~tcp_listener_t ()
 
62
{
 
63
    if (s != retired_fd)
 
64
        close ();
 
65
}
 
66
 
 
67
void zmq::tcp_listener_t::process_plug ()
 
68
{
 
69
    //  Start polling for incoming connections.
 
70
    handle = add_fd (s);
 
71
    set_pollin (handle);
 
72
}
 
73
 
 
74
void zmq::tcp_listener_t::process_term (int linger_)
 
75
{
 
76
    rm_fd (handle);
 
77
    own_t::process_term (linger_);
 
78
}
 
79
 
 
80
void zmq::tcp_listener_t::in_event ()
 
81
{
 
82
    fd_t fd = accept ();
 
83
 
 
84
    //  If connection was reset by the peer in the meantime, just ignore it.
 
85
    //  TODO: Handle specific errors like ENFILE/EMFILE etc.
 
86
    if (fd == retired_fd)
 
87
        return;
 
88
 
 
89
    tune_tcp_socket (fd);
 
90
 
 
91
    //  Create the engine object for this connection.
 
92
    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
 
93
    alloc_assert (engine);
 
94
 
 
95
    //  Choose I/O thread to run connecter in. Given that we are already
 
96
    //  running in an I/O thread, there must be at least one available.
 
97
    io_thread_t *io_thread = choose_io_thread (options.affinity);
 
98
    zmq_assert (io_thread);
 
99
 
 
100
    //  Create and launch a session object. 
 
101
    session_base_t *session = session_base_t::create (io_thread, false, socket,
 
102
        options, NULL, NULL);
 
103
    errno_assert (session);
 
104
    session->inc_seqnum ();
 
105
    launch_child (session);
 
106
    send_attach (session, engine, false);
 
107
}
 
108
 
 
109
void zmq::tcp_listener_t::close ()
 
110
{
 
111
    zmq_assert (s != retired_fd);
 
112
#ifdef ZMQ_HAVE_WINDOWS
 
113
    int rc = closesocket (s);
 
114
    wsa_assert (rc != SOCKET_ERROR);
 
115
#else
 
116
    int rc = ::close (s);
 
117
    errno_assert (rc == 0);
 
118
#endif
 
119
    s = retired_fd;
 
120
}
 
121
 
 
122
int zmq::tcp_listener_t::set_address (const char *addr_)
 
123
{
 
124
    //  Convert the textual address into address structure.
 
125
    int rc = address.resolve (addr_, true, options.ipv4only ? true : false);
 
126
    if (rc != 0)
 
127
        return -1;
 
128
 
 
129
    //  Create a listening socket.
 
130
    s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
 
131
#ifdef ZMQ_HAVE_WINDOWS
 
132
    if (s == INVALID_SOCKET)
 
133
        wsa_error_to_errno ();
 
134
#endif
 
135
 
 
136
    //  IPv6 address family not supported, try automatic downgrade to IPv4.
 
137
    if (address.family () == AF_INET6 && errno == EAFNOSUPPORT &&
 
138
          !options.ipv4only) {
 
139
        rc = address.resolve (addr_, true, true);
 
140
        if (rc != 0)
 
141
            return rc;
 
142
        s = ::socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
 
143
    }
 
144
 
 
145
#ifdef ZMQ_HAVE_WINDOWS
 
146
    if (s == INVALID_SOCKET) {
 
147
        wsa_error_to_errno ();
 
148
        return -1;
 
149
    }
 
150
#else
 
151
    if (s == -1)
 
152
        return -1;
 
153
#endif
 
154
 
 
155
    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
 
156
    //  Switch it on in such cases.
 
157
    if (address.family () == AF_INET6)
 
158
        enable_ipv4_mapping (s);
 
159
 
 
160
    //  Allow reusing of the address.
 
161
    int flag = 1;
 
162
#ifdef ZMQ_HAVE_WINDOWS
 
163
    rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
 
164
        (const char*) &flag, sizeof (int));
 
165
    wsa_assert (rc != SOCKET_ERROR);
 
166
#else
 
167
    rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
 
168
    errno_assert (rc == 0);
 
169
#endif
 
170
 
 
171
    //  Bind the socket to the network interface and port.
 
172
    rc = bind (s, address.addr (), address.addrlen ());
 
173
#ifdef ZMQ_HAVE_WINDOWS
 
174
    if (rc == SOCKET_ERROR) {
 
175
        wsa_error_to_errno ();
 
176
        return -1;
 
177
    }
 
178
#else
 
179
    if (rc != 0)
 
180
        return -1;
 
181
#endif
 
182
 
 
183
    //  Listen for incomming connections.
 
184
    rc = listen (s, options.backlog);
 
185
#ifdef ZMQ_HAVE_WINDOWS
 
186
    if (rc == SOCKET_ERROR) {
 
187
        wsa_error_to_errno ();
 
188
        return -1;
 
189
    }
 
190
#else
 
191
    if (rc != 0)
 
192
        return -1;
 
193
#endif
 
194
 
 
195
    return 0;
 
196
}
 
197
 
 
198
zmq::fd_t zmq::tcp_listener_t::accept ()
 
199
{
 
200
    //  Accept one connection and deal with different failure modes.
 
201
    zmq_assert (s != retired_fd);
 
202
    fd_t sock = ::accept (s, NULL, NULL);
 
203
#ifdef ZMQ_HAVE_WINDOWS
 
204
    if (sock == INVALID_SOCKET) {
 
205
        wsa_assert (WSAGetLastError () == WSAEWOULDBLOCK ||
 
206
            WSAGetLastError () == WSAECONNRESET);
 
207
        return retired_fd;
 
208
    }
 
209
#else
 
210
    if (sock == -1) {
 
211
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
 
212
            errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
 
213
            errno == ENOBUFS);
 
214
        return retired_fd;
 
215
    }
 
216
#endif
 
217
    return sock;
 
218
}
 
219