2
Copyright (c) 2010-2011 250bpm s.r.o.
3
Copyright (c) 2011 VMware, Inc.
4
Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
6
This file is part of 0MQ.
8
0MQ is free software; you can redistribute it and/or modify it under
9
the terms of the GNU Lesser General Public License as published by
10
the Free Software Foundation; either version 3 of the License, or
11
(at your option) any later version.
13
0MQ is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
GNU Lesser General Public License for more details.
18
You should have received a copy of the GNU Lesser General Public License
19
along with this program. If not, see <http://www.gnu.org/licenses/>.
29
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
30
socket_base_t (parent_, tid_),
33
options.type = ZMQ_XPUB;
36
zmq::xpub_t::~xpub_t ()
40
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_)
45
// The pipe is active when attached. Let's read the subscriptions from
47
xread_activated (pipe_);
50
void zmq::xpub_t::xread_activated (pipe_t *pipe_)
52
// There are some subscriptions waiting. Let's process them.
57
// Grab next subscription.
58
if (!pipe_->read (&sub)) {
63
// Apply the subscription to the trie.
64
unsigned char *data = (unsigned char*) sub.data ();
65
size_t size = sub.size ();
66
zmq_assert (size > 0 && (*data == 0 || *data == 1));
69
unique = subscriptions.rm (data + 1, size - 1, pipe_);
71
unique = subscriptions.add (data + 1, size - 1, pipe_);
73
// If the subscription is not a duplicate store it so that it can be
74
// passed to used on next recv call.
75
if (unique && options.type != ZMQ_PUB)
76
pending.push_back (blob_t ((unsigned char*) sub.data (),
81
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
83
dist.activated (pipe_);
86
void zmq::xpub_t::xterminated (pipe_t *pipe_)
88
// Remove the pipe from the trie. If there are topics that nobody
89
// is interested in anymore, send corresponding unsubscriptions
91
subscriptions.rm (pipe_, send_unsubscription, this);
93
dist.terminated (pipe_);
96
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
98
xpub_t *self = (xpub_t*) arg_;
99
self->dist.match (pipe_);
102
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
104
bool msg_more = msg_->flags () & msg_t::more ? true : false;
106
// For the first part of multi-part message, find the matching pipes.
108
subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
109
mark_as_matching, this);
111
// Send the message to all the pipes that were marked as matching
112
// in the previous step.
113
int rc = dist.send_to_matching (msg_, flags_);
117
// If we are at the end of multi-part message we can mark all the pipes
127
bool zmq::xpub_t::xhas_out ()
129
return dist.has_out ();
132
int zmq::xpub_t::xrecv (msg_t *msg_, int flags_)
134
// If there is at least one
135
if (pending.empty ()) {
140
int rc = msg_->close ();
141
errno_assert (rc == 0);
142
rc = msg_->init_size (pending.front ().size ());
143
errno_assert (rc == 0);
144
memcpy (msg_->data (), pending.front ().data (),
145
pending.front ().size ());
146
pending.pop_front ();
150
bool zmq::xpub_t::xhas_in ()
152
return !pending.empty ();
155
void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
158
xpub_t *self = (xpub_t*) arg_;
160
if (self->options.type != ZMQ_PUB) {
162
// Place the unsubscription to the queue of pending (un)sunscriptions
163
// to be retrived by the user later on.
164
xpub_t *self = (xpub_t*) arg_;
165
blob_t unsub (size_ + 1, 0);
167
memcpy (&unsub [1], data_, size_);
168
self->pending.push_back (unsub);
172
zmq::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_,
173
socket_base_t *socket_, const options_t &options_,
174
const char *protocol_, const char *address_) :
175
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
180
zmq::xpub_session_t::~xpub_session_t ()