~ubuntu-branches/ubuntu/saucy/zeromq3/saucy

« back to all changes in this revision

Viewing changes to src/pgm_sender.cpp

  • Committer: Package Import Robot
  • Author(s): Alessandro Ghedini
  • Date: 2012-06-04 21:21:09 UTC
  • Revision ID: package-import@ubuntu.com-20120604212109-b7b3m0rn21o8oo2q
Tags: upstream-3.1.0~beta+dfsg
ImportĀ upstreamĀ versionĀ 3.1.0~beta+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
    Copyright (c) 2009-2011 250bpm s.r.o.
 
3
    Copyright (c) 2007-2009 iMatix Corporation
 
4
    Copyright (c) 2010-2011 Miru Limited
 
5
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
6
 
 
7
    This file is part of 0MQ.
 
8
 
 
9
    0MQ is free software; you can redistribute it and/or modify it under
 
10
    the terms of the GNU Lesser General Public License as published by
 
11
    the Free Software Foundation; either version 3 of the License, or
 
12
    (at your option) any later version.
 
13
 
 
14
    0MQ is distributed in the hope that it will be useful,
 
15
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
16
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
17
    GNU Lesser General Public License for more details.
 
18
 
 
19
    You should have received a copy of the GNU Lesser General Public License
 
20
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
21
*/
 
22
 
 
23
#include "platform.hpp"
 
24
 
 
25
#if defined ZMQ_HAVE_OPENPGM
 
26
 
 
27
#ifdef ZMQ_HAVE_WINDOWS
 
28
#include "windows.hpp"
 
29
#endif
 
30
 
 
31
#include <stdlib.h>
 
32
 
 
33
#include "io_thread.hpp"
 
34
#include "pgm_sender.hpp"
 
35
#include "session_base.hpp"
 
36
#include "err.hpp"
 
37
#include "wire.hpp"
 
38
#include "stdint.hpp"
 
39
 
 
40
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, 
 
41
      const options_t &options_) :
 
42
    io_object_t (parent_),
 
43
    has_tx_timer (false),
 
44
    has_rx_timer (false),
 
45
    encoder (0),
 
46
    pgm_socket (false, options_),
 
47
    options (options_),
 
48
    out_buffer (NULL),
 
49
    out_buffer_size (0),
 
50
    write_size (0)
 
51
{
 
52
}
 
53
 
 
54
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
 
55
{
 
56
    int rc = pgm_socket.init (udp_encapsulation_, network_);
 
57
    if (rc != 0)
 
58
        return rc;
 
59
 
 
60
    out_buffer_size = pgm_socket.get_max_tsdu_size ();
 
61
    out_buffer = (unsigned char*) malloc (out_buffer_size);
 
62
    alloc_assert (out_buffer);
 
63
 
 
64
    return rc;
 
65
}
 
66
 
 
67
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
 
68
{
 
69
    //  Alocate 2 fds for PGM socket.
 
70
    fd_t downlink_socket_fd = retired_fd;
 
71
    fd_t uplink_socket_fd = retired_fd;
 
72
    fd_t rdata_notify_fd = retired_fd;
 
73
    fd_t pending_notify_fd = retired_fd;
 
74
 
 
75
    encoder.set_session (session_);
 
76
 
 
77
    //  Fill fds from PGM transport and add them to the poller.
 
78
    pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
 
79
        &rdata_notify_fd, &pending_notify_fd);
 
80
 
 
81
    handle = add_fd (downlink_socket_fd);
 
82
    uplink_handle = add_fd (uplink_socket_fd);
 
83
    rdata_notify_handle = add_fd (rdata_notify_fd);   
 
84
    pending_notify_handle = add_fd (pending_notify_fd);
 
85
 
 
86
    //  Set POLLIN. We wont never want to stop polling for uplink = we never
 
87
    //  want to stop porocess NAKs.
 
88
    set_pollin (uplink_handle);
 
89
    set_pollin (rdata_notify_handle);
 
90
    set_pollin (pending_notify_handle);
 
91
 
 
92
    //  Set POLLOUT for downlink_socket_handle.
 
93
    set_pollout (handle);
 
94
 
 
95
    //  PGM is not able to pass subscriptions upstream, thus we have no idea
 
96
    //  what messages are peers interested in. Because of that we have to
 
97
    //  subscribe for all the messages.
 
98
    msg_t msg;
 
99
    msg.init_size (1);
 
100
    *(unsigned char*) msg.data () = 1;
 
101
    bool ok = session_->write (&msg);
 
102
    zmq_assert (ok);
 
103
    session_->flush ();
 
