~ubuntu-branches/ubuntu/saucy/zeromq3/saucy

« back to all changes in this revision

Viewing changes to .pc/01_fix-unused-variable-error.patch/src/pgm_socket.cpp

  • Committer: Package Import Robot
  • Author(s): Alessandro Ghedini
  • Date: 2012-10-16 19:49:30 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20121016194930-98r0bi746eoaa4iv
Tags: 3.2.1~rc2+dfsg-1
* New upstream RC release (Closes: #690704)
* Bump Standards-Version to 3.9.4 (no changes needed)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
    Copyright (c) 2009-2011 250bpm s.r.o.
3
 
    Copyright (c) 2007-2009 iMatix Corporation
4
 
    Copyright (c) 2010-2011 Miru Limited
5
 
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
6
 
 
7
 
    This file is part of 0MQ.
8
 
 
9
 
    0MQ is free software; you can redistribute it and/or modify it under
10
 
    the terms of the GNU Lesser General Public License as published by
11
 
    the Free Software Foundation; either version 3 of the License, or
12
 
    (at your option) any later version.
13
 
 
14
 
    0MQ is distributed in the hope that it will be useful,
15
 
    but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
 
    GNU Lesser General Public License for more details.
18
 
 
19
 
    You should have received a copy of the GNU Lesser General Public License
20
 
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21
 
*/
22
 
 
23
 
#include "platform.hpp"
24
 
 
25
 
#ifdef ZMQ_HAVE_OPENPGM
26
 
 
27
 
#ifdef ZMQ_HAVE_WINDOWS
28
 
#include "windows.hpp"
29
 
#endif
30
 
 
31
 
#ifdef ZMQ_HAVE_LINUX
32
 
#include <poll.h>
33
 
#endif
34
 
 
35
 
#include <stdlib.h>
36
 
#include <string.h>
37
 
#include <string>
38
 
 
39
 
#include "options.hpp"
40
 
#include "pgm_socket.hpp"
41
 
#include "config.hpp"
42
 
#include "err.hpp"
43
 
#include "random.hpp"
44
 
#include "stdint.hpp"
45
 
 
46
 
#ifndef MSG_ERRQUEUE
47
 
#define MSG_ERRQUEUE 0x2000
48
 
#endif
49
 
 
50
 
zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
51
 
    sock (NULL),
52
 
    options (options_),
53
 
    receiver (receiver_),
54
 
    pgm_msgv (NULL),
55
 
    pgm_msgv_len (0),
56
 
    nbytes_rec (0),
57
 
    nbytes_processed (0),
58
 
    pgm_msgv_processed (0)
59
 
{
60
 
}
61
 
 
62
 
//  Resolve PGM socket address.
63
 
//  network_ of the form <interface & multicast group decls>:<IP port>
64
 
//  e.g. eth0;239.192.0.1:7500
65
 
//       link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
66
 
//       ;[fe80::1%en0]:7500
67
 
int zmq::pgm_socket_t::init_address (const char *network_,
68
 
    struct pgm_addrinfo_t **addr, uint16_t *port_number)
69
 
{
70
 
    //  Parse port number, start from end for IPv6
71
 
    const char *port_delim = strrchr (network_, ':');
72
 
    if (!port_delim) {
73
 
        errno = EINVAL;
74
 
        return -1;
75
 
    }
76
 
 
77
 
    *port_number = atoi (port_delim + 1);
78
 
  
79
 
    char network [256];
80
 
    if (port_delim - network_ >= (int) sizeof (network) - 1) {
81
 
        errno = EINVAL;
82
 
        return -1;
83
 
    }
84
 
    memset (network, '\0', sizeof (network));
85
 
    memcpy (network, network_, port_delim - network_);
86
 
 
87
 
    pgm_error_t *pgm_error = NULL;
88
 
    struct pgm_addrinfo_t hints, *res = NULL;
89
 
    sa_family_t sa_family;
90
 
 
91
 
    memset (&hints, 0, sizeof (hints));
92
 
    hints.ai_family = AF_UNSPEC;
93
 
    if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) {
94
 
 
95
 
        //  Invalid parameters don't set pgm_error_t.
96
 
        zmq_assert (pgm_error != NULL);
97
 
        if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&
98
 
 
99
 
              //  NB: cannot catch EAI_BADFLAGS.
100
 
            ( pgm_error->code != PGM_ERROR_SERVICE &&
101
 
              pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {
102
 
 
103
 
            //  User, host, or network configuration or transient error.
104
 
            pgm_error_free (pgm_error);
105
 
            errno = EINVAL;
106
 
            return -1;
107
 
        }
108
 
 
109
 
        //  Fatal OpenPGM internal error.
110
 
        zmq_assert (false);
111
 
    }
112
 
    return 0;
113
 
}
114
 
 
115
 
//  Create, bind and connect PGM socket.
116
 
int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
117
 
{
118
 
    //  Can not open transport before destroying old one.
119
 
    zmq_assert (sock == NULL);
120
 
    zmq_assert (options.rate > 0);
121
 
 
122
 
    //  Zero counter used in msgrecv.
123
 
    nbytes_rec = 0;
124
 
    nbytes_processed = 0;
125
 
    pgm_msgv_processed = 0;
126
 
 
127
 
    uint16_t port_number;
128
 
    struct pgm_addrinfo_t *res = NULL;
129
 
    sa_family_t sa_family;
130
 
 
131
 
    pgm_error_t *pgm_error = NULL;
132
 
 
133
 
    if (init_address(network_, &res, &port_number) < 0) {
134
 
        goto err_abort;
135
 
    }
136
 
 
137
 
    zmq_assert (res != NULL);
138
 
 
139
 
    //  Pick up detected IP family.
140
 
    sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
141
 
 
142
 
    //  Create IP/PGM or UDP/PGM socket.
143
 
    if (udp_encapsulation_) {
144
 
        if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP,
145
 
              &pgm_error)) {
146
 
 
147
 
            //  Invalid parameters don't set pgm_error_t.
148
 
            zmq_assert (pgm_error != NULL);
149
 
            if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
150
 
                  pgm_error->code != PGM_ERROR_BADF &&
151
 
                  pgm_error->code != PGM_ERROR_FAULT &&
152
 
                  pgm_error->code != PGM_ERROR_NOPROTOOPT &&
153
 
                  pgm_error->code != PGM_ERROR_FAILED))
