~ubuntu-branches/ubuntu/precise/libpgm/precise

« back to all changes in this revision

Viewing changes to openpgm/pgm/examples/.svn/text-base/pnonblocksyncrecv.c.svn-base

  • Committer: Bazaar Package Importer
  • Author(s): Gabriel de Perthuis
  • Date: 2011-04-07 16:48:52 UTC
  • Revision ID: james.westby@ubuntu.com-20110407164852-8uamem42ojeptj6l
Tags: upstream-5.1.116~dfsg
ImportĀ upstreamĀ versionĀ 5.1.116~dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
 
2
 *
 
3
 * Simple PGM receiver: poll based non-blocking synchronous receiver.
 
4
 *
 
5
 * Copyright (c) 2006-2010 Miru Limited.
 
6
 *
 
7
 * This library is free software; you can redistribute it and/or
 
8
 * modify it under the terms of the GNU Lesser General Public
 
9
 * License as published by the Free Software Foundation; either
 
10
 * version 2.1 of the License, or (at your option) any later version.
 
11
 *
 
12
 * This library is distributed in the hope that it will be useful,
 
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
15
 * Lesser General Public License for more details.
 
16
 *
 
17
 * You should have received a copy of the GNU Lesser General Public
 
18
 * License along with this library; if not, write to the Free Software
 
19
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
20
 */
 
21
 
 
22
#include <errno.h>
 
23
#include <locale.h>
 
24
#include <poll.h>
 
25
#include <signal.h>
 
26
#include <stdio.h>
 
27
#include <stdlib.h>
 
28
#include <string.h>
 
29
#include <time.h>
 
30
#include <unistd.h>
 
31
#include <sys/time.h>
 
32
#include <sys/types.h>
 
33
#include <glib.h>
 
34
#ifdef G_OS_UNIX
 
35
#       include <netdb.h>
 
36
#       include <arpa/inet.h>
 
37
#       include <netinet/in.h>
 
38
#       include <sys/socket.h>
 
39
#endif
 
40
#include <pgm/pgm.h>
 
41
 
 
42
/* example dependencies */
 
43
#include <pgm/backtrace.h>
 
44
#include <pgm/log.h>
 
45
 
 
46
 
 
47
/* typedefs */
 
48
 
 
49
/* globals */
 
50
 
 
51
static int              g_port = 0;
 
52
static const char*      g_network = "";
 
53
static gboolean         g_multicast_loop = FALSE;
 
54
static int              g_udp_encap_port = 0;
 
55
 
 
56
static int              g_max_tpdu = 1500;
 
57
static int              g_sqns = 100;
 
58
 
 
59
static pgm_sock_t*      g_sock = NULL;
 
60
static gboolean         g_quit;
 
61
static int              g_quit_pipe[2];
 
62
 
 
63
static void on_signal (int);
 
64
static gboolean on_startup (void);
 
65
 
 
66
static int on_data (gconstpointer, size_t, struct pgm_sockaddr_t*);
 
67
 
 
68
 
 
69
G_GNUC_NORETURN static
 
70
void
 
71
usage (
 
72
        const char*     bin
 
73
        )
 
74
{
 
75
        fprintf (stderr, "Usage: %s [options]\n", bin);
 
76
        fprintf (stderr, "  -n <network>    : Multicast group or unicast IP address\n");
 
77
        fprintf (stderr, "  -s <port>       : IP port\n");
 
78
        fprintf (stderr, "  -p <port>       : Encapsulate PGM in UDP on IP port\n");
 
79
        fprintf (stderr, "  -l              : Enable multicast loopback and address sharing\n");
 
80
        exit (1);
 
81
}
 
82
 
 
83
int
 
84
main (
 
85
        int             argc,
 
86
        char*           argv[]
 
87
        )
 
