~ubuntu-branches/ubuntu/maverick/zeromq/maverick

« back to all changes in this revision

Viewing changes to src/session.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Adrian von Bidder
  • Date: 2010-03-17 10:43:40 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20100317104340-un1ne0oqe16w8eaq
Tags: 2.0.6beta.dfsg-1
* New upstream version.
  - Source doesn't include non-C/C++ language bindings anymore.
  - New versioning: 2.0.6 is official upstream version which is a beta.
* Repacked orig tar: removed non-free RFC documents (closes: #567513)
* Improved/corrected description and copyright file, added bzip2 build
  dependency.  Thanks to feedback from zeromq mailing list.
* Disable OpenPGM on non-x86 architectures (closes: #567848)

Show diffs side-by-side

added added

removed removed

Lines of Context:
32
32
    out_pipe (NULL),
33
33
    engine (NULL),
34
34
    options (options_)
35
 
{
36
 
    type = unnamed;
37
 
    
 
35
{    
38
36
    //  It's possible to register the session at this point as it will be
39
37
    //  searched for only on reconnect, i.e. no race condition (session found
40
38
    //  before it is plugged into it's I/O thread) is possible.
42
40
}
43
41
 
44
42
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
45
 
      const options_t &options_, const char *name_) :
 
43
      const options_t &options_, const blob_t &peer_identity_) :
46
44
    owned_t (parent_, owner_),
47
45
    in_pipe (NULL),
48
46
    active (true),
49
47
    out_pipe (NULL),
50
48
    engine (NULL),
 
49
    ordinal (0),
 
50
    peer_identity (peer_identity_),
51
51
    options (options_)
52
52
{
53
 
    if (name_) {
54
 
        type = named;
55
 
        name = name_;
56
 
        ordinal = 0;
57
 
    }
58
 
    else {
59
 
        type = transient;
60
 
        //  TODO: Generate unique name here.
61
 
        ordinal = 0;
 
53
    if (!peer_identity.empty () && peer_identity [0] != 0) {
 
54
        if (!owner->register_session (peer_identity, this)) {
 
55
 
 
56
            //  TODO: There's already a session with the specified
 
57
            //  identity. We should presumably syslog it and drop the
 
58
            //  session.
 
59
            zmq_assert (false);
 
60
        }
62
61
    }
63
62
}
64
63
 
78
77
 
79
78
bool zmq::session_t::write (::zmq_msg_t *msg_)
80
79
{
81
 
    if (out_pipe->write (msg_)) {
 
80
    if (out_pipe && out_pipe->write (msg_)) {
82
81
        zmq_msg_init (msg_);
83
82
        return true;
84
83
    }
104
103
    engine = NULL;
105
104
 
106
105
    //  Terminate transient session.
107
 
    if (type == transient)
 
106
    if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))
108
107
        term ();
109
108
}
110
109
 
120
119
 
121
120
uint64_t zmq::session_t::get_ordinal ()
122
121
{
123
 
    zmq_assert (type == unnamed);
124
122
    zmq_assert (ordinal);
125
123
    return ordinal;
126
124
}
127
125
 
128
126
void zmq::session_t::attach_pipes (class reader_t *inpipe_,
129
 
    class writer_t *outpipe_)
 
127
    class writer_t *outpipe_, const blob_t &peer_identity_)