154
 
 
155
 
                //  User, host, or network configuration or transient error.
156
 
                goto err_abort;
157
 
 
158
 
            //  Fatal OpenPGM internal error.
159
 
            zmq_assert (false);
160
 
        }
161
 
 
162
 
        //  All options are of data type int
163
 
        const int encapsulation_port = port_number;
164
 
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
165
 
                &encapsulation_port, sizeof (encapsulation_port)))
166
 
            goto err_abort;
167
 
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
168
 
                &encapsulation_port, sizeof (encapsulation_port)))
169
 
            goto err_abort;
170
 
    }
171
 
    else {
172
 
        if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM,
173
 
              &pgm_error)) {
174
 
 
175
 
            //  Invalid parameters don't set pgm_error_t.
176
 
            zmq_assert (pgm_error != NULL);
177
 
            if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
178
 
                  pgm_error->code != PGM_ERROR_BADF &&
179
 
                  pgm_error->code != PGM_ERROR_FAULT &&
180
 
                  pgm_error->code != PGM_ERROR_NOPROTOOPT &&
181
 
                  pgm_error->code != PGM_ERROR_FAILED))
182
 
 
183
 
                //  User, host, or network configuration or transient error.
184
 
                goto err_abort;
185
 
 
186
 
            //  Fatal OpenPGM internal error.
187
 
            zmq_assert (false);
188
 
        }
189
 
    }
190
 
 
191
 
    {
192
 
                const int rcvbuf = (int) options.rcvbuf;
193
 
                if (rcvbuf) {
194
 
                    if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
195
 
                          sizeof (rcvbuf)))
196
 
                        goto err_abort;
197
 
                }
198
 
 
199
 
                const int sndbuf = (int) options.sndbuf;
