~ubuntu-branches/ubuntu/saucy/zeromq3/saucy

« back to all changes in this revision

Viewing changes to src/own.cpp

  • Committer: Package Import Robot
  • Author(s): Alessandro Ghedini
  • Date: 2012-06-04 21:21:09 UTC
  • Revision ID: package-import@ubuntu.com-20120604212109-b7b3m0rn21o8oo2q
Tags: upstream-3.1.0~beta+dfsg
ImportĀ upstreamĀ versionĀ 3.1.0~beta+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
    Copyright (c) 2010-2011 250bpm s.r.o.
 
3
    Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
 
4
 
 
5
    This file is part of 0MQ.
 
6
 
 
7
    0MQ is free software; you can redistribute it and/or modify it under
 
8
    the terms of the GNU Lesser General Public License as published by
 
9
    the Free Software Foundation; either version 3 of the License, or
 
10
    (at your option) any later version.
 
11
 
 
12
    0MQ is distributed in the hope that it will be useful,
 
13
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
15
    GNU Lesser General Public License for more details.
 
16
 
 
17
    You should have received a copy of the GNU Lesser General Public License
 
18
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
19
*/
 
20
 
 
21
#include "own.hpp"
 
22
#include "err.hpp"
 
23
#include "io_thread.hpp"
 
24
 
 
25
zmq::own_t::own_t (class ctx_t *parent_, uint32_t tid_) :
 
26
    object_t (parent_, tid_),
 
27
    terminating (false),
 
28
    sent_seqnum (0),
 
29
    processed_seqnum (0),
 
30
    owner (NULL),
 
31
    term_acks (0)
 
32
{
 
33
}
 
34
 
 
35
zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) :
 
36
    object_t (io_thread_),
 
37
    options (options_),
 
38
    terminating (false),
 
39
    sent_seqnum (0),
 
40
    processed_seqnum (0),
 
41
    owner (NULL),
 
42
    term_acks (0)
 
43
{
 
44
}
 
45
 
 
46
zmq::own_t::~own_t ()
 
47
{
 
48
}
 
49
 
 
50
void zmq::own_t::set_owner (own_t *owner_)
 
51
{
 
52
    zmq_assert (!owner);
 
53
    owner = owner_;
 
54
}
 
55
 
 
56
void zmq::own_t::inc_seqnum ()
 
57
{
 
58
    //  This function may be called from a different thread!
 
59
    sent_seqnum.add (1);
 
60
}
 
61
 
 
62
void zmq::own_t::process_seqnum ()
 
63
{
 
64
    //  Catch up with counter of processed commands.
 
65
    processed_seqnum++;
 
66
 
 
67
    //  We may have catched up and still have pending terms acks.
 
68
    check_term_acks ();
 
69
}
 
70
 
 
71
void zmq::own_t::launch_child (own_t *object_)
 
72
{
 
73
    //  Specify the owner of the object.
 
74
    object_->set_owner (this);
 
75
 
 
76
    //  Plug the object into the I/O thread.
 
77
    send_plug (object_);
 
78
 
 
79
    //  Take ownership of the object.
 
80
    send_own (this, object_);
 
81
}
 
82
 
 
83
void zmq::own_t::launch_sibling (own_t *object_)
 
84
{
 
85
    //  At this point it is important that object is plugged in before its
 
86
    //  owner has a chance to terminate it. Thus, 'plug' command is sent before
 
87
    //  the 'own' command. Given that the mailbox preserves ordering of
 
88
    //  commands, 'term' command from the owner cannot make it to the object
 
89
    //  before the already written 'plug' command.
 
90
 
 
91
    //  Specify the owner of the object.
 
92
    object_->set_owner (owner);
 
93
 
 
94
    //  Plug the object into its I/O thread.
 
95
    send_plug (object_);
 
96
 
 
97
    //  Make parent own the object.
 
98
    send_own (owner, object_);
 
99
}
 
100
 
 
101
void zmq::own_t::process_term_req (own_t *object_)
 
