1
/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
3
* プリン (purin) PGM receiver
5
* Copyright (c) 2006-2010 Miru Limited.
7
* This library is free software; you can redistribute it and/or
8
* modify it under the terms of the GNU Lesser General Public
9
* License as published by the Free Software Foundation; either
10
* version 2.1 of the License, or (at your option) any later version.
12
* This library is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this library; if not, write to the Free Software
19
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
23
/* File-scope configuration, shared state and forward declarations.
 * NOTE(review): bare numbers between the statements are stray line-number
 * residue, and any #include / #if directives that belong here are not
 * visible in this view. */
/* NOTE(review): conventionally silences MSVC "secure CRT" deprecation
 * warnings -- confirm it precedes the relevant includes. */
#define _CRT_SECURE_NO_WARNINGS 1
47
/* Network specification: replaced by the -n option and handed to
 * pgm_getaddrinfo() during startup. */
static const char* network = "";
48
/* Set by -l; forwarded as the PGM_MULTICAST_LOOP socket option. */
static bool use_multicast_loop = FALSE;
49
/* Set by -p; a non-zero value selects a PGM/UDP socket and is used for both
 * the unicast and multicast encapsulation port options. */
static int udp_encap_port = 0;
51
/* Passed to the socket as PGM_MTU. */
static int max_tpdu = 1500;
52
/* Passed to the socket as PGM_RXW_SQNS. */
static int sqns = 100;
54
/* Set by -c. */
static bool use_pgmcc = FALSE;
55
/* Set by -f. */
static bool use_fec = FALSE;
57
/* Set by -N; used as fecinfo.block_size.
 * NOTE(review): `rs_k` and `port` are used later in the file but their
 * declarations are not visible in this view. */
static int rs_n = 255;
59
/* The single PGM socket: created by pgm_socket() during startup, read in the
 * message loop and released with pgm_close(). */
static pgm_sock_t* sock = NULL;
60
/* Loop-exit flag: set by on_signal / on_console_ctrl, tested by the message
 * loop's `while (!is_terminated)`. */
static bool is_terminated = FALSE;
63
/* Pipe created with pipe(): on_signal writes one byte to [1], the message
 * loop adds [0] to its select() read set. */
static int terminate_pipe[2];
64
/* Installed for SIGINT and SIGTERM via signal(). */
static void on_signal (int);
66
/* Event created with WSACreateEvent(), signalled by on_console_ctrl and
 * waited on as waitEvents[0]. */
static WSAEVENT terminateEvent;
67
/* Registered through SetConsoleCtrlHandler(). */
static BOOL on_console_ctrl (DWORD);
70
/* NOTE(review): `usage` is declared twice, with and without the noreturn
 * attribute -- presumably the two arms of a compiler conditional whose
 * directives are not visible here. */
static void usage (const char*) __attribute__((__noreturn__));
72
static void usage (const char*);
75
/* Socket creation and configuration. */
static bool on_startup (void);
76
/* Per-message callback invoked from the message loop with the received
 * buffer, its length and the sender address. */
static int on_data (const void*restrict, const size_t, const struct pgm_sockaddr_t*restrict);
84
/* Command-line help: writes the option summary to stderr, with `bin`
 * substituted as the program name on the first line.
 * NOTE(review): the enclosing function header and whatever follows the last
 * fprintf are not visible in this view. */
fprintf (stderr, "Usage: %s [options]\n", bin);
85
fprintf (stderr, " -n <network> : Multicast group or unicast IP address\n");
86
fprintf (stderr, " -s <port> : IP port\n");
87
fprintf (stderr, " -p <port> : Encapsulate PGM in UDP on IP port\n");
88
fprintf (stderr, " -c : Enable PGMCC\n");
89
/* NOTE(review): the help advertises a <type> argument, but the option
 * handler for 'f' only sets use_fec and never reads optarg. */
fprintf (stderr, " -f <type> : Enable FEC with either proactive or ondemand parity\n");
90
/* -K and -N together supply the Reed-Solomon (n, k) pair. */
fprintf (stderr, " -K <k> : Configure Reed-Solomon code (n, k)\n");
91
fprintf (stderr, " -N <n>\n");
92
fprintf (stderr, " -l : Enable multicast loopback and address sharing\n");
93
fprintf (stderr, " -i : List available interfaces\n");
103
/* Program entry fragment: initialises the PGM engine, parses options,
 * installs termination handling, then runs the receive loop until
 * is_terminated is set, and finally releases resources.
 * NOTE(review): the function header, braces and preprocessor conditionals
 * are not visible in this view. Several adjacent statements (duplicate
 * declarations, POSIX next to Winsock calls) look like alternative platform
 * branches rather than sequential code. */