200
 
                if (sndbuf) {
201
 
                    if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
202
 
                          sizeof (sndbuf)))
203
 
                        goto err_abort;
204
 
                }
205
 
 
206
 
                const int max_tpdu = (int) pgm_max_tpdu;
207
 
                if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
208
 
                      sizeof (max_tpdu)))
209
 
                    goto err_abort;
210
 
    }
211
 
 
212
 
    if (receiver) {
213
 
        const int recv_only        = 1,
214
 
                  rxw_max_tpdu     = (int) pgm_max_tpdu,
215
 
                  rxw_sqns         = compute_sqns (rxw_max_tpdu),
216
 
                  peer_expiry      = pgm_secs (300),
217
 
                  spmr_expiry      = pgm_msecs (25),
218
 
                  nak_bo_ivl       = pgm_msecs (50),
219
 
                  nak_rpt_ivl      = pgm_msecs (200),
220
 
                  nak_rdata_ivl    = pgm_msecs (200),
221
 
                  nak_data_retries = 50,
222
 
                  nak_ncf_retries  = 50;
223
 
 
224
 
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
225
 
                sizeof (recv_only)) ||
226
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
227
 
                sizeof (rxw_sqns)) ||
228
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
229
 
                sizeof (peer_expiry)) ||
230
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
231
 
                sizeof (spmr_expiry)) ||
232
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl,
233
 
                sizeof (nak_bo_ivl)) ||
234
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl,
235
 
                sizeof (nak_rpt_ivl)) ||
236
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL,
237
 
                &nak_rdata_ivl, sizeof (nak_rdata_ivl)) ||
238
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES,
239
 
                &nak_data_retries, sizeof (nak_data_retries)) ||
240
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
241
 
                &nak_ncf_retries, sizeof (nak_ncf_retries)))
242
 
            goto err_abort;
243
 
    } else {
244
 
        const int send_only        = 1,
245
 
                  max_rte      = (int) ((options.rate * 1000) / 8),
246
 
                  txw_max_tpdu     = (int) pgm_max_tpdu,
247
 
                  txw_sqns         = compute_sqns (txw_max_tpdu),
248
 
                  ambient_spm      = pgm_secs (30),
249
 
                  heartbeat_spm[]  = { pgm_msecs (100),
250
 
                                       pgm_msecs (100),
251
 
                                       pgm_msecs (100),
252
 
                                       pgm_msecs (100),
253
 
                                       pgm_msecs (1300),
254
 
                                       pgm_secs  (7),
255
 
                                       pgm_secs  (16),
256
 
                                       pgm_secs  (25),
257
 
                                       pgm_secs  (30) };
258
 
 
259
 
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
260
 
                &send_only, sizeof (send_only)) ||
261
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE,
262
 
                &max_rte, sizeof (max_rte)) ||
263
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
264
 
                &txw_sqns, sizeof (txw_sqns)) ||
265
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
266
 
                &ambient_spm, sizeof (ambient_spm)) ||
267
 
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
268
 
                &heartbeat_spm, sizeof (heartbeat_spm)))
269
 
            goto err_abort;
270
 
    }
271
 
 
272
 
    //  PGM transport GSI.
273
 
    struct pgm_sockaddr_t addr;
274
 
 
275
 
    memset (&addr, 0, sizeof(addr));
276
 
    addr.sa_port = port_number;
277
 
    addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
278
 
 
279
 
    //  Create random GSI.
280
 
    uint32_t buf [2];
281
 
    buf [0] = generate_random ();
282
 
    buf [1] = generate_random ();
283
 
    if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8))
284
 
        goto err_abort;
285
 
 
286
 
 
287
 
    //  Bind a transport to the specified network devices.
288
 
    struct pgm_interface_req_t if_req;
289
 
    memset (&if_req, 0, sizeof(if_req));
290
 
    if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
291
 
    if_req.ir_scope_id  = 0;
292
 
    if (AF_INET6 == sa_family) {
293
 
        struct sockaddr_in6 sa6;
294
 
        memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6));
