2
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
4
This file is part of 0MQ.
6
0MQ is free software; you can redistribute it and/or modify it under
7
the terms of the GNU Lesser General Public License as published by
8
the Free Software Foundation; either version 3 of the License, or
9
(at your option) any later version.
11
0MQ is distributed in the hope that it will be useful,
12
but WITHOUT ANY WARRANTY; without even the implied warranty of
13
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
GNU Lesser General Public License for more details.
16
You should have received a copy of the GNU Lesser General Public License
17
along with this program. If not, see <http://www.gnu.org/licenses/>.
20
#include "session_base.hpp"
21
#include "i_engine.hpp"
25
#include "tcp_connecter.hpp"
26
#include "ipc_connecter.hpp"
27
#include "pgm_sender.hpp"
28
#include "pgm_receiver.hpp"
29
#include "address.hpp"
34
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
35
bool connect_, class socket_base_t *socket_, const options_t &options_,
36
const address_t *addr_)
38
session_base_t *s = NULL;
39
switch (options_.type) {
41
s = new (std::nothrow) req_session_t (io_thread_, connect_,
42
socket_, options_, addr_);
55
s = new (std::nothrow) session_base_t (io_thread_, connect_,
56
socket_, options_, addr_);
66
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
67
bool connect_, class socket_base_t *socket_, const options_t &options_,
68
const address_t *addr_) :
69
own_t (io_thread_, options_),
70
io_object_t (io_thread_),
74
incomplete_in (false),
78
io_thread (io_thread_),
79
has_linger_timer (false),
84
zmq::session_base_t::~session_base_t ()
87
zmq_assert (!zap_pipe);
89
// If there's still a pending linger timer, remove it.
90
if (has_linger_timer) {
91
cancel_timer (linger_timer_id);
92
has_linger_timer = false;
102
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
104
zmq_assert (!is_terminating ());
108
pipe->set_event_sink (this);
111
int zmq::session_base_t::pull_msg (msg_t *msg_)
113
if (!pipe || !pipe->read (msg_)) {
117
incomplete_in = msg_->flags () & msg_t::more ? true : false;
122
int zmq::session_base_t::push_msg (msg_t *msg_)
124
if (pipe && pipe->write (msg_)) {
125
int rc = msg_->init ();
126
errno_assert (rc == 0);
134
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
136
if (zap_pipe == NULL) {
141
if (!zap_pipe->read (msg_)) {
149
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
151
if (zap_pipe == NULL) {
156
const bool ok = zap_pipe->write (msg_);
159
if ((msg_->flags () & msg_t::more) == 0)
162
const int rc = msg_->init ();
163
errno_assert (rc == 0);
167
void zmq::session_base_t::reset ()
171
void zmq::session_base_t::flush ()
177
void zmq::session_base_t::clean_pipes ()
181
// Get rid of half-processed messages in the out pipe. Flush any
182
// unflushed messages upstream.
186
// Remove any half-read message from the in pipe.
187
while (incomplete_in) {
189
int rc = msg.init ();
190
errno_assert (rc == 0);
191
rc = pull_msg (&msg);
192
errno_assert (rc == 0);
194
errno_assert (rc == 0);
199
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
201
// Drop the reference to the deallocated pipe if required.
202
zmq_assert (pipe_ == pipe
204
|| terminating_pipes.count (pipe_) == 1);
207
// If this is our current pipe, remove it
210
if (pipe_ == zap_pipe) {
214
// Remove the pipe from the detached pipes set
215
terminating_pipes.erase (pipe_);
217
if (!is_terminating () && options.raw_sock) {
219
engine->terminate ();
225
// If we are waiting for pending messages to be sent, at this point
226
// we are sure that there will be no more messages and we can proceed
227
// with termination safely.
228
if (pending && !pipe && !zap_pipe && terminating_pipes.empty ())
229
proceed_with_term ();
232
void zmq::session_base_t::read_activated (pipe_t *pipe_)
234
// Skip activating if we're detaching this pipe
235
if (unlikely(pipe_ != pipe && pipe_ != zap_pipe)) {
236
zmq_assert (terminating_pipes.count (pipe_) == 1);
240
if (unlikely (engine == NULL)) {
245
if (likely (pipe_ == pipe))
246
engine->restart_output ();
248
engine->zap_msg_available ();
251
void zmq::session_base_t::write_activated (pipe_t *pipe_)
253
// Skip activating if we're detaching this pipe
255
zmq_assert (terminating_pipes.count (pipe_) == 1);
260
engine->restart_input ();
263
void zmq::session_base_t::hiccuped (pipe_t *)
265
// Hiccups are always sent from session to socket, not the other
270
zmq::socket_base_t *zmq::session_base_t::get_socket ()
275
void zmq::session_base_t::process_plug ()
278
start_connecting (false);
281
int zmq::session_base_t::zap_connect ()
283
zmq_assert (zap_pipe == NULL);
285
endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
286
if (peer.socket == NULL) {
287
errno = ECONNREFUSED;
290
if (peer.options.type != ZMQ_REP
291
&& peer.options.type != ZMQ_ROUTER) {
292
errno = ECONNREFUSED;
296
// Create a bi-directional pipe that will connect
297
// session with zap socket.
298
object_t *parents [2] = {this, peer.socket};
299
pipe_t *new_pipes [2] = {NULL, NULL};
300
int hwms [2] = {0, 0};
301
bool conflates [2] = {false, false};
302
int rc = pipepair (parents, new_pipes, hwms, conflates);
303
errno_assert (rc == 0);
305
// Attach local end of the pipe to this socket object.
306
zap_pipe = new_pipes [0];
307
zap_pipe->set_nodelay ();
308
zap_pipe->set_event_sink (this);
310
send_bind (peer.socket, new_pipes [1], false);
312
// Send empty identity if required by the peer.
313
if (peer.options.recv_identity) {
316
errno_assert (rc == 0);
317
id.set_flags (msg_t::identity);
318
bool ok = zap_pipe->write (&id);
326
void zmq::session_base_t::process_attach (i_engine *engine_)
328
zmq_assert (engine_ != NULL);
330
// Create the pipe if it does not exist yet.
331
if (!pipe && !is_terminating ()) {
332
object_t *parents [2] = {this, socket};
333
pipe_t *pipes [2] = {NULL, NULL};
335
bool conflate = options.conflate &&
336
(options.type == ZMQ_DEALER ||
337
options.type == ZMQ_PULL ||
338
options.type == ZMQ_PUSH ||
339
options.type == ZMQ_PUB ||
340
options.type == ZMQ_SUB);
342
int hwms [2] = {conflate? -1 : options.rcvhwm,
343
conflate? -1 : options.sndhwm};
344
bool conflates [2] = {conflate, conflate};
345
int rc = pipepair (parents, pipes, hwms, conflates);
346
errno_assert (rc == 0);
348
// Plug the local end of the pipe.
349
pipes [0]->set_event_sink (this);
351
// Remember the local end of the pipe.
355
// Ask socket to plug into the remote end of the pipe.
356
send_bind (socket, pipes [1]);
359
// Plug in the engine.
360
zmq_assert (!engine);
362
engine->plug (io_thread, this);
365
void zmq::session_base_t::detach ()
367
// Engine is dead. Let's forget about it.
370
// Remove any half-done messages from the pipes.
373
// Send the event to the derived class.
376
// Just in case there's only a delimiter in the pipe.
381
zap_pipe->check_read ();
384
void zmq::session_base_t::process_term (int linger_)
386
zmq_assert (!pending);
388
// If the termination of the pipe happens before the term command is
389
// delivered there's nothing much to do. We can proceed with the
390
// standard termination immediately.
391
if (!pipe && !zap_pipe) {
392
proceed_with_term ();
399
// If there's finite linger value, delay the termination.
400
// If linger is infinite (negative) we don't even have to set
403
zmq_assert (!has_linger_timer);
404
add_timer (linger_, linger_timer_id);
405
has_linger_timer = true;
408
// Start pipe termination process. Delay the termination till all messages
409
// are processed in case the linger time is non-zero.
410
pipe->terminate (linger_ != 0);
412
// TODO: Should this go into pipe_t::terminate ?
413
// In case there's no engine and there's only delimiter in the
414
// pipe it wouldn't be ever read. Thus we check for it explicitly.
418
if (zap_pipe != NULL)
419
zap_pipe->terminate (false);
422
void zmq::session_base_t::proceed_with_term ()
424
// The pending phase has just ended.
427
// Continue with standard termination.
428
own_t::process_term (0);
431
void zmq::session_base_t::timer_event (int id_)
434
// Linger period expired. We can proceed with termination even though
435
// there are still pending messages to be sent.
436
zmq_assert (id_ == linger_timer_id);
437
has_linger_timer = false;
439
// Ask pipe to terminate even though there may be pending messages in it.
441
pipe->terminate (false);
444
void zmq::session_base_t::detached ()
446
// Transient session self-destructs after peer disconnects.
452
// For delayed connect situations, terminate the pipe
453
// and reestablish later on
454
if (pipe && options.immediate == 1
455
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
457
pipe->terminate (false);
458
terminating_pipes.insert (pipe);
465
if (options.reconnect_ivl != -1)
466
start_connecting (true);
468
// For subscriber sockets we hiccup the inbound pipe, which will cause
469
// the socket object to resend all the subscriptions.
470
if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
474
void zmq::session_base_t::start_connecting (bool wait_)
476
zmq_assert (connect);
478
// Choose I/O thread to run connecter in. Given that we are already
479
// running in an I/O thread, there must be at least one available.
480
io_thread_t *io_thread = choose_io_thread (options.affinity);
481
zmq_assert (io_thread);
483
// Create the connecter object.
485
if (addr->protocol == "tcp") {
486
tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
487
io_thread, this, options, addr, wait_);
488
alloc_assert (connecter);
489
launch_child (connecter);
493
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
494
if (addr->protocol == "ipc") {
495
ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
496
io_thread, this, options, addr, wait_);
497
alloc_assert (connecter);
498
launch_child (connecter);
503
#ifdef ZMQ_HAVE_OPENPGM
505
// Both PGM and EPGM transports are using the same infrastructure.
506
if (addr->protocol == "pgm" || addr->protocol == "epgm") {
508
zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
509
|| options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
511
// For EPGM transport with UDP encapsulation of PGM is used.
512
bool const udp_encapsulation = addr->protocol == "epgm";
514
// At this point we'll create message pipes to the session straight
515
// away. There's no point in delaying it as no concept of 'connect'
516
// exists with PGM anyway.
517
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
520
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
522
alloc_assert (pgm_sender);
524
int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
525
errno_assert (rc == 0);
527
send_attach (this, pgm_sender);
532
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
534
alloc_assert (pgm_receiver);
536
int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
537
errno_assert (rc == 0);
539
send_attach (this, pgm_receiver);