2
Copyright (c) 2009-2011 250bpm s.r.o.
3
Copyright (c) 2007-2009 iMatix Corporation
4
Copyright (c) 2010-2011 Miru Limited
5
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
7
This file is part of 0MQ.
9
0MQ is free software; you can redistribute it and/or modify it under
10
the terms of the GNU Lesser General Public License as published by
11
the Free Software Foundation; either version 3 of the License, or
12
(at your option) any later version.
14
0MQ is distributed in the hope that it will be useful,
15
but WITHOUT ANY WARRANTY; without even the implied warranty of
16
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
GNU Lesser General Public License for more details.
19
You should have received a copy of the GNU Lesser General Public License
20
along with this program. If not, see <http://www.gnu.org/licenses/>.
23
#include "platform.hpp"
25
#ifdef ZMQ_HAVE_OPENPGM
27
#ifdef ZMQ_HAVE_WINDOWS
28
#include "windows.hpp"
39
#include "options.hpp"
40
#include "pgm_socket.hpp"
47
#define MSG_ERRQUEUE 0x2000
50
zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
58
pgm_msgv_processed (0)
62
// Resolve PGM socket address.
63
// network_ of the form <interface & multicast group decls>:<IP port>
64
// e.g. eth0;239.192.0.1:7500
65
// link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
66
// ;[fe80::1%en0]:7500
67
int zmq::pgm_socket_t::init_address (const char *network_,
68
struct pgm_addrinfo_t **addr, uint16_t *port_number)
70
// Parse port number, start from end for IPv6
71
const char *port_delim = strrchr (network_, ':');
77
*port_number = atoi (port_delim + 1);
80
if (port_delim - network_ >= (int) sizeof (network) - 1) {
84
memset (network, '\0', sizeof (network));
85
memcpy (network, network_, port_delim - network_);
87
pgm_error_t *pgm_error = NULL;
88
struct pgm_addrinfo_t hints, *res = NULL;
89
sa_family_t sa_family;
91
memset (&hints, 0, sizeof (hints));
92
hints.ai_family = AF_UNSPEC;
93
if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) {
95
// Invalid parameters don't set pgm_error_t.
96
zmq_assert (pgm_error != NULL);
97
if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&
99
// NB: cannot catch EAI_BADFLAGS.
100
( pgm_error->code != PGM_ERROR_SERVICE &&
101
pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {
103
// User, host, or network configuration or transient error.
104
pgm_error_free (pgm_error);
109
// Fatal OpenPGM internal error.
115
// Create, bind and connect PGM socket.
116
int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
118
// Can not open transport before destroying old one.
119
zmq_assert (sock == NULL);
120
zmq_assert (options.rate > 0);
122
// Zero counter used in msgrecv.
124
nbytes_processed = 0;
125
pgm_msgv_processed = 0;
127
uint16_t port_number;
128
struct pgm_addrinfo_t *res = NULL;
129
sa_family_t sa_family;
131
pgm_error_t *pgm_error = NULL;
133
if (init_address(network_, &res, &port_number) < 0) {
137
zmq_assert (res != NULL);
139
// Pick up detected IP family.
140
sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
142
// Create IP/PGM or UDP/PGM socket.
143
if (udp_encapsulation_) {
144
if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP,
147
// Invalid parameters don't set pgm_error_t.
148
zmq_assert (pgm_error != NULL);
149
if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
150
pgm_error->code != PGM_ERROR_BADF &&
151
pgm_error->code != PGM_ERROR_FAULT &&
152
pgm_error->code != PGM_ERROR_NOPROTOOPT &&
153
pgm_error->code != PGM_ERROR_FAILED))
155
// User, host, or network configuration or transient error.
158
// Fatal OpenPGM internal error.
162
// All options are of data type int
163
const int encapsulation_port = port_number;
164
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
165
&encapsulation_port, sizeof (encapsulation_port)))
167
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
168
&encapsulation_port, sizeof (encapsulation_port)))
172
if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM,
175
// Invalid parameters don't set pgm_error_t.
176
zmq_assert (pgm_error != NULL);
177
if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
178
pgm_error->code != PGM_ERROR_BADF &&
179
pgm_error->code != PGM_ERROR_FAULT &&
180
pgm_error->code != PGM_ERROR_NOPROTOOPT &&
181
pgm_error->code != PGM_ERROR_FAILED))
183
// User, host, or network configuration or transient error.
186
// Fatal OpenPGM internal error.
192
const int rcvbuf = (int) options.rcvbuf;
194
if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
199
const int sndbuf = (int) options.sndbuf;
201
if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
206
const int max_tpdu = (int) pgm_max_tpdu;
207
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
213
const int recv_only = 1,
214
rxw_max_tpdu = (int) pgm_max_tpdu,
215
rxw_sqns = compute_sqns (rxw_max_tpdu),
216
peer_expiry = pgm_secs (300),
217
spmr_expiry = pgm_msecs (25),
218
nak_bo_ivl = pgm_msecs (50),
219
nak_rpt_ivl = pgm_msecs (200),
220
nak_rdata_ivl = pgm_msecs (200),
221
nak_data_retries = 50,
222
nak_ncf_retries = 50;
224
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
225
sizeof (recv_only)) ||
226
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
227
sizeof (rxw_sqns)) ||
228
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
229
sizeof (peer_expiry)) ||
230
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
231
sizeof (spmr_expiry)) ||
232
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl,
233
sizeof (nak_bo_ivl)) ||
234
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl,
235
sizeof (nak_rpt_ivl)) ||
236
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL,
237
&nak_rdata_ivl, sizeof (nak_rdata_ivl)) ||
238
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES,
239
&nak_data_retries, sizeof (nak_data_retries)) ||
240
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
241
&nak_ncf_retries, sizeof (nak_ncf_retries)))
244
const int send_only = 1,
245
max_rte = (int) ((options.rate * 1000) / 8),
246
txw_max_tpdu = (int) pgm_max_tpdu,
247
txw_sqns = compute_sqns (txw_max_tpdu),
248
ambient_spm = pgm_secs (30),
249
heartbeat_spm[] = { pgm_msecs (100),
259
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
260
&send_only, sizeof (send_only)) ||
261
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE,
262
&max_rte, sizeof (max_rte)) ||
263
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
264
&txw_sqns, sizeof (txw_sqns)) ||
265
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
266
&ambient_spm, sizeof (ambient_spm)) ||
267
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
268
&heartbeat_spm, sizeof (heartbeat_spm)))
272
// PGM transport GSI.
273
struct pgm_sockaddr_t addr;
275
memset (&addr, 0, sizeof(addr));
276
addr.sa_port = port_number;
277
addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
279
// Create random GSI.
281
buf [0] = generate_random ();
282
buf [1] = generate_random ();
283
if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8))
287
// Bind a transport to the specified network devices.
288
struct pgm_interface_req_t if_req;
289
memset (&if_req, 0, sizeof(if_req));
290
if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
291
if_req.ir_scope_id = 0;
292
if (AF_INET6 == sa_family) {
293
struct sockaddr_in6 sa6;
294
memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6));
295
if_req.ir_scope_id = sa6.sin6_scope_id;
297
if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req),
298
&if_req, sizeof (if_req), &pgm_error)) {
300
// Invalid parameters don't set pgm_error_t.
301
zmq_assert (pgm_error != NULL);
302
if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET ||
303
pgm_error->domain == PGM_ERROR_DOMAIN_IF) && (
304
pgm_error->code != PGM_ERROR_INVAL &&
305
pgm_error->code != PGM_ERROR_BADF &&
306
pgm_error->code != PGM_ERROR_FAULT))
308
// User, host, or network configuration or transient error.
311
// Fatal OpenPGM internal error.
315
// Join IP multicast groups.
316
for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) {
317
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP,
318
&res->ai_recv_addrs [i], sizeof (struct group_req)))
321
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP,
322
&res->ai_send_addrs [0], sizeof (struct group_req)))
325
pgm_freeaddrinfo (res);
328
// Set IP level parameters.
330
const int multicast_loop = 0;
331
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
332
&multicast_loop, sizeof (multicast_loop)))
335
const int multicast_hops = options.multicast_hops;
336
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
337
&multicast_hops, sizeof (multicast_hops)))
340
// Expedited Forwarding PHB for network elements, no ECN.
341
const int dscp = 0x2e << 2;
342
if (AF_INET6 != sa_family && !pgm_setsockopt (sock,
343
IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp)))
346
const int nonblocking = 1;
347
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK,
348
&nonblocking, sizeof (nonblocking)))
352
// Connect PGM transport to start state machine.
353
if (!pgm_connect (sock, &pgm_error)) {
355
// Invalid parameters don't set pgm_error_t.
356
zmq_assert (pgm_error != NULL);
360
// For receiver transport preallocate pgm_msgv array.
362
zmq_assert (in_batch_size > 0);
363
size_t max_tsdu_size = get_max_tsdu_size ();
364
pgm_msgv_len = (int) in_batch_size / max_tsdu_size;
365
if ((int) in_batch_size % max_tsdu_size)
367
zmq_assert (pgm_msgv_len);
369
pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len);
370
alloc_assert (pgm_msgv);
377
pgm_close (sock, FALSE);
381
pgm_freeaddrinfo (res);
384
if (pgm_error != NULL) {
385
pgm_error_free (pgm_error);
392
zmq::pgm_socket_t::~pgm_socket_t ()
397
pgm_close (sock, TRUE);
400
// Get receiver fds. receive_fd_ is signaled for incoming packets,
401
// waiting_pipe_fd_ is signaled for state driven events and data.
402
void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
403
fd_t *waiting_pipe_fd_)
408
zmq_assert (receive_fd_);
409
zmq_assert (waiting_pipe_fd_);
411
socklen = sizeof (*receive_fd_);
412
rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
415
zmq_assert (socklen == sizeof (*receive_fd_));
417
socklen = sizeof (*waiting_pipe_fd_);
418
rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_,
421
zmq_assert (socklen == sizeof (*waiting_pipe_fd_));
424
// Get fds and store them into user allocated memory.
425
// send_fd is for non-blocking send wire notifications.
426
// receive_fd_ is for incoming back-channel protocol packets.
427
// rdata_notify_fd_ is raised for waiting repair transmissions.
428
// pending_notify_fd_ is for state driven events.
429
void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
430
fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_)
435
zmq_assert (send_fd_);
436
zmq_assert (receive_fd_);
437
zmq_assert (rdata_notify_fd_);
438
zmq_assert (pending_notify_fd_);
440
socklen = sizeof (*send_fd_);
441
rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen);
443
zmq_assert (socklen == sizeof (*receive_fd_));
445
socklen = sizeof (*receive_fd_);
446
rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
449
zmq_assert (socklen == sizeof (*receive_fd_));
451
socklen = sizeof (*rdata_notify_fd_);
452
rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_,
455
zmq_assert (socklen == sizeof (*rdata_notify_fd_));
457
socklen = sizeof (*pending_notify_fd_);
458
rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK,
459
pending_notify_fd_, &socklen);
461
zmq_assert (socklen == sizeof (*pending_notify_fd_));
464
// Send one APDU, transmit window owned memory.
465
// data_len_ must be less than one TPDU.
466
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
470
const int status = pgm_send (sock, data_, data_len_, &nbytes);
472
// We have to write all data as one packet.
474
zmq_assert (status == PGM_IO_STATUS_NORMAL);
475
zmq_assert (nbytes == data_len_);
477
zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED ||
478
status == PGM_IO_STATUS_WOULD_BLOCK);
480
if (status == PGM_IO_STATUS_RATE_LIMITED)
486
// Save return value.
487
last_tx_status = status;
492
long zmq::pgm_socket_t::get_rx_timeout ()
494
if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED &&
495
last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
499
socklen_t optlen = sizeof (tv);
500
const bool rc = pgm_getsockopt (sock, IPPROTO_PGM,
501
last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN :
502
PGM_TIME_REMAIN, &tv, &optlen);
505
const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
510
long zmq::pgm_socket_t::get_tx_timeout ()
512
if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
516
socklen_t optlen = sizeof (tv);
517
const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv,
521
const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
526
// Return max TSDU size without fragmentation from current PGM transport.
527
size_t zmq::pgm_socket_t::get_max_tsdu_size ()
530
socklen_t optlen = sizeof (max_tsdu);
532
bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen);
534
zmq_assert (optlen == sizeof (max_tsdu));
535
return (size_t) max_tsdu;
538
// pgm_recvmsgv is called to fill the pgm_msgv array up to pgm_msgv_len.
539
// In subsequent calls data from pgm_msgv structure are returned.
540
ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
542
size_t raw_data_len = 0;
544
// We just sent all data from pgm_transport_recvmsgv up
545
// and have to return 0 that another engine in this thread is scheduled.
546
if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
548
// Reset all the counters.
550
nbytes_processed = 0;
551
pgm_msgv_processed = 0;
556
// If we have are going first time or if we have processed all pgm_msgv_t
557
// structure previously read from the pgm socket.
558
if (nbytes_rec == nbytes_processed) {
560
// Check program flow.
561
zmq_assert (pgm_msgv_processed == 0);
562
zmq_assert (nbytes_processed == 0);
563
zmq_assert (nbytes_rec == 0);
565
// Receive a vector of Application Protocol Domain Unit's (APDUs)
566
// from the transport.
567
pgm_error_t *pgm_error = NULL;
569
const int status = pgm_recvmsgv (sock, pgm_msgv,
570
pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
572
// Invalid parameters.
573
zmq_assert (status != PGM_IO_STATUS_ERROR);
575
last_rx_status = status;
577
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
578
// pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
579
if (status == PGM_IO_STATUS_TIMER_PENDING) {
581
zmq_assert (nbytes_rec == 0);
583
// In case if no RDATA/ODATA caused POLLIN 0 is
590
// Send SPMR, NAK, ACK is rate limited.
591
if (status == PGM_IO_STATUS_RATE_LIMITED) {
593
zmq_assert (nbytes_rec == 0);
595
// In case if no RDATA/ODATA caused POLLIN 0 is returned.
601
// No peers and hence no incoming packets.
602
if (status == PGM_IO_STATUS_WOULD_BLOCK) {
604
zmq_assert (nbytes_rec == 0);
606
// In case if no RDATA/ODATA caused POLLIN 0 is returned.
613
if (status == PGM_IO_STATUS_RESET) {
615
struct pgm_sk_buff_t* skb = pgm_msgv [0].msgv_skb [0];
617
// Save lost data TSI.
621
// In case of dala loss -1 is returned.
627
zmq_assert (status == PGM_IO_STATUS_NORMAL);
631
zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
634
// Zero byte payloads are valid in PGM, but not 0MQ protocol.
635
zmq_assert (nbytes_rec > 0);
637
// Only one APDU per pgm_msgv_t structure is allowed.
638
zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1);
640
struct pgm_sk_buff_t* skb =
641
pgm_msgv [pgm_msgv_processed].msgv_skb [0];
643
// Take pointers from pgm_msgv_t structure.
644
*raw_data_ = skb->data;
645
raw_data_len = skb->len;
650
// Move the the next pgm_msgv_t structure.
651
pgm_msgv_processed++;
652
zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
653
nbytes_processed +=raw_data_len;
658
void zmq::pgm_socket_t::process_upstream ()
660
pgm_msgv_t dummy_msg;
662
size_t dummy_bytes = 0;
663
pgm_error_t *pgm_error = NULL;
665
const int status = pgm_recvmsgv (sock, &dummy_msg,
666
1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error);
668
// Invalid parameters.
669
zmq_assert (status != PGM_IO_STATUS_ERROR);
671
// No data should be returned.
672
zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
673
status == PGM_IO_STATUS_RATE_LIMITED ||
674
status == PGM_IO_STATUS_WOULD_BLOCK));
676
last_rx_status = status;
678
if (status == PGM_IO_STATUS_TIMER_PENDING)
680
else if (status == PGM_IO_STATUS_RATE_LIMITED)
686
int zmq::pgm_socket_t::compute_sqns (int tpdu_)
688
// Convert rate into B/ms.
689
uint64_t rate = uint64_t (options.rate) / 8;
691
// Compute the size of the buffer in bytes.
692
uint64_t size = uint64_t (options.recovery_ivl) * rate;
694
// Translate the size into number of packets.
695
uint64_t sqns = size / tpdu_;
697
// Buffer should be able to hold at least one packet.