295
 
        if_req.ir_scope_id = sa6.sin6_scope_id;
296
 
    }
297
 
    if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req),
298
 
          &if_req, sizeof (if_req), &pgm_error)) {
299
 
 
300
 
        //  Invalid parameters don't set pgm_error_t.
301
 
        zmq_assert (pgm_error != NULL);
302
 
        if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET ||
303
 
             pgm_error->domain == PGM_ERROR_DOMAIN_IF) && (
304
 
             pgm_error->code != PGM_ERROR_INVAL &&
305
 
             pgm_error->code != PGM_ERROR_BADF &&
306
 
             pgm_error->code != PGM_ERROR_FAULT))
307
 
 
308
 
            //  User, host, or network configuration or transient error.
309
 
            goto err_abort;
310
 
 
311
 
        //  Fatal OpenPGM internal error.
312
 
        zmq_assert (false);
313
 
    }
314
 
 
315
 
    //  Join IP multicast groups.
316
 
    for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) {
317
 
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP,
318
 
              &res->ai_recv_addrs [i], sizeof (struct group_req)))
319
 
            goto err_abort;
320
 
    }
321
 
    if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP,
322
 
          &res->ai_send_addrs [0], sizeof (struct group_req)))
323
 
        goto err_abort;
324
 
 
325
 
    pgm_freeaddrinfo (res);
326
 
    res = NULL;
327
 
 
328
 
    //  Set IP level parameters.
329
 
    {
330
 
                const int multicast_loop = 0;
331
 
                if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
332
 
                      &multicast_loop, sizeof (multicast_loop)))
333
 
                    goto err_abort;
334
 
 
335
 
                const int multicast_hops = options.multicast_hops;
336
 
                if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
337
 
                        &multicast_hops, sizeof (multicast_hops)))
338
 
                    goto err_abort;
339
 
 
340
 
                //  Expedited Forwarding PHB for network elements, no ECN.
341
 
                const int dscp = 0x2e << 2; 
342
 
                if (AF_INET6 != sa_family && !pgm_setsockopt (sock,
343
 
                      IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp)))
344
 
                    goto err_abort;
345
 
 
346
 
                const int nonblocking = 1;
347
 
                if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK,
348
 
                      &nonblocking, sizeof (nonblocking)))
349
 
                    goto err_abort;
350
 
    }
351
 
 
352
 
    //  Connect PGM transport to start state machine.
353
 
    if (!pgm_connect (sock, &pgm_error)) {
354
 
 
355
 
        //  Invalid parameters don't set pgm_error_t.
356
 
        zmq_assert (pgm_error != NULL);
357
 
        goto err_abort;
358
 
    }
359
 
 
360
 
    //  For receiver transport preallocate pgm_msgv array.
361
 
    if (receiver) {
362
 
        zmq_assert (in_batch_size > 0);
363
 
        size_t max_tsdu_size = get_max_tsdu_size ();
364
 
        pgm_msgv_len = (int) in_batch_size / max_tsdu_size;
365
 
        if ((int) in_batch_size % max_tsdu_size)
366
 
            pgm_msgv_len++;
367
 
        zmq_assert (pgm_msgv_len);
368
 
 
369
 
        pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len);
370
 
        alloc_assert (pgm_msgv);
371
 
    }
372
 
 
373
 
    return 0;
374
 
 
375
 
err_abort:
376
 
    if (sock != NULL) {
377
 
        pgm_close (sock, FALSE);
378
 
        sock = NULL;
379
 
    }
380
 
    if (res != NULL) {
381
 
        pgm_freeaddrinfo (res);
382
 
        res = NULL;
383
 
    }
384
 
    if (pgm_error != NULL) {
385
 
        pgm_error_free (pgm_error);
386
 
        pgm_error = NULL;
387
 
    }
388
 
    errno = EINVAL;
389
 
    return -1;
390
 
}
391
 
 
392
 
zmq::pgm_socket_t::~pgm_socket_t ()
393
 