pgm_error_t* pgm_err = NULL;
105
setlocale (LC_ALL, "");
108
/* NOTE(review): this string literal appears mis-encoded (mojibake) in this
 * copy of the file -- verify the file encoding before editing it. */
puts ("ććŖć³ ććŖć³");
110
/* Windows consoles have incredibly limited Unicode support */
111
puts ("purin purin");
113
/* Start the PGM engine; on failure report the library's message and release
 * the error object. */
if (!pgm_init (&pgm_err)) {
114
fprintf (stderr, "Unable to start PGM engine: %s\n", pgm_err->message);
115
pgm_error_free (pgm_err);
119
/* parse program arguments */
121
/* Find the last path separator in argv[0]; the two declarations differ only
 * in the separator character. */
const char* binary_name = strrchr (argv[0], '\\');
123
const char* binary_name = strrchr (argv[0], '/');
125
/* No separator found: use argv[0] unchanged. */
if (NULL == binary_name) binary_name = argv[0];
129
/* Options followed by ':' in the getopt string take an argument. */
while ((c = getopt (argc, argv, "s:n:p:cf:K:N:lih")) != -1)
132
case 'n': network = optarg; break;
133
/* NOTE(review): atoi() cannot report malformed input; non-numeric text
 * yields 0. */
case 's': port = atoi (optarg); break;
134
case 'p': udp_encap_port = atoi (optarg); break;
135
case 'c': use_pgmcc = TRUE; break;
136
/* The option's argument (optarg) is not examined here. */
case 'f': use_fec = TRUE; break;
137
case 'K': rs_k = atoi (optarg); break;
138
case 'N': rs_n = atoi (optarg); break;
139
case 'l': use_multicast_loop = TRUE; break;
146
case '?': usage (binary_name);
150
/* FEC was requested but one of the Reed-Solomon parameters is zero. */
if (use_fec && ( !rs_n || !rs_k )) {
151
fprintf (stderr, "Invalid Reed-Solomon parameters RS(%d,%d).\n", rs_n, rs_k);
155
/* setup signal handlers */
157
signal (SIGHUP, SIG_IGN);
160
/* Pipe used to wake the select() in the message loop on termination.
 * NOTE(review): no check of `e` is visible in this view. */
int e = pipe (terminate_pipe);
162
signal (SIGINT, on_signal);
163
signal (SIGTERM, on_signal);
165
/* Winsock counterpart: an event object plus a console control handler. */
terminateEvent = WSACreateEvent();
166
SetConsoleCtrlHandler ((PHANDLER_ROUTINE)on_console_ctrl, TRUE);
167
/* Make stdout unbuffered. */
setvbuf (stdout, (char *) NULL, _IONBF, 0);
171
fprintf (stderr, "Startup failed\n");
180
/* Winsock wait set: slot 0 is the terminate event, slots 1 and 2 are bound
 * to the socket's receive and pending-data descriptors. */
SOCKET recv_sock, pending_sock;
181
DWORD cEvents = PGM_RECV_SOCKET_READ_COUNT + 1;
182
WSAEVENT waitEvents[ PGM_RECV_SOCKET_READ_COUNT + 1 ];
183
socklen_t socklen = sizeof (SOCKET);
185
waitEvents[0] = terminateEvent;
186
waitEvents[1] = WSACreateEvent();
187
waitEvents[2] = WSACreateEvent();
188
/* The fixed indices above rely on exactly two readable descriptors. */
assert (2 == PGM_RECV_SOCKET_READ_COUNT);
189
/* NOTE(review): the results of pgm_getsockopt / WSAEventSelect are not
 * checked in the visible lines. */
pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, &recv_sock, &socklen);
190
WSAEventSelect (recv_sock, waitEvents[1], FD_READ);
191
pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, &pending_sock, &socklen);
192
WSAEventSelect (pending_sock, waitEvents[2], FD_READ);
194
puts ("Entering PGM message loop ... ");
198
DWORD dwTimeout, dwEvents;
202
/* One receive attempt per iteration; `status` selects the handling below. */
struct pgm_sockaddr_t from;
203
socklen_t fromlen = sizeof (from);
204
const int status = pgm_recvfrom (sock,
213
/* Data received: pass it to on_data together with the sender address. */
case PGM_IO_STATUS_NORMAL:
214
on_data (buffer, len, &from);
216
/* Ask the socket for the time remaining (PGM_TIME_REMAIN) and store it
 * in tv. */
case PGM_IO_STATUS_TIMER_PENDING:
218
socklen_t optlen = sizeof (tv);
219
pgm_getsockopt (sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen);
222
/* Same, for the rate limiter (PGM_RATE_REMAIN). */
case PGM_IO_STATUS_RATE_LIMITED:
224
socklen_t optlen = sizeof (tv);
225
pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
227
case PGM_IO_STATUS_WOULD_BLOCK:
228
/* select for next event */
231
/* select() variant: watch the terminate pipe's read end plus whatever
 * descriptors pgm_select_info adds. The timeout is NULL when the status is
 * WOULD_BLOCK, otherwise the tv obtained above. */
fds = terminate_pipe[0] + 1;
233
FD_SET(terminate_pipe[0], &readfds);
234
pgm_select_info (sock, &readfds, NULL, &fds);
235
fds = select (fds, &readfds, NULL, NULL, PGM_IO_STATUS_WOULD_BLOCK == status ? NULL : &tv);
237
/* Winsock variant: convert tv to milliseconds, or wait without limit when
 * the status is WOULD_BLOCK. */
dwTimeout = PGM_IO_STATUS_WOULD_BLOCK == status ? WSA_INFINITE : (DWORD)((tv.tv_sec * 1000) + (tv.tv_usec / 1000));
238
dwEvents = WSAWaitForMultipleEvents (cEvents, waitEvents, FALSE, dwTimeout, FALSE);
240
/* Reset whichever socket event fired. */
case WSA_WAIT_EVENT_0+1: WSAResetEvent (waitEvents[1]); break;
241
case WSA_WAIT_EVENT_0+2: WSAResetEvent (waitEvents[2]); break;
249
/* Report the library error text and release the error object. */
fprintf (stderr, "%s\n", pgm_err->message);
250
pgm_error_free (pgm_err);
253
if (PGM_IO_STATUS_ERROR == status)
256
} while (!is_terminated);
258
puts ("Message loop terminated, cleaning up.");
262
/* Release the wake-up primitives: both pipe ends, or the three events. */
close (terminate_pipe[0]);
263
close (terminate_pipe[1]);
265
WSACloseEvent (waitEvents[0]);
266
WSACloseEvent (waitEvents[1]);
267
WSACloseEvent (waitEvents[2]);
271
puts ("Destroying PGM socket.");
272
pgm_close (sock, TRUE);
276
puts ("PGM engine shutdown.");
289
/* Termination-signal fragment: logs the signal number, raises the
 * loop-exit flag and writes one byte to the terminate pipe so that a
 * select() on its read end returns.
 * NOTE(review): the function header is not visible in this view. If this is
 * the handler installed with signal(), printf and assert are not
 * async-signal-safe, and is_terminated is a plain bool rather than
 * volatile sig_atomic_t. */
