~ubuntu-branches/ubuntu/saucy/zeromq3/saucy

« back to all changes in this revision

Viewing changes to src/xpub.cpp

  • Committer: Package Import Robot
  • Author(s): Alessandro Ghedini
  • Date: 2012-06-04 21:21:09 UTC
  • Revision ID: package-import@ubuntu.com-20120604212109-b7b3m0rn21o8oo2q
Tags: upstream-3.1.0~beta+dfsg
ImportĀ upstreamĀ versionĀ 3.1.0~beta+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
    Copyright (c) 2010-2011 250bpm s.r.o.
 
3
    Copyright (c) 2011 VMware, Inc.
 
4
    Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
 
5
 
 
6
    This file is part of 0MQ.
 
7
 
 
8
    0MQ is free software; you can redistribute it and/or modify it under
 
9
    the terms of the GNU Lesser General Public License as published by
 
10
    the Free Software Foundation; either version 3 of the License, or
 
11
    (at your option) any later version.
 
12
 
 
13
    0MQ is distributed in the hope that it will be useful,
 
14
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
16
    GNU Lesser General Public License for more details.
 
17
 
 
18
    You should have received a copy of the GNU Lesser General Public License
 
19
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
20
*/
 
21
 
 
22
#include <string.h>
 
23
 
 
24
#include "xpub.hpp"
 
25
#include "pipe.hpp"
 
26
#include "err.hpp"
 
27
#include "msg.hpp"
 
28
 
 
29
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
 
30
    socket_base_t (parent_, tid_),
 
31
    more (false)
 
32
{
 
33
    options.type = ZMQ_XPUB;
 
34
}
 
35
 
 
36
zmq::xpub_t::~xpub_t ()
 
37
{
 
38
}
 
39
 
 
40
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_)
 
41
{
 
42
    zmq_assert (pipe_);
 
43
    dist.attach (pipe_);
 
44
 
 
45
    //  The pipe is active when attached. Let's read the subscriptions from
 
46
    //  it, if any.
 
47
    xread_activated (pipe_);
 
48
}
 
49
 
 
50
void zmq::xpub_t::xread_activated (pipe_t *pipe_)
 
51
{
 
52
    //  There are some subscriptions waiting. Let's process them.
 
53
    msg_t sub;
 
54
    sub.init ();
 
55
    while (true) {
 
56
 
 
57
        //  Grab next subscription.
 
58
        if (!pipe_->read (&sub)) {
 
59
            sub.close ();
 
60
            return;
 
61
        }
 
62
 
 
63
        //  Apply the subscription to the trie.
 
64
        unsigned char *data = (unsigned char*) sub.data ();
 
65
        size_t size = sub.size ();
 
66
        zmq_assert (size > 0 && (*data == 0 || *data == 1));
 
67
        bool unique;
 
68
                if (*data == 0)
 
69
                    unique = subscriptions.rm (data + 1, size - 1, pipe_);
 
70
                else
 
71
                    unique = subscriptions.add (data + 1, size - 1, pipe_);
 
72
 
 
73
        //  If the subscription is not a duplicate store it so that it can be
 
74
        //  passed to used on next recv call.
 
75
        if (unique && options.type != ZMQ_PUB)
 
76
            pending.push_back (blob_t ((unsigned char*) sub.data (),
 
77
                sub.size ()));
 
78
    }
 
79
}
 
80
 
 
81
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
 
82
{
 
83
    dist.activated (pipe_);
 
84
}
 
85
 
 
86
void zmq::xpub_t::xterminated (pipe_t *pipe_)
 
87
{
 
88
    //  Remove the pipe from the trie. If there are topics that nobody
 
89
    //  is interested in anymore, send corresponding unsubscriptions
 
90
    //  upstream.
 
91
    subscriptions.rm (pipe_, send_unsubscription, this);
 
92
 
 
93
    dist.terminated (pipe_);
 
94
}
 
95
 
 
96
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
 
97
{
 
98
    xpub_t *self = (xpub_t*) arg_;
 
99
    self->dist.match (pipe_);
 
100
}
 
101
 
 
102
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
 
103
{
 
104
    bool msg_more = msg_->flags () & msg_t::more ? true : false;
 
105
 
 
106
    //  For the first part of multi-part message, find the matching pipes.
 
107
    if (!more)
 
108
        subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
 
109
            mark_as_matching, this);
 
110
 
 
111
    //  Send the message to all the pipes that were marked as matching
 
112
    //  in the previous step.
 
113
    int rc = dist.send_to_matching (msg_, flags_);
 
114
    if (rc != 0)
 
115
        return rc;
 
116
 
 
117
    //  If we are at the end of multi-part message we can mark all the pipes
 
118
    //  as non-matching.
 
119
    if (!msg_more)
 
120
        dist.unmatch ();
 
121
 
 
122
    more = msg_more;
 
123
 
 
124
    return 0;
 
125
}
 
126
 
 
127
bool zmq::xpub_t::xhas_out ()
 
128
{
 
129
    return dist.has_out ();
 
130
}
 
131
 
 
132
int zmq::xpub_t::xrecv (msg_t *msg_, int flags_)
 
133
{
 
134
    //  If there is at least one 
 
135
    if (pending.empty ()) {
 
136
        errno = EAGAIN;
 
137
        return -1;
 
138
    }
 
139
 
 
140
    int rc = msg_->close ();
 
141
    errno_assert (rc == 0);
 
142
    rc = msg_->init_size (pending.front ().size ());
 
143
    errno_assert (rc == 0);
 
144
    memcpy (msg_->data (), pending.front ().data (),
 
145
        pending.front ().size ());
 
146
    pending.pop_front ();
 
147
    return 0;
 
148
}
 
149
 
 
150
bool zmq::xpub_t::xhas_in ()
 
151
{
 
152
    return !pending.empty ();
 
153
}
 
154
 
 
155
void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
 
156
    void *arg_)
 
157
{
 
158
    xpub_t *self = (xpub_t*) arg_;
 
159
 
 
160
    if (self->options.type != ZMQ_PUB) {
 
161
 
 
162
                //  Place the unsubscription to the queue of pending (un)sunscriptions
 
163
                //  to be retrived by the user later on.
 
164
                xpub_t *self = (xpub_t*) arg_;
 
165
                blob_t unsub (size_ + 1, 0);
 
166
                unsub [0] = 0;
 
167
                memcpy (&unsub [1], data_, size_);
 
168
                self->pending.push_back (unsub);
 
169
    }
 
170
}
 
171
 
 
172
zmq::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_,
 
173
      socket_base_t *socket_, const options_t &options_,
 
174
      const char *protocol_, const char *address_) :
 
175
    session_base_t (io_thread_, connect_, socket_, options_, protocol_,
 
176
        address_)
 
177
{
 
178
}
 
179
 
 
180
zmq::xpub_session_t::~xpub_session_t ()
 
181
{
 
182
}
 
183