{
394
 
    if (pgm_msgv)
395
 
        free (pgm_msgv);
396
 
    if (sock) 
397
 
        pgm_close (sock, TRUE);
398
 
}
399
 
 
400
 
//  Get receiver fds. receive_fd_ is signaled for incoming packets,
401
 
//  waiting_pipe_fd_ is signaled for state driven events and data.
402
 
void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, 
403
 
    fd_t *waiting_pipe_fd_)
404
 
{
405
 
    socklen_t socklen;
406
 
    bool rc;
407
 
 
408
 
    zmq_assert (receive_fd_);
409
 
    zmq_assert (waiting_pipe_fd_);
410
 
 
411
 
    socklen = sizeof (*receive_fd_);
412
 
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
413
 
        &socklen);
414
 
    zmq_assert (rc);
415
 
    zmq_assert (socklen == sizeof (*receive_fd_));
416
 
 
417
 
    socklen = sizeof (*waiting_pipe_fd_);
418
 
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_,
419
 
        &socklen);
420
 
    zmq_assert (rc);
421
 
    zmq_assert (socklen == sizeof (*waiting_pipe_fd_));
422
 
}
423
 
 
424
 
//  Get fds and store them into user allocated memory. 
425
 
//  send_fd is for non-blocking send wire notifications.
426
 
//  receive_fd_ is for incoming back-channel protocol packets.
427
 
//  rdata_notify_fd_ is raised for waiting repair transmissions.
428
 
//  pending_notify_fd_ is for state driven events.
429
 
void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_, 
430
 
    fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_)
431
 
{
432
 
    socklen_t socklen;
433
 
    bool rc;
434
 
 
435
 
    zmq_assert (send_fd_);
436
 
    zmq_assert (receive_fd_);
437
 
    zmq_assert (rdata_notify_fd_);
438
 
    zmq_assert (pending_notify_fd_);
439
 
 
440
 
    socklen = sizeof (*send_fd_);
441
 
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen);
442
 
    zmq_assert (rc);
443
 
    zmq_assert (socklen == sizeof (*receive_fd_));
444
 
 
445
 
    socklen = sizeof (*receive_fd_);
446
 
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
447
 
        &socklen);
448
 
    zmq_assert (rc);
449
 
    zmq_assert (socklen == sizeof (*receive_fd_));
450
 
 
451
 
    socklen = sizeof (*rdata_notify_fd_);
452
 
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_,
453
 
        &socklen);
454
 
    zmq_assert (rc);
455
 
    zmq_assert (socklen == sizeof (*rdata_notify_fd_));
456
 
 
457
 
    socklen = sizeof (*pending_notify_fd_);
458
 
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK,
459
 
        pending_notify_fd_, &socklen);
460
 
    zmq_assert (rc);
461
 
    zmq_assert (socklen == sizeof (*pending_notify_fd_));
462
 
}
463
 
 
464
 
//  Send one APDU, transmit window owned memory.
465
 
//  data_len_ must be less than one TPDU.
466
 
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
467
 
{
468
 
    size_t nbytes = 0;
469
 
   
470
 
    const int status = pgm_send (sock, data_, data_len_, &nbytes);
471
 
 
472
 
    //  We have to write all data as one packet.
473
 
    if (nbytes > 0) {
474
 
        zmq_assert (status == PGM_IO_STATUS_NORMAL);
475
 
        zmq_assert (nbytes == data_len_);
476
 
    } else {
477
 
        zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED ||
478
 
            status == PGM_IO_STATUS_WOULD_BLOCK);
479
 
 
480
 
        if (status == PGM_IO_STATUS_RATE_LIMITED)
481
 
            errno = ENOMEM;
482
 
        else
483
 
            errno = EBUSY;
484
 
    }
485
 
 
486
 
    //  Save return value.
487
 
    last_tx_status = status;
488
 
 
489
 
    return nbytes;
490
 
}
491
 
 
492
 
long zmq::pgm_socket_t::get_rx_timeout ()
493
 