printf ("on_signal (signum:%d)\n", signum);
290
is_terminated = TRUE;
291
const char one = '1';
292
/* NOTE(review): write() returns ssize_t; a -1 failure stored in size_t only
 * shows up through the assert below, which is compiled out under NDEBUG. */
const size_t writelen = write (terminate_pipe[1], &one, sizeof(one));
293
assert (sizeof(one) == writelen);
302
/* Console-control fragment: logs the control type, raises the loop-exit
 * flag and signals terminateEvent, which the message loop waits on as
 * waitEvents[0].
 * NOTE(review): the function header and return statement are not visible in
 * this view. */
printf ("on_console_ctrl (dwCtrlType:%lu)\n", (unsigned long)dwCtrlType);
303
is_terminated = TRUE;
304
WSASetEvent (terminateEvent);
313
/* Startup fragment: resolves `network`, creates the PGM socket (UDP
 * encapsulated when udp_encap_port is non-zero, otherwise raw PGM/IP),
 * applies receiver options, binds, joins the multicast groups and connects.
 * NOTE(review): the function header, braces, return statements and any goto
 * labels are not visible in this view. None of the visible pgm_setsockopt
 * calls has its result checked. */
struct pgm_addrinfo_t* res = NULL;
314
pgm_error_t* pgm_err = NULL;
315
sa_family_t sa_family = AF_UNSPEC;
317
/* parse network parameter into PGM socket address structure */
318
if (!pgm_getaddrinfo (network, NULL, &res, &pgm_err)) {
319
fprintf (stderr, "Parsing network parameter: %s\n", pgm_err->message);
323
/* The address family comes from the first send address. */
sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
325
if (udp_encap_port) {
326
puts ("Create PGM/UDP socket.");
327
if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_err)) {
328
fprintf (stderr, "Creating PGM/UDP socket: %s\n", pgm_err->message);
331
/* The same port is used for unicast and multicast encapsulation. */
pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &udp_encap_port, sizeof(udp_encap_port));
332
pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &udp_encap_port, sizeof(udp_encap_port));
334
puts ("Create PGM/IP socket.");
335
if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_err)) {
336
fprintf (stderr, "Creating PGM/IP socket: %s\n", pgm_err->message);
341
/* Use RFC 2113 tagging for PGM Router Assist */
342
/* NOTE(review): the value passed is 0 under the name no_router_assist;
 * confirm which sense of PGM_IP_ROUTER_ALERT the comment above intends. */
const int no_router_assist = 0;
343
pgm_setsockopt (sock, IPPROTO_PGM, PGM_IP_ROUTER_ALERT, &no_router_assist, sizeof(no_router_assist));
345
pgm_drop_superuser();
347
/* set PGM parameters */
348
/* NOTE(review): `passive` is used below but its declarator is not visible
 * in this list. */
const int recv_only = 1,
350
peer_expiry = pgm_secs (300),
351
spmr_expiry = pgm_msecs (250),
352
nak_bo_ivl = pgm_msecs (50),
353
nak_rpt_ivl = pgm_secs (2),
354
nak_rdata_ivl = pgm_secs (2),
355
nak_data_retries = 50,
356
nak_ncf_retries = 50;
358
pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, sizeof(recv_only));
359
pgm_setsockopt (sock, IPPROTO_PGM, PGM_PASSIVE, &passive, sizeof(passive));
360
pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu, sizeof(max_tpdu));
361
pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &sqns, sizeof(sqns));
362
pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry));
363
pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry));
364
pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl));
365
pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl));
366
pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl));
367
pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries));
368
pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries));
370
/* PGMCC and FEC setup is compiled only when this macro is defined.
 * NOTE(review): the matching #endif and the use_pgmcc / use_fec guards are
 * not visible in this view. */
