1
/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
3
* Simple PGM receiver: poll based non-blocking synchronous receiver.
5
* Copyright (c) 2006-2010 Miru Limited.
7
* This library is free software; you can redistribute it and/or
8
* modify it under the terms of the GNU Lesser General Public
9
* License as published by the Free Software Foundation; either
10
* version 2.1 of the License, or (at your option) any later version.
12
* This library is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this library; if not, write to the Free Software
19
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
32
#include <sys/types.h>
36
# include <arpa/inet.h>
37
# include <netinet/in.h>
38
# include <sys/socket.h>
42
/* example dependencies */
43
#include <pgm/backtrace.h>
51
/* Runtime configuration (filled in from the command line) and process-wide
 * state shared between the option parser, socket setup and receive loop.
 */

/* IP port from -s; while it is 0 the bind code substitutes
 * DEFAULT_DATA_DESTINATION_PORT. */
static int g_port = 0;
52
/* Network specification from -n, handed to pgm_getaddrinfo() during startup. */
static const char* g_network = "";
53
/* Set by -l; forwarded as the PGM_MULTICAST_LOOP socket option. */
static gboolean g_multicast_loop = FALSE;
54
/* UDP encapsulation port from -p; non-zero selects a PGM/UDP socket instead
 * of a PGM/IP socket. */
static int g_udp_encap_port = 0;
56
/* Passed as the PGM_MTU socket option. */
static int g_max_tpdu = 1500;
57
/* Passed as the PGM_RXW_SQNS socket option. */
static int g_sqns = 100;
59
/* The PGM socket: created during startup, read by the receive loop and
 * closed at shutdown. */
static pgm_sock_t* g_sock = NULL;
60
/* NOTE(review): presumably the loop-termination flag; no read or write of it
 * is visible in this view -- confirm. */
static gboolean g_quit;
61
/* Pipe used to wake the receive loop: the signal code writes one byte to
 * [1] and the loop polls [0]. */
static int g_quit_pipe[2];
63
/* Forward declarations. */

/* Installed with signal() for SIGINT and SIGTERM. */
static void on_signal (int);
64
/* NOTE(review): presumably performs the socket setup whose failure is
 * reported as "startup failed"; the call site is not visible here. */
static gboolean on_startup (void);
66
/* Called by the receive loop with each received buffer, its length and the
 * sender's address. */
static int on_data (gconstpointer, size_t, struct pgm_sockaddr_t*);
69
/* Prints the command-line option summary to stderr, substituting `bin` as
 * the program name.
 * NOTE(review): the function's name and parameter lines, and whatever follows
 * the last fprintf, are not visible in this view. G_GNUC_NORETURN suggests
 * the function is not expected to return -- confirm against the full file.
 */
G_GNUC_NORETURN static
75
fprintf (stderr, "Usage: %s [options]\n", bin);
76
fprintf (stderr, " -n <network> : Multicast group or unicast IP address\n");
77
fprintf (stderr, " -s <port> : IP port\n");
78
fprintf (stderr, " -p <port> : Encapsulate PGM in UDP on IP port\n");
79
fprintf (stderr, " -l : Enable multicast loopback and address sharing\n");
90
/* Program entry fragment (presumably the body of main; it uses argc/argv):
 * start the PGM engine, parse options, create the wake-up pipe, install
 * signal handlers, then run the poll-driven receive loop and tear down.
 * NOTE(review): the signature, braces and several statements of this function
 * are not visible in this view; comments below describe visible lines only.
 */