88
{
 
89
        int e;
 
90
        pgm_error_t* pgm_err = NULL;
 
91
 
 
92
        setlocale (LC_ALL, "");
 
93
 
 
94
        log_init ();
 
95
        g_message ("pnonblocksyncrecv");
 
96
 
 
97
        if (!pgm_init (&pgm_err)) {
 
98
                g_error ("Unable to start PGM engine: %s", pgm_err->message);
 
99
                pgm_error_free (pgm_err);
 
100
                return EXIT_FAILURE;
 
101
        }
 
102
 
 
103
/* parse program arguments */
 
104
        const char* binary_name = strrchr (argv[0], '/');
 
105
        int c;
 
106
        while ((c = getopt (argc, argv, "s:n:p:lh")) != -1)
 
107
        {
 
108
                switch (c) {
 
109
                case 'n':       g_network = optarg; break;
 
110
                case 's':       g_port = atoi (optarg); break;
 
111
                case 'p':       g_udp_encap_port = atoi (optarg); break;
 
112
                case 'l':       g_multicast_loop = TRUE; break;
 
113
 
 
114
                case 'h':
 
115
                case '?': usage (binary_name);
 
116
                }
 
117
        }
 
118
 
 
119
        g_quit = FALSE;
 
120
        e = pipe (g_quit_pipe);
 
121
        g_assert (0 == e);
 
122
 
 
123
/* setup signal handlers */
 
124
        signal (SIGSEGV, on_sigsegv);
 
125
        signal (SIGINT,  on_signal);
 
126
        signal (SIGTERM, on_signal);
 
127
#ifdef SIGHUP
 
128
        signal (SIGHUP,  SIG_IGN);
 
129
#endif
 
130
 
 
131
        if (!on_startup()) {
 
132
                g_error ("startup failed");
 
133
                exit(1);
 
134
        }
 
135
 
 
136
/* dispatch loop */
 
137
        g_message ("entering PGM message loop ... ");
 
138
        do {
 
139
                struct timeval tv;
 
140
                int timeout;
 
141
                int n_fds = 2;
 
142
                struct pollfd fds[ 1 + n_fds ];
 
143
                char buffer[4096];
 
144
                size_t len;
 
145
                struct pgm_sockaddr_t from;
 
146
                socklen_t fromlen = sizeof(from);
 
147
                const int status = pgm_recvfrom (g_sock,
 
148
                                                 buffer,
 
149
                                                 sizeof(buffer),
 
150
                                                 0,
 
151
                                                 &len,
 
152
                                                 &from,
 
153
                                                 &fromlen,
 
154
                                                 &pgm_err);
 
155
                switch (status) {
 
156
                case PGM_IO_STATUS_NORMAL:
 
157
                        on_data (buffer, len, &from);
 
158
                        break;
 
159
                case PGM_IO_STATUS_TIMER_PENDING:
 
160
                        {
 
161
                                socklen_t optlen = sizeof (tv);
 
162
                                pgm_getsockopt (g_sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen);
 
163
                        }
 
164
                        goto block;
 
165
                case PGM_IO_STATUS_RATE_LIMITED:
 
166
                        {
 
167
                                socklen_t optlen = sizeof (tv);
 
168
                                pgm_getsockopt (g_sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
 
169
                        }
 
170
/* fall through */
 
171
                case PGM_IO_STATUS_WOULD_BLOCK:
 
172
/* poll for next event */
 
173
block:
 
174
                        timeout = PGM_IO_STATUS_WOULD_BLOCK == status ? -1 : ((tv.tv_sec * 1000) + (tv.tv_usec / 1000));
 
175
                        memset (fds, 0, sizeof(fds));
 
176
                        fds[0].fd = g_quit_pipe[0];
 
177
                        fds[0].events = POLLIN;
 
178
                        pgm_poll_info (g_sock, &fds[1], &n_fds, POLLIN);
 
179
                        poll (fds, 1 + n_fds, timeout /* ms */);
 
180
                        break;
 
181
                default:
 
182
                        if (pgm_err) {
 
183
                                g_warning ("%s", pgm_err->message);
 
184
                                pgm_error_free (pgm_err);
 
185
                                pgm_err = NULL;
 
186
                        }
 
187
                        if (PGM_IO_STATUS_ERROR == status)
 
188
                                break;
 
189
                }
 
190
        } while (!g_quit);
 
191
 
 
192
        g_message ("message loop terminated, cleaning up.");
 
193
 
 
194
/* cleanup */
 
195
        close (g_quit_pipe[0]);
 
196
        close (g_quit_pipe[1]);
 
197
 
 
198
        if (g_sock) {
 
199
                g_message ("closing PGM socket.");
 
200
                pgm_close (g_sock, TRUE);
 
201
                g_sock = NULL;
 
202
        }
 
203
 
 
204
        g_message ("PGM engine shutdown.");
 
205
        pgm_shutdown ();
 
206
        g_message ("finished.");
 
207
        return EXIT_SUCCESS;
 
208
}
 
209
 
 
210
static
 
211
void
 
212
on_signal (
 
213
        int             signum
 
214
        )
 
215
{
 
216
        g_message ("on_signal (signum:%d)", signum);
 
217
        g_quit = TRUE;
 
218
        const char one = '1';
 
219
        const size_t writelen = write (g_quit_pipe[1], &one, sizeof(one));
 
220
        g_assert (sizeof(one) == writelen);
 
221
}
 
222
 
 
223
static
 
224
gboolean
 
225
on_startup (void)
 
226
{
 
227
        struct pgm_addrinfo_t* res = NULL;
 
228
        pgm_error_t* pgm_err = NULL;
 
229
        sa_family_t sa_family = AF_UNSPEC;
 
230
 
 
231
        g_message ("startup.");
 
232
 
 
233
/* parse network parameter into transport address structure */
 
234
        if (!pgm_getaddrinfo (g_network, NULL, &res, &pgm_err)) {
 
235
                g_error ("parsing network parameter: %s", pgm_err->message);
 
236
                goto err_abort;
 
237
        }
 
238
 
 
239
        sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
 
240
 
 
241
        if (g_udp_encap_port) {
 
242
                g_message ("create PGM/UDP socket.");
 
243
                if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_err)) {
 
244
                        g_error ("socket: %s", pgm_err->message);
 
245
                        goto err_abort;
 
246
                }
 
247
                pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port));
 
