~ubuntu-branches/ubuntu/trusty/zeromq3/trusty

« back to all changes in this revision

Viewing changes to src/ipc_connecter.cpp

  • Committer: Package Import Robot
  • Author(s): Alessandro Ghedini
  • Date: 2012-06-12 10:53:58 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20120612105358-irh7e8ivwc4566fi
Tags: 3.2.0~rc1+dfsg-1
* New upstream RC release
* Use repack.{local,stub} instead of get-orig-source rule
* Add 01_fix-unused-variable-error.patch
* Remove build dependency on uuid-dev (no more needed)
* Add 02_check-ifdef-SO_NOSIGPIPE.patch to fix kfreebsd build
* Add 03_fix-test_shutdown_stress-segfault.patch

Show diffs side-by-side

added added

removed removed

Lines of Context:
31
31
#include "random.hpp"
32
32
#include "err.hpp"
33
33
#include "ip.hpp"
 
34
#include "address.hpp"
 
35
#include "ipc_address.hpp"
 
36
#include "session_base.hpp"
34
37
 
35
38
#include <unistd.h>
36
39
#include <sys/types.h>
39
42
 
40
43
zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
41
44
      class session_base_t *session_, const options_t &options_,
42
 
      const char *address_, bool wait_) :
 
45
      const address_t *addr_, bool wait_) :
43
46
    own_t (io_thread_, options_),
44
47
    io_object_t (io_thread_),
 
48
    addr (addr_),
45
49
    s (retired_fd),
46
50
    handle_valid (false),
47
51
    wait (wait_),
48
52
    session (session_),
49
53
    current_reconnect_ivl(options.reconnect_ivl)
50
54
{
51
 
    //  TODO: set_addess should be called separately, so that the error
52
 
    //  can be propagated.
53
 
    int rc = set_address (address_);
54
 
    zmq_assert (rc == 0);
 
55
    zmq_assert (addr);
 
56
    zmq_assert (addr->protocol == "ipc");
 
57
    addr->to_string (endpoint);
55
58
}
56
59
 
57
60
zmq::ipc_connecter_t::~ipc_connecter_t ()
58
61
{
 
62
    zmq_assert (!wait);
 
63
    zmq_assert (!handle_valid);
 
64
    zmq_assert (s == retired_fd);
 
65
}
 
66
 
 
67
void zmq::ipc_connecter_t::process_plug ()
 
68
{
59
69
    if (wait)
 
70
        add_reconnect_timer();
 
71
    else
 
72
        start_connecting ();
 
73
}
 
74
 
 
75
void zmq::ipc_connecter_t::process_term (int linger_)
 
76
{
 
77
    if (wait) {
60
78
        cancel_timer (reconnect_timer_id);
61
 
    if (handle_valid)
 
79
        wait = false;
 
80
    }
 
81
 
 
82
    if (handle_valid) {
62
83
        rm_fd (handle);
 
84
        handle_valid = false;
 
85
    }
63
86
 
64
87
    if (s != retired_fd)
65
88
        close ();
66
 
}
67
89
 
68
 
void zmq::ipc_connecter_t::process_plug ()
69
 
{
70
 
    if (wait)
71
 
        add_reconnect_timer();
72
 
    else
73
 
        start_connecting ();
 
90
    own_t::process_term (linger_);
74
91
}
75
92
 
76
93
void zmq::ipc_connecter_t::in_event ()
104
121
 
105
122
    //  Shut the connecter down.
106
123
    terminate ();
 
124
 
 
125
    session->monitor_event (ZMQ_EVENT_CONNECTED, endpoint.c_str(), fd);
107
126
}
108
127
 
109
128
void zmq::ipc_connecter_t::timer_event (int id_)
126
145
        return;
127
146
    }
128
147
 
129
 
    //  Connection establishment may be dealyed. Poll for its completion.
130
 
    else if (rc == -1 && errno == EAGAIN) {
 
148
    //  Connection establishment may be delayed. Poll for its completion.
 
149
    else if (rc == -1 && errno == EINPROGRESS) {
131
150
        handle = add_fd (s);
132
151
        handle_valid = true;
133
152
        set_pollout (handle);
 
153
        session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno());
134
154
        return;
135
155
    }
136
156
 
142
162
 
143
163
void zmq::ipc_connecter_t::add_reconnect_timer()
144
164
{
145
 
    add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
 
165
    int rc_ivl = get_new_reconnect_ivl();
 
166
    add_timer (rc_ivl, reconnect_timer_id);
 
167
    session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl);
146
168
}
147
169
 
148
170
int zmq::ipc_connecter_t::get_new_reconnect_ivl ()
165
187
    return this_interval;
166
188
}
167
189
 
168
 
int zmq::ipc_connecter_t::set_address (const char *addr_)
169
 
{
170
 
    return address.resolve (addr_);
171
 
}
172
 
 
173
190
int zmq::ipc_connecter_t::open ()
174
191
{
175
192
    zmq_assert (s == retired_fd);
183
200
    unblock_socket (s);
184
201
 
185
202
    //  Connect to the remote peer.
186
 
    int rc = ::connect (s, address.addr (), address.addrlen ());
 
203
    int rc = ::connect (
 
204
        s, addr->resolved.ipc_addr->addr (),
 
205
        addr->resolved.ipc_addr->addrlen ());
187
206
 
188
207
    //  Connect was successfull immediately.
189
208
    if (rc == 0)
190
209
        return 0;
 
210
        
 
211
    //  Translate other error codes indicating asynchronous connect has been
 
212
    //  launched to a uniform EINPROGRESS.
 
213
    if (rc == -1 && errno == EINTR) {
 
214
        errno = EINPROGRESS;
 
215
        return -1;
 
216
    }
191
217
 
192
218
    //  Forward the error.
193
219
    return -1;
197
223
{
198
224
    zmq_assert (s != retired_fd);
199
225
    int rc = ::close (s);
200
 
    if (rc != 0)
 
226
    if (rc != 0) {
 
227
        session->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
201
228
        return -1;
 
229
    }
 
230
    session->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s);
202
231
    s = retired_fd;
203
232
    return 0;
204
233
}