2
Copyright (c) 2009-2011 250bpm s.r.o.
3
Copyright (c) 2007-2009 iMatix Corporation
4
Copyright (c) 2010-2011 Miru Limited
5
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
7
This file is part of 0MQ.
9
0MQ is free software; you can redistribute it and/or modify it under
10
the terms of the GNU Lesser General Public License as published by
11
the Free Software Foundation; either version 3 of the License, or
12
(at your option) any later version.
14
0MQ is distributed in the hope that it will be useful,
15
but WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
GNU Lesser General Public License for more details.
19
You should have received a copy of the GNU Lesser General Public License
20
along with this program. If not, see <http://www.gnu.org/licenses/>.
23
#include "platform.hpp"
25
#if defined ZMQ_HAVE_OPENPGM
27
#ifdef ZMQ_HAVE_WINDOWS
28
#include "windows.hpp"
33
#include "io_thread.hpp"
34
#include "pgm_sender.hpp"
35
#include "session_base.hpp"
40
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
41
const options_t &options_) :
42
io_object_t (parent_),
46
pgm_socket (false, options_),
54
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
56
int rc = pgm_socket.init (udp_encapsulation_, network_);
60
out_buffer_size = pgm_socket.get_max_tsdu_size ();
61
out_buffer = (unsigned char*) malloc (out_buffer_size);
62
alloc_assert (out_buffer);
67
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
69
// Alocate 2 fds for PGM socket.
70
fd_t downlink_socket_fd = retired_fd;
71
fd_t uplink_socket_fd = retired_fd;
72
fd_t rdata_notify_fd = retired_fd;
73
fd_t pending_notify_fd = retired_fd;
75
encoder.set_session (session_);
77
// Fill fds from PGM transport and add them to the poller.
78
pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
79
&rdata_notify_fd, &pending_notify_fd);
81
handle = add_fd (downlink_socket_fd);
82
uplink_handle = add_fd (uplink_socket_fd);
83
rdata_notify_handle = add_fd (rdata_notify_fd);
84
pending_notify_handle = add_fd (pending_notify_fd);
86
// Set POLLIN. We wont never want to stop polling for uplink = we never
87
// want to stop porocess NAKs.
88
set_pollin (uplink_handle);
89
set_pollin (rdata_notify_handle);
90
set_pollin (pending_notify_handle);
92
// Set POLLOUT for downlink_socket_handle.
95
// PGM is not able to pass subscriptions upstream, thus we have no idea
96
// what messages are peers interested in. Because of that we have to
97
// subscribe for all the messages.
100
*(unsigned char*) msg.data () = 1;
101
bool ok = session_->write (&msg);
106
void zmq::pgm_sender_t::unplug ()
109
cancel_timer (rx_timer_id);
110
has_rx_timer = false;
114
cancel_timer (tx_timer_id);
115
has_tx_timer = false;
119
rm_fd (uplink_handle);
120
rm_fd (rdata_notify_handle);
121
rm_fd (pending_notify_handle);
122
encoder.set_session (NULL);
125
void zmq::pgm_sender_t::terminate ()
131
void zmq::pgm_sender_t::activate_out ()
133
set_pollout (handle);
137
void zmq::pgm_sender_t::activate_in ()
142
zmq::pgm_sender_t::~pgm_sender_t ()
150
void zmq::pgm_sender_t::in_event ()
153
cancel_timer (rx_timer_id);
154
has_rx_timer = false;
157
// In-event on sender side means NAK or SPMR receiving from some peer.
158
pgm_socket.process_upstream ();
159
if (errno == ENOMEM || errno == EBUSY) {
160
const long timeout = pgm_socket.get_rx_timeout ();
161
add_timer (timeout, rx_timer_id);
166
void zmq::pgm_sender_t::out_event ()
168
// POLLOUT event from send socket. If write buffer is empty,
169
// try to read new data from the encoder.
170
if (write_size == 0) {
172
// First two bytes (sizeof uint16_t) are used to store message
173
// offset in following steps. Note that by passing our buffer to
174
// the get data function we prevent it from returning its own buffer.
175
unsigned char *bf = out_buffer + sizeof (uint16_t);
176
size_t bfsz = out_buffer_size - sizeof (uint16_t);
178
encoder.get_data (&bf, &bfsz, &offset);
180
// If there are no data to write stop polling for output.
182
reset_pollout (handle);
186
// Put offset information in the buffer.
187
write_size = bfsz + sizeof (uint16_t);
188
put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
192
cancel_timer (tx_timer_id);
193
has_tx_timer = false;
197
size_t nbytes = pgm_socket.send (out_buffer, write_size);
199
// We can write either all data or 0 which means rate limit reached.
200
if (nbytes == write_size) {
203
zmq_assert (nbytes == 0);
205
if (errno == ENOMEM) {
206
const long timeout = pgm_socket.get_tx_timeout ();
207
add_timer (timeout, tx_timer_id);
210
zmq_assert (errno == EBUSY);
214
void zmq::pgm_sender_t::timer_event (int token)
216
// Timer cancels on return by poller_base.
217
if (token == rx_timer_id) {
218
has_rx_timer = false;
220
} else if (token == tx_timer_id) {
221
has_tx_timer = false;