#ifdef I_UNDERSTAND_PGMCC_AND_FEC_ARE_NOT_SUPPORTED
372
struct pgm_pgmccinfo_t pgmccinfo;
373
pgmccinfo.ack_bo_ivl = pgm_msecs (50);
374
pgmccinfo.ack_c = 75;
375
pgmccinfo.ack_c_p = 500;
376
pgm_setsockopt (sock, IPPROTO_PGM, PGM_USE_PGMCC, &pgmccinfo, sizeof(pgmccinfo));
379
/* Reed-Solomon parameters: block size from rs_n, group size from rs_k.
 * On-demand parity is always enabled and proactive packets are 0. */
struct pgm_fecinfo_t fecinfo;
380
fecinfo.block_size = rs_n;
381
fecinfo.proactive_packets = 0;
382
fecinfo.group_size = rs_k;
383
fecinfo.ondemand_parity_enabled = TRUE;
384
fecinfo.var_pktlen_enabled = FALSE;
385
pgm_setsockopt (sock, IPPROTO_PGM, PGM_USE_FEC, &fecinfo, sizeof(fecinfo));
389
/* create global session identifier */
390
struct pgm_sockaddr_t addr;
391
memset (&addr, 0, sizeof(addr));
392
/* Use the -s port when non-zero, otherwise the default destination port. */
addr.sa_port = port ? port : DEFAULT_DATA_DESTINATION_PORT;
393
addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
394
if (!pgm_gsi_create_from_hostname (&addr.sa_addr.gsi, &pgm_err)) {
395
fprintf (stderr, "Creating GSI: %s\n", pgm_err->message);
399
/* assign socket to specified address */
400
struct pgm_interface_req_t if_req;
401
memset (&if_req, 0, sizeof(if_req));
402
if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
403
if_req.ir_scope_id = 0;
404
/* For IPv6 take the scope id from the first receive group address; the
 * group is copied into a local sockaddr_in6 to read the field. */
if (AF_INET6 == sa_family) {
405
struct sockaddr_in6 sa6;
406
memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof(sa6));
407
if_req.ir_scope_id = sa6.sin6_scope_id;
409
/* The same interface request is supplied for transmit and receive. */
if (!pgm_bind3 (sock,
411
&if_req, sizeof(if_req), /* tx interface */
412
&if_req, sizeof(if_req), /* rx interface */
415
fprintf (stderr, "Binding PGM socket: %s\n", pgm_err->message);
419
/* join IP multicast groups */
420
/* Join every receive address, set the first send address as the send
 * group, then release the address info. */
for (unsigned i = 0; i < res->ai_recv_addrs_len; i++)
421
pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP, &res->ai_recv_addrs[i], sizeof(struct group_req));
422
pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof(struct group_req));
423
pgm_freeaddrinfo (res);
425
/* set IP parameters */
426
/* NOTE(review): `multicast_hops` is used below but its declarator is not
 * visible in this list. */
const int nonblocking = 1,
427
multicast_loop = use_multicast_loop ? 1 : 0,
429
dscp = 0x2e << 2; /* Expedited Forwarding PHB for network elements, no ECN. */
431
pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop, sizeof(multicast_loop));
432
pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof(multicast_hops));
433
/* PGM_TOS is set for every family except IPv6. */
if (AF_INET6 != sa_family)
434
pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof(dscp));
435
pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof(nonblocking));
437
if (!pgm_connect (sock, &pgm_err)) {
438
fprintf (stderr, "Connecting PGM socket: %s\n", pgm_err->message);
442
puts ("Startup complete.");
447
/* Cleanup statements follow.
 * NOTE(review): the labels or conditions guarding them are not visible.
 * Confirm `res` cannot reach this pgm_freeaddrinfo after the earlier call
 * above without being reset. */
pgm_close (sock, FALSE);
451
pgm_freeaddrinfo (res);
454
if (NULL != pgm_err) {
455
pgm_error_free (pgm_err);
459
pgm_close (sock, FALSE);
468
/* Message callback fragment: copies up to sizeof(buf) - 1 bytes of the
 * payload into a local buffer, formats the sender's TSI and prints both
 * together with the payload length.
 * NOTE(review): the function name line, the `len` parameter line, the
 * braces and the preprocessor arms separating the alternatives below are
 * not visible in this view. */
const void* restrict data,
470
const struct pgm_sockaddr_t* restrict from
473
/* protect against non-null terminated strings */
474
char buf[1024], tsi[PGM_TSISTRLEN];
475
/* Cap the copy so one byte of buf is always left over. */
const size_t buflen = MIN(sizeof(buf) - 1, len);
476
#ifndef CONFIG_HAVE_SECURITY_ENHANCED_CRT
477
/* NOTE(review): strncpy does not write a terminator when the source has no
 * NUL within buflen bytes; confirm a `buf[buflen] = '\0'` follows on a line
 * not visible here. */
strncpy (buf, (const char*)data, buflen);
480
/* NOTE(review): the second argument of strncpy_s is the destination size.
 * Passing buflen (0 when len is 0) rather than sizeof(buf) may trigger the
 * CRT invalid-parameter handler -- confirm. */
strncpy_s (buf, buflen, (const char*)data, _TRUNCATE);
482
/* Render the sender's transport session identifier as text. */
pgm_tsi_print_r (&from->sa_addr, tsi, sizeof(tsi));
484
/* Two alternative printf forms: %zu for size_t, or %lu with a cast. */
printf ("\"%s\" (%zu bytes from %s)\n",
487
/* Microsoft CRT will crash on %zu */
488
printf ("\"%s\" (%lu bytes from %s)\n",
489
buf, (unsigned long)len, tsi);