{
494
 
    if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED &&
495
 
          last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
496
 
        return -1;
497
 
 
498
 
    struct timeval tv;
499
 
    socklen_t optlen = sizeof (tv);
500
 
    const bool rc = pgm_getsockopt (sock, IPPROTO_PGM,
501
 
        last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN :
502
 
        PGM_TIME_REMAIN, &tv, &optlen);
503
 
    zmq_assert (rc);
504
 
 
505
 
    const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
506
 
 
507
 
    return timeout;
508
 
}
509
 
 
510
 
long zmq::pgm_socket_t::get_tx_timeout ()
511
 
{
512
 
    if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
513
 
        return -1;
514
 
 
515
 
    struct timeval tv;
516
 
    socklen_t optlen = sizeof (tv);
517
 
    const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv,
518
 
        &optlen);
519
 
    zmq_assert (rc);
520
 
 
521
 
    const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
522
 
 
523
 
    return timeout;
524
 
}
525
 
 
526
 
//  Return max TSDU size without fragmentation from current PGM transport.
527
 
size_t zmq::pgm_socket_t::get_max_tsdu_size ()
528
 
{
529
 
    int max_tsdu = 0;
530
 
    socklen_t optlen = sizeof (max_tsdu);
531
 
 
532
 
    bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen);
533
 
    zmq_assert (rc);
534
 
    zmq_assert (optlen == sizeof (max_tsdu));
535
 
    return (size_t) max_tsdu;
536
 
}
537
 
 
538
 
//  pgm_recvmsgv is called to fill the pgm_msgv array up to  pgm_msgv_len.
539
 
//  In subsequent calls data from pgm_msgv structure are returned.
540
 
ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
541
 
{
542
 
    size_t raw_data_len = 0;
543
 
 
544
 
    //  We just sent all data from pgm_transport_recvmsgv up 
545
 
    //  and have to return 0 that another engine in this thread is scheduled.
546
 
    if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
547
 
 
548
 
        //  Reset all the counters.
549
 
        nbytes_rec = 0;
550
 
        nbytes_processed = 0;
551
 
        pgm_msgv_processed = 0;
552
 
        errno = EAGAIN;
553
 
        return 0;
554
 
    }
555
 
 
556
 
    //  If we have are going first time or if we have processed all pgm_msgv_t
557
 
    //  structure previously read from the pgm socket.
558
 
    if (nbytes_rec == nbytes_processed) {
559
 
 
560
 
        //  Check program flow.
561
 
        zmq_assert (pgm_msgv_processed == 0);
562
 
        zmq_assert (nbytes_processed == 0);
563
 
        zmq_assert (nbytes_rec == 0);
564
 
 
565
 
        //  Receive a vector of Application Protocol Domain Unit's (APDUs) 
566
 
        //  from the transport.
567
 
        pgm_error_t *pgm_error = NULL;
568
 
 
569
 
        const int status = pgm_recvmsgv (sock, pgm_msgv,
570
 
            pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
571
 
 
572
 
        //  Invalid parameters.
573
 
        zmq_assert (status != PGM_IO_STATUS_ERROR);
574
 
 
575
 
        last_rx_status = status;
576
 
 
577
 
        //  In a case when no ODATA/RDATA fired POLLIN event (SPM...)
578
 
        //  pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
579
 
        if (status == PGM_IO_STATUS_TIMER_PENDING) {
580
 
 
581
 
            zmq_assert (nbytes_rec == 0);
582
 
 
583
 
            //  In case if no RDATA/ODATA caused POLLIN 0 is 
584
 
            //  returned.
585
 
            nbytes_rec = 0;
586
 
            errno = EBUSY;
587
 
            return 0;
588
 
        }
589
 
 
590
 
        //  Send SPMR, NAK, ACK is rate limited.
591
 
        if (status == PGM_IO_STATUS_RATE_LIMITED) {
592
 
 
593
 
            zmq_assert (nbytes_rec == 0);
594
 
 
595
 
            //  In case if no RDATA/ODATA caused POLLIN 0 is returned.
596
 
            nbytes_rec = 0;
597
 
            errno = ENOMEM;
598
 
            return 0;
599
 
        }
600
 
 
601
 
        //  No peers and hence no incoming packets.
602
 
        if (status == PGM_IO_STATUS_WOULD_BLOCK) {
603
 
 
604
 
            zmq_assert (nbytes_rec == 0);
605
 
 
606
 
            //  In case if no RDATA/ODATA caused POLLIN 0 is returned.
607
 
            nbytes_rec = 0;
608
 
            errno = EAGAIN;
609
 
            return 0;
610
 
        }
611
 
 
612
 
        //  Data loss.
613
 
        if (status == PGM_IO_STATUS_RESET) {
614
 
 
615
 
            struct pgm_sk_buff_t* skb = pgm_msgv [0].msgv_skb [0];
616
 
 
617
 
            //  Save lost data TSI.
618
 
            *tsi_ = &skb->tsi;
619
 
            nbytes_rec = 0;
620
 
 
621
 
            //  In case of dala loss -1 is returned.
622
 
            errno = EINVAL;
623
 
            pgm_free_skb (skb);
624
 
            return -1;
625
 
        }
626
 
 
627
 
        zmq_assert (status == PGM_IO_STATUS_NORMAL);
628
 
    }
629
 
    else
630
 
    {
631
 
        zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
632
 
    }
633
 
 
634
 
    // Zero byte payloads are valid in PGM, but not 0MQ protocol.
635
 
    zmq_assert (nbytes_rec > 0);
636
 
 
637
 
    // Only one APDU per pgm_msgv_t structure is allowed.
638
 
    zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1);