pgm_error_t* pgm_err = NULL;
92
setlocale (LC_ALL, "");
95
g_message ("pnonblocksyncrecv");
97
/* Start the PGM engine; failure details are reported through pgm_err. */
if (!pgm_init (&pgm_err)) {
98
/* NOTE(review): GLib documents g_error() as fatal, so the pgm_error_free
 * that follows is presumably never reached -- confirm. */
g_error ("Unable to start PGM engine: %s", pgm_err->message);
99
pgm_error_free (pgm_err);
103
/* parse program arguments */
104
/* strrchr returns NULL when argv[0] contains no '/'.
 * NOTE(review): handling of that case is not visible here. */
const char* binary_name = strrchr (argv[0], '/');
106
while ((c = getopt (argc, argv, "s:n:p:lh")) != -1)
109
case 'n': g_network = optarg; break;
110
/* atoi gives no error indication, so non-numeric port text is accepted
 * silently. */
case 's': g_port = atoi (optarg); break;
111
case 'p': g_udp_encap_port = atoi (optarg); break;
112
case 'l': g_multicast_loop = TRUE; break;
115
case '?': usage (binary_name);
120
/* Wake-up pipe: the signal code writes to g_quit_pipe[1]; the receive loop
 * polls g_quit_pipe[0]. */
e = pipe (g_quit_pipe);
123
/* setup signal handlers */
124
signal (SIGSEGV, on_sigsegv);
125
signal (SIGINT, on_signal);
126
signal (SIGTERM, on_signal);
128
/* SIGHUP is ignored rather than handled. */
signal (SIGHUP, SIG_IGN);
132
g_error ("startup failed");
137
g_message ("entering PGM message loop ... ");
142
/* Slot 0 is for the quit pipe; the remaining n_fds slots are filled in by
 * pgm_poll_info. */
struct pollfd fds[ 1 + n_fds ];
145
struct pgm_sockaddr_t from;
146
socklen_t fromlen = sizeof(from);
147
/* The receive status selects one of the cases below. */
const int status = pgm_recvfrom (g_sock,
156
case PGM_IO_STATUS_NORMAL:
157
/* A message was received: pass it to the data callback with its sender. */
on_data (buffer, len, &from);
159
case PGM_IO_STATUS_TIMER_PENDING:
161
socklen_t optlen = sizeof (tv);
162
/* Fetch PGM_TIME_REMAIN into tv; tv becomes the poll timeout below. */
pgm_getsockopt (g_sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen);
165
case PGM_IO_STATUS_RATE_LIMITED:
167
socklen_t optlen = sizeof (tv);
168
/* Fetch PGM_RATE_REMAIN into tv; tv becomes the poll timeout below. */
pgm_getsockopt (g_sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
171
case PGM_IO_STATUS_WOULD_BLOCK:
172
/* poll for next event */
174
/* WOULD_BLOCK waits without a timeout (-1); any other status converts tv
 * to milliseconds. */
timeout = PGM_IO_STATUS_WOULD_BLOCK == status ? -1 : ((tv.tv_sec * 1000) + (tv.tv_usec / 1000));
175
memset (fds, 0, sizeof(fds));
176
fds[0].fd = g_quit_pipe[0];
177
fds[0].events = POLLIN;
178
pgm_poll_info (g_sock, &fds[1], &n_fds, POLLIN);
179
/* The return value of poll is not examined on this line. */
poll (fds, 1 + n_fds, timeout /* ms */);
183
/* Log the PGM error message, then release the error object. */
g_warning ("%s", pgm_err->message);
184
pgm_error_free (pgm_err);
187
if (PGM_IO_STATUS_ERROR == status)
192
g_message ("message loop terminated, cleaning up.");
195
/* Close both ends of the wake-up pipe. */
close (g_quit_pipe[0]);
196
close (g_quit_pipe[1]);
199
g_message ("closing PGM socket.");
200
pgm_close (g_sock, TRUE);
204
g_message ("PGM engine shutdown.");
206
g_message ("finished.");
216
/* Signal notification fragment: logs the signal number, then writes one byte
 * to the write end of g_quit_pipe. The receive loop polls the read end
 * (g_quit_pipe[0]), so this write wakes it.
 * NOTE(review): on_signal is installed with signal() for SIGINT and SIGTERM.
 * If these statements run in signal-handler context, g_message and g_assert
 * are presumably not async-signal-safe -- confirm. The function header and
 * any other statements are not visible in this view.
 */
g_message ("on_signal (signum:%d)", signum);
218
const char one = '1';
219
/* write() returns ssize_t; stored in a size_t, a -1 failure becomes a very
 * large value, which also fails the equality asserted below (when assertions
 * are enabled). */
const size_t writelen = write (g_quit_pipe[1], &one, sizeof(one));
220
g_assert (sizeof(one) == writelen);
227
/* Socket bring-up fragment (presumably the body of on_startup): resolve
 * g_network, create the PGM socket (UDP-encapsulated when g_udp_encap_port is
 * non-zero), set receive options, bind, join the multicast groups, set IP
 * options and connect. The trailing lines release the socket, the address
 * info and the error object.
 * NOTE(review): the signature, braces, labels/returns and some declarations
 * (for example `passive` and `multicast_hops`) are not visible in this view.
 * None of the pgm_setsockopt results are checked on the visible lines.
 */
struct pgm_addrinfo_t* res = NULL;
228
pgm_error_t* pgm_err = NULL;
229
sa_family_t sa_family = AF_UNSPEC;
231
g_message ("startup.");
233
/* parse network parameter into transport address structure */
234
if (!pgm_getaddrinfo (g_network, NULL, &res, &pgm_err)) {
235
g_error ("parsing network parameter: %s", pgm_err->message);
239
/* The address family is taken from the first send address. */
sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
241
if (g_udp_encap_port) {
242
g_message ("create PGM/UDP socket.");
243
if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_err)) {
244
g_error ("socket: %s", pgm_err->message);
247
/* The same port is used for both unicast and multicast encapsulation. */
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port));
248
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port));
250
g_message ("create PGM/IP socket.");
251
if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_err)) {
252
g_error ("socket: %s", pgm_err->message);
257
/* Use RFC 2113 tagging for PGM Router Assist */
258
/* NOTE(review): the value passed for PGM_IP_ROUTER_ALERT is 0 and the
 * variable is named no_router_assist, so the option presumably ends up
 * disabled despite the wording of the comment above -- confirm. */
const int no_router_assist = 0;
259
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_IP_ROUTER_ALERT, &no_router_assist, sizeof(no_router_assist));
261
pgm_drop_superuser();
263
/* set PGM parameters */
264
/* Intervals are built with pgm_secs()/pgm_msecs(); retry counts are plain
 * integers. */
