~ubuntu-branches/ubuntu/maverick/zeromq/maverick

« back to all changes in this revision

Viewing changes to src/pub.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Adrian von Bidder
  • Date: 2010-03-17 10:43:40 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20100317104340-un1ne0oqe16w8eaq
Tags: 2.0.6beta.dfsg-1
* New upstream version.
  - Source doesn't include non-C/C++ language bindings anymore.
  - New versioning: 2.0.6 is official upstream version which is a beta.
* Repacked orig tar: removed non-free RFC documents (closes: #567513)
* Improved/corrected description and copyright file, added bzip2 build
  dependency.  Thanks to feedback from zeromq mailing list.
* Disable OpenPGM on non-x86 architectures (closes: #567848)

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
18
*/
19
19
 
20
 
#include "../bindings/c/zmq.h"
 
20
#include "../include/zmq.h"
21
21
 
22
22
#include "pub.hpp"
23
23
#include "err.hpp"
25
25
#include "pipe.hpp"
26
26
 
27
27
zmq::pub_t::pub_t (class app_thread_t *parent_) :
28
 
    socket_base_t (parent_)
 
28
    socket_base_t (parent_),
 
29
    stalled_pipe (NULL)
29
30
{
30
31
    options.requires_in = false;
31
32
    options.requires_out = true;
39
40
}
40
41
 
41
42
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
42
 
    class writer_t *outpipe_)
 
43
    class writer_t *outpipe_, const blob_t &peer_identity_)
43
44
{
44
45
    zmq_assert (!inpipe_);
45
46
    out_pipes.push_back (outpipe_);
53
54
void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_)
54
55
{
55
56
    out_pipes.erase (pipe_);
 
57
    if (pipe_ == stalled_pipe)
 
58
        stalled_pipe = NULL;
56
59
}
57
60
 
58
61
void zmq::pub_t::xkill (class reader_t *pipe_)
65
68
    zmq_assert (false);
66
69
}
67
70
 
 
71
void zmq::pub_t::xrevive (class writer_t *pipe_)
 
72
{
 
73
    zmq_assert (stalled_pipe = pipe_);
 
74
    stalled_pipe = NULL;
 
75
}
 
76
 
68
77
int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
69
78
    size_t optvallen_)
70
79
{
86
95
    }
87
96
 
88
97
    //  First check whether all pipes are available for writing.
89
 
    for (out_pipes_t::size_type i = 0; i != pipes_count; i++)
90
 
        if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) {
91
 
            errno = EAGAIN;
92
 
            return -1;
93
 
        }
 
98
    if (!check_write ()) {
 
99
        errno = EAGAIN;
 
100
        return -1;
 
101
    }
94
102
 
95
103
    msg_content_t *content = (msg_content_t*) msg_->content;
96
104
 
97
105
    //  For VSMs the copying is straighforward.
98
106
    if (content == (msg_content_t*) ZMQ_VSM) {
99
107
        for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
100
 
            out_pipes [i]->write (msg_);
101
 
            if (!(flags_ & ZMQ_NOFLUSH))
102
 
                out_pipes [i]->flush ();
 
108
            bool written = out_pipes [i]->write (msg_);
 
109
            zmq_assert (written);
 
110
            out_pipes [i]->flush ();
103
111
        }
104
112
        int rc = zmq_msg_init (msg_);
105
113
        zmq_assert (rc == 0);
110
118
    //  to send the message to - no refcount adjustment i.e. no atomic
111
119
    //  operations are needed.
112
120
    if (pipes_count == 1) {
113
 
        out_pipes [0]->write (msg_);
114
 
        if (!(flags_ & ZMQ_NOFLUSH))
115
 
            out_pipes [0]->flush ();
 
121
        bool written = out_pipes [0]->write (msg_);
 
122
        zmq_assert (written);
 
123
        out_pipes [0]->flush ();
116
124
        int rc = zmq_msg_init (msg_);
117
125
        zmq_assert (rc == 0);
118
126
        return 0;
121
129
    //  There are at least 2 destinations for the message. That means we have
122
130
    //  to deal with reference counting. First add N-1 references to
123
131
    //  the content (we are holding one reference anyway, that's why -1).
124
 
    if (msg_->shared)
 
132
    if (msg_->flags & ZMQ_MSG_SHARED)
125
133
        content->refcnt.add (pipes_count - 1);
126
134
    else {
127
135
        content->refcnt.set (pipes_count);
128
 
        msg_->shared = true;
 
136
        msg_->flags |= ZMQ_MSG_SHARED;
129
137
    }
130
138
 
131
139
    //  Push the message to all destinations.
132
140
    for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
133
 
        out_pipes [i]->write (msg_);
134
 
        if (!(flags_ & ZMQ_NOFLUSH))
135
 
            out_pipes [i]->flush ();
 
141
        bool written = out_pipes [i]->write (msg_);
 
142
        zmq_assert (written);
 
143
        out_pipes [i]->flush ();
136
144
    }
137
145
 
138
146
    //  Detach the original message from the data buffer.
142
150
    return 0;
143
151
}
144
152
 
145
 
int zmq::pub_t::xflush ()
146
 
{
147
 
    out_pipes_t::size_type pipe_count = out_pipes.size ();
148
 
    for (out_pipes_t::size_type i = 0; i != pipe_count; i++)
149
 
        out_pipes [i]->flush ();
150
 
    return 0;
151
 
}
152
 
 
153
153
int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
154
154
{
155
155
    errno = ENOTSUP;
163
163
 
164
164
bool zmq::pub_t::xhas_out ()
165
165
{
166
 
    //  TODO: Reimplement when queue limits are added.
 
166
    return check_write ();
 
167
}
 
168
 
 
169
bool zmq::pub_t::check_write ()
 
170
{
 
171
    if (stalled_pipe != NULL)
 
172
        return false;
 
173
 
 
174
    out_pipes_t::size_type pipes_num = out_pipes.size ();
 
175
    for (out_pipes_t::size_type i = 0; i < pipes_num; i++) {
 
176
        if (!out_pipes [i]->check_write ()) {
 
177
            stalled_pipe = out_pipes [i];
 
178
            return false;
 
179
        }
 
180
    }
 
181
 
167
182
    return true;
168
183
}
169
184