248
                pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port));
 
249
        } else {
 
250
                g_message ("create PGM/IP socket.");
 
251
                if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_err)) {
 
252
                        g_error ("socket: %s", pgm_err->message);
 
253
                        goto err_abort;
 
254
                }
 
255
        }
 
256
 
 
257
/* Use RFC 2113 tagging for PGM Router Assist */
 
258
        const int no_router_assist = 0;
 
259
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_IP_ROUTER_ALERT, &no_router_assist, sizeof(no_router_assist));
 
260
 
 
261
        pgm_drop_superuser();
 
262
 
 
263
/* set PGM parameters */
 
264
        const int recv_only = 1,
 
265
                  passive = 0,
 
266
                  peer_expiry = pgm_secs (300),
 
267
                  spmr_expiry = pgm_msecs (250),
 
268
                  nak_bo_ivl = pgm_msecs (50),
 
269
                  nak_rpt_ivl = pgm_secs (2),
 
270
                  nak_rdata_ivl = pgm_secs (2),
 
271
                  nak_data_retries = 50,
 
272
                  nak_ncf_retries = 50;
 
273
 
 
274
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, sizeof(recv_only));
 
275
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_PASSIVE, &passive, sizeof(passive));
 
276
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MTU, &g_max_tpdu, sizeof(g_max_tpdu));
 
277
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_RXW_SQNS, &g_sqns, sizeof(g_sqns));
 
278
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry));
 
279
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry));
 
280
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl));
 
281
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl));
 
282
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl));
 
283
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries));
 
284
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries));
 
285
 
 
286
/* create global session identifier */
 
287
        struct pgm_sockaddr_t addr;
 
288
        memset (&addr, 0, sizeof(addr));
 