104
}
 
105
 
 
106
void zmq::pgm_sender_t::unplug ()
 
107
{
 
108
    if (has_rx_timer) {
 
109
        cancel_timer (rx_timer_id);
 
110
        has_rx_timer = false;
 
111
    }
 
112
 
 
113
    if (has_tx_timer) {
 
114
        cancel_timer (tx_timer_id);
 
115
        has_tx_timer = false;
 
116
    }
 
117
 
 
118
    rm_fd (handle);
 
119
    rm_fd (uplink_handle);
 
120
    rm_fd (rdata_notify_handle);
 
121
    rm_fd (pending_notify_handle);
 
122
    encoder.set_session (NULL);
 
123
}
 
124
 
 
125
void zmq::pgm_sender_t::terminate ()
 
126
{
 
127
    unplug ();
 
128
    delete this;
 
129
}
 
130
 
 
131
void zmq::pgm_sender_t::activate_out ()
 
132
{
 
133
    set_pollout (handle);
 
134
    out_event ();
 
135
}
 
136
 
 
137
void zmq::pgm_sender_t::activate_in ()
 
138
{
 
139
    zmq_assert (false);
 
140
}
 
141
 
 
142
zmq::pgm_sender_t::~pgm_sender_t ()
 
143
{
 
144
    if (out_buffer) {
 
145
        free (out_buffer);
 
146
        out_buffer = NULL;
 
147
    }
 
148
}
 
149
 
 
150
void zmq::pgm_sender_t::in_event ()
 
151
{
 
152
    if (has_rx_timer) {
 
153
        cancel_timer (rx_timer_id);
 
154
        has_rx_timer = false;
 
155
    }
 
156
 
 
157
    //  In-event on sender side means NAK or SPMR receiving from some peer.
 
158
    pgm_socket.process_upstream ();
 
159
    if (errno == ENOMEM || errno == EBUSY) {
 
160
        const long timeout = pgm_socket.get_rx_timeout ();
 
161
        add_timer (timeout, rx_timer_id);
 
162
        has_rx_timer = true;
 
163
    }
 
164
}
 
165
 
 
166
void zmq::pgm_sender_t::out_event ()
 
167
{
 
168
    //  POLLOUT event from send socket. If write buffer is empty, 
 
169
    //  try to read new data from the encoder.
 
170
    if (write_size == 0) {
 
171
 
 
172
        //  First two bytes (sizeof uint16_t) are used to store message 
 
173
        //  offset in following steps. Note that by passing our buffer to
 
174
        //  the get data function we prevent it from returning its own buffer.
 
175
        unsigned char *bf = out_buffer + sizeof (uint16_t);
 
176
        size_t bfsz = out_buffer_size - sizeof (uint16_t);
 
177
        int offset = -1;
 
178
        encoder.get_data (&bf, &bfsz, &offset);
 
179
 
 
180
        //  If there are no data to write stop polling for output.
 
181
        if (!bfsz) {
 
182
            reset_pollout (handle);
 
183
            return;
 
184
        }
 
185
 
 
186
        //  Put offset information in the buffer.
 
187
        write_size = bfsz + sizeof (uint16_t);
 
188
        put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
 
189
    }
 
190
 
 
191
    if (has_tx_timer) {
 
192
        cancel_timer (tx_timer_id);
 
193
        has_tx_timer = false;
 
194
    }
 
195
 
 
196
    //  Send the data.
 
197
    size_t nbytes = pgm_socket.send (out_buffer, write_size);
 
198
 
 
199
    //  We can write either all data or 0 which means rate limit reached.
 
200
    if (nbytes == write_size) {
 
201
        write_size = 0;
 
202
    } else {
 
203
        zmq_assert (nbytes == 0);
 
204
 
 
205
        if (errno == ENOMEM) {
 
206
            const long timeout = pgm_socket.get_tx_timeout ();
 
207
            add_timer (timeout, tx_timer_id);
 
208
            has_tx_timer = true;
 
209
        } else
 
210
            zmq_assert (errno == EBUSY);
 
211
    }
 
212
}
 
213
 
 
214
void zmq::pgm_sender_t::timer_event (int token)
 
215
{
 
216
    //  Timer cancels on return by poller_base.
 
217
    if (token == rx_timer_id) {
 
218
        has_rx_timer = false;
 
219
        in_event ();
 
220
    } else if (token == tx_timer_id) {
 
221
        has_tx_timer = false;
 
222
        out_event ();
 
223
    } else
 
224
        zmq_assert (false);
 
225
}
 
226
 
 
227
#endif
 
228