639
 
 
640
 
    struct pgm_sk_buff_t* skb = 
641
 
        pgm_msgv [pgm_msgv_processed].msgv_skb [0];
642
 
 
643
 
    //  Take pointers from pgm_msgv_t structure.
644
 
    *raw_data_ = skb->data;
645
 
    raw_data_len = skb->len;
646
 
 
647
 
    //  Save current TSI.
648
 
    *tsi_ = &skb->tsi;
649
 
 
650
 
    //  Move the the next pgm_msgv_t structure.
651
 
    pgm_msgv_processed++;
652
 
    zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
653
 
    nbytes_processed +=raw_data_len;
654
 
 
655
 
    return raw_data_len;
656
 
}
657
 
 
658
 
void zmq::pgm_socket_t::process_upstream ()
659
 
{
660
 
    pgm_msgv_t dummy_msg;
661
 
 
662
 
    size_t dummy_bytes = 0;
663
 
    pgm_error_t *pgm_error = NULL;
664
 
 
665
 
    const int status = pgm_recvmsgv (sock, &dummy_msg,
666
 
        1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error);
667
 
 
668
 
    //  Invalid parameters.
669
 
    zmq_assert (status != PGM_IO_STATUS_ERROR);
670
 
 
671
 
    //  No data should be returned.
672
 
    zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || 
673
 
        status == PGM_IO_STATUS_RATE_LIMITED ||
674
 
        status == PGM_IO_STATUS_WOULD_BLOCK));
675
 
 
676
 
    last_rx_status = status;
677
 
 
678
 
    if (status == PGM_IO_STATUS_TIMER_PENDING)
679
 
        errno = EBUSY;
680
 
    else if (status == PGM_IO_STATUS_RATE_LIMITED)
681
 
        errno = ENOMEM;
682
 
    else
683
 
        errno = EAGAIN;
684
 
}
685
 
 
686
 
int zmq::pgm_socket_t::compute_sqns (int tpdu_)
687
 
{
688
 
    //  Convert rate into B/ms.
689
 
    uint64_t rate = uint64_t (options.rate) / 8;
690
 
        
691
 
    //  Compute the size of the buffer in bytes.
692
 
    uint64_t size = uint64_t (options.recovery_ivl) * rate;
693
 
 
694
 
    //  Translate the size into number of packets.
695
 
    uint64_t sqns = size / tpdu_;
696
 
 
697
 
    //  Buffer should be able to hold at least one packet.
698
 
    if (sqns == 0)
699
 
        sqns = 1;
700
 
 
701
 
    return (int) sqns;
702
 
}
703
 
 
704
 
#endif
705