const int recv_only = 1,
266
peer_expiry = pgm_secs (300),
267
spmr_expiry = pgm_msecs (250),
268
nak_bo_ivl = pgm_msecs (50),
269
nak_rpt_ivl = pgm_secs (2),
270
nak_rdata_ivl = pgm_secs (2),
271
nak_data_retries = 50,
272
nak_ncf_retries = 50;
274
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, sizeof(recv_only));
275
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_PASSIVE, &passive, sizeof(passive));
276
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MTU, &g_max_tpdu, sizeof(g_max_tpdu));
277
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_RXW_SQNS, &g_sqns, sizeof(g_sqns));
278
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry));
279
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry));
280
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl));
281
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl));
282
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl));
283
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries));
284
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries));
286
/* create global session identifier */
287
struct pgm_sockaddr_t addr;
288
memset (&addr, 0, sizeof(addr));
289
/* Destination port: -s value when given, otherwise the library default. */
addr.sa_port = g_port ? g_port : DEFAULT_DATA_DESTINATION_PORT;
290
addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
291
if (!pgm_gsi_create_from_hostname (&addr.sa_addr.gsi, &pgm_err)) {
292
g_error ("creating GSI: %s", pgm_err->message);
296
/* assign socket to specified address */
297
struct pgm_interface_req_t if_req;
298
memset (&if_req, 0, sizeof(if_req));
299
if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
300
if_req.ir_scope_id = 0;
301
/* For IPv6 the scope id is copied from the first receive group address;
 * memcpy into a local sockaddr_in6 is used to read the field. */
if (AF_INET6 == sa_family) {
302
struct sockaddr_in6 sa6;
303
memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof(sa6));
304
if_req.ir_scope_id = sa6.sin6_scope_id;
306
/* The same interface request is supplied for both directions. */
if (!pgm_bind3 (g_sock,
308
&if_req, sizeof(if_req), /* tx interface */
309
&if_req, sizeof(if_req), /* rx interface */
312
g_error ("binding PGM socket: %s", pgm_err->message);
316
/* join IP multicast groups */
317
/* Join every receive group; the send group is the first send address. */
for (unsigned i = 0; i < res->ai_recv_addrs_len; i++)
318
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_JOIN_GROUP, &res->ai_recv_addrs[i], sizeof(struct group_req));
319
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof(struct group_req));
320
/* NOTE(review): res is released here and again in the cleanup lines further
 * down; no reset of res is visible in between -- confirm that a failure
 * after this point cannot free it twice. */
pgm_freeaddrinfo (res);
322
/* set IP parameters */
323
const int nonblocking = 1,
324
multicast_loop = g_multicast_loop ? 1 : 0,
326
dscp = 0x2e << 2; /* Expedited Forwarding PHB for network elements, no ECN. */
328
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop, sizeof(multicast_loop));
329
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof(multicast_hops));
330
/* The TOS/DSCP option is applied only when the socket is not IPv6. */
if (AF_INET6 != sa_family)
331
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof(dscp));
332
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof(nonblocking));
334
if (!pgm_connect (g_sock, &pgm_err)) {
335
g_error ("connecting PGM socket: %s", pgm_err->message);
339
g_message ("startup complete.");
343
/* Cleanup lines: close the socket if one was created, then release the
 * address info and any pending error object. */
if (NULL != g_sock) {
344
pgm_close (g_sock, FALSE);
348
pgm_freeaddrinfo (res);
351
if (NULL != pgm_err) {
352
pgm_error_free (pgm_err);
363
struct pgm_sockaddr_t* from
366
/* protect against non-null terminated strings */
367
char buf[1024], tsi[PGM_TSISTRLEN];
368
const size_t buflen = MIN(sizeof(buf) - 1, len);
369
strncpy (buf, (const char*)data, buflen);
371
pgm_tsi_print_r (&from->sa_addr, tsi, sizeof(tsi));
373
g_message ("\"%s\" (%u bytes from %s)",