289
        addr.sa_port = g_port ? g_port : DEFAULT_DATA_DESTINATION_PORT;
 
290
        addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
 
291
        if (!pgm_gsi_create_from_hostname (&addr.sa_addr.gsi, &pgm_err)) {
 
292
                g_error ("creating GSI: %s", pgm_err->message);
 
293
                goto err_abort;
 
294
        }
 
295
 
 
296
/* assign socket to specified address */
 
297
        struct pgm_interface_req_t if_req;
 
298
        memset (&if_req, 0, sizeof(if_req));
 
299
        if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
 
300
        if_req.ir_scope_id  = 0;
 
301
        if (AF_INET6 == sa_family) {
 
302
                struct sockaddr_in6 sa6;
 
303
                memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof(sa6));
 
304
                if_req.ir_scope_id = sa6.sin6_scope_id;
 
305
        }
 
306
        if (!pgm_bind3 (g_sock,
 
307
                        &addr, sizeof(addr),
 
308
                        &if_req, sizeof(if_req),        /* tx interface */
 
309
                        &if_req, sizeof(if_req),        /* rx interface */
 
310
                        &pgm_err))
 
311
        {
 
312
                g_error ("binding PGM socket: %s", pgm_err->message);
 
313
                goto err_abort;
 
314
        }
 
315
 
 
316
/* join IP multicast groups */
 
317
        for (unsigned i = 0; i < res->ai_recv_addrs_len; i++)
 
318
                pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_JOIN_GROUP, &res->ai_recv_addrs[i], sizeof(struct group_req));
 
319
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof(struct group_req));
 
320
        pgm_freeaddrinfo (res);
 
321
 
 
322
/* set IP parameters */
 
323
        const int nonblocking = 1,
 
324
                  multicast_loop = g_multicast_loop ? 1 : 0,
 
325
                  multicast_hops = 16,
 
326
                  dscp = 0x2e << 2;             /* Expedited Forwarding PHB for network elements, no ECN. */
 
327
 
 
328
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop, sizeof(multicast_loop));
 
329
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof(multicast_hops));
 
330
        if (AF_INET6 != sa_family)
 
331
                pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof(dscp));
 
332
        pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof(nonblocking));
 
333
 
 
334
        if (!pgm_connect (g_sock, &pgm_err)) {
 
335
                g_error ("connecting PGM socket: %s", pgm_err->message);
 
336
                goto err_abort;
 
337
        }
 
338
 
 
339
        g_message ("startup complete.");
 
340
        return TRUE;
 
341
 
 
342
err_abort:
 
343
        if (NULL != g_sock) {
 
344
                pgm_close (g_sock, FALSE);
 
345
                g_sock = NULL;
 
346
        }
 
347
        if (NULL != res) {
 
348
                pgm_freeaddrinfo (res);
 
349
                res = NULL;
 
350
        }
 
351
        if (NULL != pgm_err) {
 
352
                pgm_error_free (pgm_err);
 
353
                pgm_err = NULL;
 
354
        }
 
355
        return FALSE;
 
356
}
 
357
 
 
358
static
 
359
int
 
360
on_data (
 
361
        gconstpointer           data,
 
362
        size_t                  len,
 
363
        struct pgm_sockaddr_t*  from
 
364
        )
 
365
{
 
366
/* protect against non-null terminated strings */
 
367
        char buf[1024], tsi[PGM_TSISTRLEN];
 
368
        const size_t buflen = MIN(sizeof(buf) - 1, len);
 
369
        strncpy (buf, (const char*)data, buflen);
 
370
        buf[buflen] = '\0';
 
371
        pgm_tsi_print_r (&from->sa_addr, tsi, sizeof(tsi));
 
372
 
 
373
        g_message ("\"%s\" (%u bytes from %s)",
 
374
                        buf,
 
375
                        (unsigned)len,
 
376
                        tsi);
 
377
 
 
378
        return 0;
 
379
}
 
380
 
 
381
/* eof */