102
{
 
103
    //  When shutting down we can ignore termination requests from owned
 
104
    //  objects. The termination request was already sent to the object.
 
105
    if (terminating)
 
106
        return;
 
107
 
 
108
    //  If I/O object is well and alive let's ask it to terminate.
 
109
    owned_t::iterator it = std::find (owned.begin (), owned.end (), object_);
 
110
 
 
111
    //  If not found, we assume that termination request was already sent to
 
112
    //  the object so we can safely ignore the request.
 
113
    if (it == owned.end ())
 
114
        return;
 
115
 
 
116
    owned.erase (it);
 
117
    register_term_acks (1);
 
118
 
 
119
    //  Note that this object is the root of the (partial shutdown) thus, its
 
120
    //  value of linger is used, rather than the value stored by the children.
 
121
    send_term (object_, options.linger);
 
122
}
 
123
 
 
124
void zmq::own_t::process_own (own_t *object_)
 
125
{
 
126
    //  If the object is already being shut down, new owned objects are
 
127
    //  immediately asked to terminate. Note that linger is set to zero.
 
128
    if (terminating) {
 
129
        register_term_acks (1);
 
130
        send_term (object_, 0);
 
131
        return;
 
132
    }
 
133
 
 
134
    //  Store the reference to the owned object.
 
135
    owned.insert (object_);
 
136
}
 
137
 
 
138
void zmq::own_t::terminate ()
 
139
{
 
140
    //  If termination is already underway, there's no point
 
141
    //  in starting it anew.
 
142
    if (terminating)
 
143
        return;
 
144
 
 
145
    //  As for the root of the ownership tree, there's noone to terminate it,
 
146
    //  so it has to terminate itself.
 
147
    if (!owner) {
 
148
        process_term (options.linger);
 
149
        return;
 
150
    }
 
151
 
 
152
    //  If I am an owned object, I'll ask my owner to terminate me.
 
153
    send_term_req (owner, this);
 
154
}
 
155
 
 
156
bool zmq::own_t::is_terminating ()
 
157
{
 
158
    return terminating;
 
159
}
 
160
 
 
161
void zmq::own_t::process_term (int linger_)
 
162
{
 
163
    //  Double termination should never happen.
 
164
    zmq_assert (!terminating);
 
165
 
 
166
    //  Send termination request to all owned objects.
 
167
    for (owned_t::iterator it = owned.begin (); it != owned.end (); ++it)
 
168
        send_term (*it, linger_);
 
169
    register_term_acks ((int) owned.size ());
 
170
    owned.clear ();
 
171
 
 
172
    //  Start termination process and check whether by chance we cannot
 
173
    //  terminate immediately.
 
174
    terminating = true;
 
175
    check_term_acks ();
 
176
}
 
177
 
 
178
void zmq::own_t::register_term_acks (int count_)
 
179
{
 
180
    term_acks += count_;
 
181
}
 
182
 
 
183
void zmq::own_t::unregister_term_ack ()
 
184
{
 
185
    zmq_assert (term_acks > 0);
 
186
    term_acks--;
 
187
 
 
188
    //  This may be a last ack we are waiting for before termination...
 
189
    check_term_acks (); 
 
190
}
 
191
 
 
192
void zmq::own_t::process_term_ack ()
 
193
{
 
194
    unregister_term_ack ();
 
195
}
 
196
 
 
197
void zmq::own_t::check_term_acks ()
 
198
{
 
199
    if (terminating && processed_seqnum == sent_seqnum.get () &&
 
200
          term_acks == 0) {
 
201
 
 
202
        //  Sanity check. There should be no active children at this point.
 
203
        zmq_assert (owned.empty ());
 
204
 
 
205
        //  The root object has nobody to confirm the termination to.
 
206
        //  Other nodes will confirm the termination to the owner.
 
207
        if (owner)
 
208
            send_term_ack (owner);
 
209
 
 
210
        //  Deallocate the resources.
 
211
        process_destroy ();
 
212
    }
 
213
}
 
214
 
 
215
void zmq::own_t::process_destroy ()
 
216
{
 
217
    delete this;
 
218
}
 
219