1
/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
3
* Simple receiver using the PGM transport, based on enonblocksyncrecvmsgv :/
5
* Copyright (c) 2006-2010 Miru Limited.
7
* This library is free software; you can redistribute it and/or
8
* modify it under the terms of the GNU Lesser General Public
9
* License as published by the Free Software Foundation; either
10
* version 2.1 of the License, or (at your option) any later version.
12
* This library is distributed in the hope that it will be useful,
13
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
* Lesser General Public License for more details.
17
* You should have received a copy of the GNU Lesser General Public
18
* License along with this library; if not, write to the Free Software
19
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
29
#ifdef CONFIG_HAVE_EPOLL
30
# include <sys/epoll.h>
32
#include <sys/types.h>
36
# include <arpa/inet.h>
37
# include <netinet/in.h>
39
# include <sys/socket.h>
41
# include <sys/time.h>
46
#ifdef CONFIG_WITH_HTTP
47
# include <pgm/http.h>
49
#ifdef CONFIG_WITH_SNMP
50
# include <pgm/snmp.h>
53
/* example dependencies */
54
#include <pgm/backtrace.h>
56
#include <pgm/signal.h>
61
static int g_port = 0;
62
static const char* g_network = "";
63
static const char* g_source = "";
64
static gboolean g_multicast_loop = FALSE;
65
static int g_udp_encap_port = 0;
67
static int g_max_tpdu = 1500;
68
static int g_sqns = 100;
70
static pgm_sock_t* g_sock = NULL;
71
static GThread* g_thread = NULL;
72
static GMainLoop* g_loop = NULL;
73
static gboolean g_quit;
75
static int g_quit_pipe[2];
76
static void on_signal (int, gpointer);
78
static HANDLE g_quit_event;
79
static BOOL on_console_ctrl (DWORD);
82
static gboolean on_startup (gpointer);
83
static gboolean on_mark (gpointer);
85
static gpointer receiver_thread (gpointer);
86
static int on_msgv (struct pgm_msgv_t*, size_t);
89
G_GNUC_NORETURN static void
94
fprintf (stderr, "Usage: %s [options]\n", bin);
95
fprintf (stderr, " -n <network> : Multicast group or unicast IP address\n");
96
fprintf (stderr, " -a <ip address> : Source unicast IP address\n");
97
fprintf (stderr, " -s <port> : IP port\n");
98
fprintf (stderr, " -p <port> : Encapsulate PGM in UDP on IP port\n");
99
fprintf (stderr, " -l : Enable multicast loopback and address sharing\n");
100
#ifdef CONFIG_WITH_HTTP
101
fprintf (stderr, " -H : Enable HTTP administrative interface\n");
103
#ifdef CONFIG_WITH_SNMP
104
fprintf (stderr, " -S : Enable SNMP interface\n");
106
fprintf (stderr, " -i : List available interfaces\n");
116
pgm_error_t* pgm_err = NULL;
117
#ifdef CONFIG_WITH_HTTP
118
gboolean enable_http = FALSE;
120
#ifdef CONFIG_WITH_SNMP
121
gboolean enable_snmpx = FALSE;
124
setlocale (LC_ALL, "");
126
/* pre-initialise PGM messages module to add hook for GLib logging */
129
g_message ("pgmrecv");
131
if (!pgm_init (&pgm_err)) {
132
g_error ("Unable to start PGM engine: %s", (pgm_err && pgm_err->message) ? pgm_err->message : "(null)");
133
pgm_error_free (pgm_err);
134
pgm_messages_shutdown();
138
g_thread_init (NULL);
140
/* parse program arguments */
141
const char* binary_name = strrchr (argv[0], '/');
143
while ((c = getopt (argc, argv, "a:s:n:p:lih"
144
#ifdef CONFIG_WITH_HTTP
147
#ifdef CONFIG_WITH_SNMP
153
case 'n': g_network = optarg; break;
154
case 'a': g_source = optarg; break;
155
case 's': g_port = atoi (optarg); break;
156
case 'p': g_udp_encap_port = atoi (optarg); break;
158
case 'l': g_multicast_loop = TRUE; break;
159
#ifdef CONFIG_WITH_HTTP
160
case 'H': enable_http = TRUE; break;
162
#ifdef CONFIG_WITH_SNMP
163
case 'S': enable_snmpx = TRUE; break;
168
pgm_messages_shutdown();
173
pgm_messages_shutdown();
178
#ifdef CONFIG_WITH_HTTP
180
if (!pgm_http_init (PGM_HTTP_DEFAULT_SERVER_PORT, &pgm_err)) {
181
g_error ("Unable to start HTTP interface: %s", pgm_err->message);
182
pgm_error_free (pgm_err);
184
pgm_messages_shutdown();
189
#ifdef CONFIG_WITH_SNMP
191
if (!pgm_snmp_init (&pgm_err)) {
192
g_error ("Unable to start SNMP interface: %s", pgm_err->message);
193
pgm_error_free (pgm_err);
194
#ifdef CONFIG_WITH_HTTP
196
pgm_http_shutdown ();
199
pgm_messages_shutdown();
205
g_loop = g_main_loop_new (NULL, FALSE);
209
/* setup signal handlers */
210
signal (SIGSEGV, on_sigsegv);
212
signal (SIGHUP, SIG_IGN);
215
const int e = pipe (g_quit_pipe);
217
pgm_signal_install (SIGINT, on_signal, g_loop);
218
pgm_signal_install (SIGTERM, on_signal, g_loop);
220
g_quit_event = CreateEvent (NULL, TRUE, FALSE, TEXT("QuitEvent"));
221
SetConsoleCtrlHandler ((PHANDLER_ROUTINE)on_console_ctrl, TRUE);
222
setvbuf (stdout, (char *) NULL, _IONBF, 0);
225
/* delayed startup */
226
g_message ("scheduling startup.");
227
g_timeout_add (0, (GSourceFunc)on_startup, NULL);
230
g_message ("entering main event loop ... ");
231
g_main_loop_run (g_loop);
233
g_message ("event loop terminated, cleaning up.");
238
const char one = '1';
239
const size_t writelen = write (g_quit_pipe[1], &one, sizeof(one));
240
g_assert (sizeof(one) == writelen);
241
g_thread_join (g_thread);
242
close (g_quit_pipe[0]);
243
close (g_quit_pipe[1]);
245
WSASetEvent (g_quit_event);
246
g_thread_join (g_thread);
247
WSACloseEvent (g_quit_event);
250
g_main_loop_unref (g_loop);
254
g_message ("closing PGM socket.");
256
pgm_close (g_sock, TRUE);
260
#ifdef CONFIG_WITH_HTTP
264
#ifdef CONFIG_WITH_SNMP
269
g_message ("PGM engine shutdown.");
271
g_message ("finished.");
272
pgm_messages_shutdown();
284
GMainLoop* loop = (GMainLoop*)user_data;
285
g_message ("on_signal (signum:%d user_data:%p)",
287
g_main_loop_quit (loop);
296
g_message ("on_console_ctrl (dwCtrlType:%lu)", (unsigned long)dwCtrlType);
297
g_main_loop_quit (g_loop);
300
#endif /* !G_OS_UNIX */
305
G_GNUC_UNUSED gpointer data
308
struct pgm_addrinfo_t* res = NULL;
309
pgm_error_t* pgm_err = NULL;
310
sa_family_t sa_family = AF_UNSPEC;
312
g_message ("startup.");
314
/* parse network parameter into transport address structure */
315
if (!pgm_getaddrinfo (g_network, NULL, &res, &pgm_err)) {
316
g_error ("parsing network parameter: %s", pgm_err->message);
320
sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
322
if (g_udp_encap_port) {
323
g_message ("create PGM/UDP socket.");
324
if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_err)) {
325
g_error ("socket: %s", pgm_err->message);
328
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port));
329
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port));
331
g_message ("create PGM/IP socket.");
332
if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_err)) {
333
g_error ("socket: %s", pgm_err->message);
338
/* Use RFC 2113 tagging for PGM Router Assist */
339
const int no_router_assist = 0;
340
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_IP_ROUTER_ALERT, &no_router_assist, sizeof(no_router_assist));
342
pgm_drop_superuser();
344
/* set PGM parameters */
345
const int recv_only = 1,
347
peer_expiry = pgm_secs (300),
348
spmr_expiry = pgm_msecs (250),
349
nak_bo_ivl = pgm_msecs (50),
350
nak_rpt_ivl = pgm_secs (2),
351
nak_rdata_ivl = pgm_secs (2),
352
nak_data_retries = 50,
353
nak_ncf_retries = 50;
355
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, sizeof(recv_only));
356
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_PASSIVE, &passive, sizeof(passive));
357
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MTU, &g_max_tpdu, sizeof(g_max_tpdu));
358
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_RXW_SQNS, &g_sqns, sizeof(g_sqns));
359
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry));
360
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry));
361
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl));
362
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl));
363
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl));
364
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries));
365
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries));
367
/* create global session identifier */
368
struct pgm_sockaddr_t addr;
369
memset (&addr, 0, sizeof(addr));
370
addr.sa_port = g_port ? g_port : DEFAULT_DATA_DESTINATION_PORT;
371
addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
372
if (!pgm_gsi_create_from_hostname (&addr.sa_addr.gsi, &pgm_err)) {
373
g_error ("creating GSI: %s", pgm_err->message);
377
/* assign socket to specified address */
378
struct pgm_interface_req_t if_req;
379
memset (&if_req, 0, sizeof(if_req));
380
if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
381
if_req.ir_scope_id = 0;
382
if (AF_INET6 == sa_family) {
383
struct sockaddr_in6 sa6;
384
memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof(sa6));
385
if_req.ir_scope_id = sa6.sin6_scope_id;
387
if (!pgm_bind3 (g_sock,
389
&if_req, sizeof(if_req), /* tx interface */
390
&if_req, sizeof(if_req), /* rx interface */
393
g_error ("binding PGM socket: %s", pgm_err->message);
397
/* join IP multicast groups */
398
for (unsigned i = 0; i < res->ai_recv_addrs_len; i++)
399
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_JOIN_GROUP, &res->ai_recv_addrs[i], sizeof(struct group_req));
400
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof(struct group_req));
401
pgm_freeaddrinfo (res);
403
/* set IP parameters */
404
const int nonblocking = 1,
405
multicast_loop = g_multicast_loop ? 1 : 0,
407
dscp = 0x2e << 2; /* Expedited Forwarding PHB for network elements, no ECN. */
409
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop, sizeof(multicast_loop));
410
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof(multicast_hops));
411
if (AF_INET6 != sa_family)
412
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof(dscp));
413
pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof(nonblocking));
415
if (!pgm_connect (g_sock, &pgm_err)) {
416
g_error ("connecting PGM socket: %s", pgm_err->message);
420
/* create receiver thread */
421
GError* glib_err = NULL;
422
g_thread = g_thread_create_full (receiver_thread,
427
G_THREAD_PRIORITY_HIGH,
430
g_error ("g_thread_create_full failed errno %i: \"%s\"", glib_err->code, glib_err->message);
431
g_error_free (glib_err);
435
/* period timer to indicate some form of life */
436
// TODO: Gnome 2.14: replace with g_timeout_add_seconds()
437
g_timeout_add(10 * 1000, (GSourceFunc)on_mark, NULL);
439
g_message ("startup complete.");
443
if (NULL != g_sock) {
444
pgm_close (g_sock, FALSE);
448
pgm_freeaddrinfo (res);
451
if (NULL != pgm_err) {
452
pgm_error_free (pgm_err);
455
g_main_loop_quit (g_loop);
459
/* idle log notification
465
G_GNUC_UNUSED gpointer data
468
g_message ("-- MARK --");
478
pgm_sock_t* rx_sock = (pgm_sock_t*)data;
479
const long iov_len = 20;
480
const long ev_len = 1;
481
struct pgm_msgv_t msgv[iov_len];
483
#ifdef CONFIG_HAVE_EPOLL
484
struct epoll_event events[ev_len]; /* wait for maximum 1 event */
485
const int efd = epoll_create (IP_MAX_MEMBERSHIPS);
487
g_error ("epoll_create failed errno %i: \"%s\"", errno, strerror(errno));
488
g_main_loop_quit (g_loop);
492
if (pgm_epoll_ctl (rx_sock, efd, EPOLL_CTL_ADD, EPOLLIN) < 0)
494
g_error ("pgm_epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno));
495
g_main_loop_quit (g_loop);
498
struct epoll_event event;
499
event.events = EPOLLIN;
500
event.data.fd = g_quit_pipe[0];
501
if (epoll_ctl (efd, EPOLL_CTL_ADD, g_quit_pipe[0], &event) < 0)
503
g_error ("epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno));
504
g_main_loop_quit (g_loop);
507
#elif defined(CONFIG_HAVE_POLL)
509
struct pollfd fds[ 1 + n_fds ];
510
#elif defined(G_OS_UNIX) /* HAVE_SELECT */
513
#else /* G_OS_WIN32 */
514
SOCKET recv_sock, pending_sock;
515
DWORD cEvents = PGM_RECV_SOCKET_READ_COUNT + 1;
516
WSAEVENT waitEvents[ PGM_RECV_SOCKET_READ_COUNT + 1 ];
517
socklen_t socklen = sizeof (SOCKET);
519
waitEvents[0] = g_quit_event;
520
waitEvents[1] = WSACreateEvent ();
521
pgm_getsockopt (rx_sock, IPPROTO_PGM, PGM_RECV_SOCK, &recv_sock, &socklen);
522
WSAEventSelect (recv_sock, waitEvents[1], FD_READ);
523
waitEvents[2] = WSACreateEvent ();
524
pgm_getsockopt (rx_sock, IPPROTO_PGM, PGM_PENDING_SOCK, &pending_sock, &socklen);
525
WSAEventSelect (pending_sock, waitEvents[2], FD_READ);
526
#endif /* !CONFIG_HAVE_EPOLL */
533
DWORD dwTimeout, dwEvents;
536
pgm_error_t* pgm_err = NULL;
537
const int status = pgm_recvmsgv (rx_sock,
544
case PGM_IO_STATUS_NORMAL:
547
case PGM_IO_STATUS_TIMER_PENDING:
549
socklen_t optlen = sizeof (tv);
550
pgm_getsockopt (rx_sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen);
553
case PGM_IO_STATUS_RATE_LIMITED:
555
socklen_t optlen = sizeof (tv);
556
pgm_getsockopt (rx_sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
559
case PGM_IO_STATUS_WOULD_BLOCK:
561
#ifdef CONFIG_HAVE_EPOLL
562
timeout = PGM_IO_STATUS_WOULD_BLOCK == status ? -1 : ((tv.tv_sec * 1000) + (tv.tv_usec / 1000));
563
epoll_wait (efd, events, G_N_ELEMENTS(events), timeout /* ms */);
564
#elif defined(CONFIG_HAVE_POLL)
565
timeout = PGM_IO_STATUS_WOULD_BLOCK == status ? -1 : ((tv.tv_sec * 1000) + (tv.tv_usec / 1000));
566
memset (fds, 0, sizeof(fds));
567
fds[0].fd = g_quit_pipe[0];
568
fds[0].events = POLLIN;
569
pgm_poll_info (rx_sock, &fds[1], &n_fds, POLLIN);
570
poll (fds, 1 + n_fds, timeout /* ms */);
571
#elif defined(G_OS_UNIX) /* HAVE_SELECT */
573
FD_SET(g_quit_pipe[0], &readfds);
574
n_fds = g_quit_pipe[0] + 1;
575
pgm_select_info (rx_sock, &readfds, NULL, &n_fds);
576
select (n_fds, &readfds, NULL, NULL, PGM_IO_STATUS_RATE_LIMITED == status ? &tv : NULL);
577
#else /* G_OS_WIN32 */
578
dwTimeout = PGM_IO_STATUS_WOULD_BLOCK == status ? WSA_INFINITE : ((tv.tv_sec * 1000) + (tv.tv_usec / 1000));
579
dwEvents = WSAWaitForMultipleEvents (cEvents, waitEvents, FALSE, dwTimeout, FALSE);
581
case WSA_WAIT_EVENT_0+1: WSAResetEvent (waitEvents[1]); break;
582
case WSA_WAIT_EVENT_0+2: WSAResetEvent (waitEvents[2]); break;
585
#endif /* !CONFIG_HAVE_EPOLL */
590
g_warning ("%s", pgm_err->message);
591
pgm_error_free (pgm_err);
594
if (PGM_IO_STATUS_ERROR == status)
599
#ifdef CONFIG_HAVE_EPOLL
601
#elif defined(G_OS_WIN32)
602
WSACloseEvent (waitEvents[1]);
603
WSACloseEvent (waitEvents[2]);
604
# if (__STDC_VERSION__ < 199901L)
605
g_free (waitHandles);
614
struct pgm_msgv_t* msgv, /* an array of msgvs */
618
g_message ("(%u bytes)",
624
const struct pgm_sk_buff_t* pskb = msgv[i].msgv_skb[0];
626
for (unsigned j = 0; j < msgv[i].msgv_len; j++)
627
apdu_len += msgv[i].msgv_skb[j]->len;
628
/* truncate to first fragment to make GLib printing happy */
629
char buf[2048], tsi[PGM_TSISTRLEN];
630
const gsize buflen = MIN(sizeof(buf) - 1, pskb->len);
631
strncpy (buf, (const char*)pskb->data, buflen);
633
pgm_tsi_print_r (&pskb->tsi, tsi, sizeof(tsi));
634
if (msgv[i].msgv_len > 1)
635
g_message ("\t%u: \"%s\" ... (%" G_GSIZE_FORMAT " bytes from %s)",
636
i, buf, apdu_len, tsi);
638
g_message ("\t%u: \"%s\" (%" G_GSIZE_FORMAT " bytes from %s)",
639
i, buf, apdu_len, tsi);