130
128
{
131
129
    if (inpipe_) {
132
130
        zmq_assert (!in_pipe);
166
164
        engine->revive ();
167
165
}
168
166
 
 
167
void zmq::session_t::revive (writer_t *pipe_)
 
168
{
 
169
    zmq_assert (out_pipe == pipe_);
 
170
    if (engine)
 
171
        engine->resume_input ();
 
172
}
 
173
 
169
174
void zmq::session_t::process_plug ()
170
175
{
171
 
    //  Register the session with the socket.
172
 
    if (!name.empty ()) {
173
 
        bool ok = owner->register_session (name.c_str (), this);
174
 
 
175
 
        //  There's already a session with the specified identity.
176
 
        //  We should syslog it and drop the session. TODO
177
 
        zmq_assert (ok);
178
 
    }
179
 
 
180
 
    //  If session is created by 'connect' function, it has the pipes set
181
 
    //  already. Otherwise, it's being created by the listener and the pipes
182
 
    //  are yet to be created.
183
 
    if (!in_pipe && !out_pipe) {
184
 
 
185
 
        pipe_t *inbound = NULL;
186
 
        pipe_t *outbound = NULL;
187
 
 
188
 
        if (options.requires_out) {
189
 
            inbound = new (std::nothrow) pipe_t (this, owner,
190
 
                options.hwm, options.lwm);
191
 
            zmq_assert (inbound);
192
 
            in_pipe = &inbound->reader;
193
 
            in_pipe->set_endpoint (this);
194
 
        }
195
 
 
196
 
        if (options.requires_in) {
197
 
            outbound = new (std::nothrow) pipe_t (owner, this,
198
 
                options.hwm, options.lwm);
199
 
            zmq_assert (outbound);
200
 
            out_pipe = &outbound->writer;
201
 
            out_pipe->set_endpoint (this);
202
 
        }
203
 
 
204
 
        send_bind (owner, outbound ? &outbound->reader : NULL,
205
 
            inbound ? &inbound->writer : NULL);
206
 
    }
207
176
}
208
177
 
209
178
void zmq::session_t::process_unplug ()
210
179
{
211
 
    //  Unregister the session from the socket. There's nothing to do here
212
 
    //  for transient sessions.
213
 
    if (type == unnamed)
 
180
    //  Unregister the session from the socket.
 
181
    if (ordinal)
214
182
        owner->unregister_session (ordinal);
215
 
    else if (type == named)
216
 
        owner->unregister_session (name.c_str ());
 
183
    else if (!peer_identity.empty () && peer_identity [0] != 0)
 
184
        owner->unregister_session (peer_identity);
217
185
 
218
186
    //  Ask associated pipes to terminate.
219
187
    if (in_pipe) {
232
200
    }
233
201
}
234
202
 
235
 
void zmq::session_t::process_attach (i_engine *engine_)
 
203
void zmq::session_t::process_attach (i_engine *engine_,
 
204
    const blob_t &peer_identity_)
236
205
{
 
206
    if (!peer_identity.empty ()) {
 
207
 
 
208
        //  If both IDs are temporary, no checking is needed.
 
209
        //  TODO: Old ID should be reused in this case...
 
210
        if (peer_identity.empty () || peer_identity [0] != 0 ||
 
211
            peer_identity_.empty () || peer_identity_ [0] != 0) {
 
212
 
 
213
            //  If we already know the peer name do nothing, just check whether
 
214
            //  it haven't changed.
 
215
            zmq_assert (peer_identity == peer_identity_);
 
216
        }
 
217
    }
 
218
    else if (!peer_identity_.empty ()) {
 
219
 
 
220
        //  Store the peer identity.
 
221
        peer_identity = peer_identity_;
 
222
 
 
223
        //  If the session is not registered with the ordinal, let's register
 
224
        //  it using the peer name.
 
225
        if (!ordinal) {
 
226
            if (!owner->register_session (peer_identity, this)) {
 
227
 
 
228
                //  TODO: There's already a session with the specified
 
229
                //  identity. We should presumably syslog it and drop the
 
230
                //  session.
 
231
                zmq_assert (false);
 
232
            }
 
233
        }
 
234
    }
 
235
 
 
236
    //  Check whether the required pipes already exist. If not so, we'll
 
237
    //  create them and bind them to the socket object.
 
238
    reader_t *socket_reader = NULL;
 
239
    writer_t *socket_writer = NULL;
 
240
 
 
241
    if (options.requires_in && !out_pipe) {
 
242
        pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
 
243
            options.hwm, options.lwm);
 
244
        zmq_assert (pipe);
 
245
        out_pipe = &pipe->writer;
 
246
        out_pipe->set_endpoint (this);
 
247
        socket_reader = &pipe->reader;
 
248
    }
 
249
 
 
250
    if (options.requires_out && !in_pipe) {
 
251
        pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
 
252
            options.hwm, options.lwm);
 
253
        zmq_assert (pipe);
 
254
        in_pipe = &pipe->reader;
 
255
        in_pipe->set_endpoint (this);
 
256
        socket_writer = &pipe->writer;
 
257
    }
 
258
 
 
259
    if (socket_reader || socket_writer)
 
260
        send_bind (owner, socket_reader, socket_writer, peer_identity);
 
261
 
 
262
    //  Plug in the engine.
237
263
    zmq_assert (!engine);
238
264
    zmq_assert (engine_);
239
265
    engine = engine_;
240
266
    engine->plug (this);
 
267
 
 
268
    //  Once the initial handshaking is over tracerouting should trim prefixes
 
269
    //  from outbound messages.
 
270
    if (options.traceroute)
 
271
        engine->trim_prefix ();
241
272
}