44
42
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
45
const options_t &options_, const char *name_) :
43
const options_t &options_, const blob_t &peer_identity_) :
46
44
owned_t (parent_, owner_),
50
peer_identity (peer_identity_),
60
// TODO: Generate unique name here.
53
if (!peer_identity.empty () && peer_identity [0] != 0) {
54
if (!owner->register_session (peer_identity, this)) {
56
// TODO: There's already a session with the specified
57
// identity. We should presumably syslog it and drop the
121
120
uint64_t zmq::session_t::get_ordinal ()
123
zmq_assert (type == unnamed);
124
122
zmq_assert (ordinal);
128
126
void zmq::session_t::attach_pipes (class reader_t *inpipe_,
129
class writer_t *outpipe_)
127
class writer_t *outpipe_, const blob_t &peer_identity_)
132
130
zmq_assert (!in_pipe);
166
164
engine->revive ();
167
void zmq::session_t::revive (writer_t *pipe_)
169
zmq_assert (out_pipe == pipe_);
171
engine->resume_input ();
169
174
void zmq::session_t::process_plug ()
171
// Register the session with the socket.
172
if (!name.empty ()) {
173
bool ok = owner->register_session (name.c_str (), this);
175
// There's already a session with the specified identity.
176
// We should syslog it and drop the session. TODO
180
// If session is created by 'connect' function, it has the pipes set
181
// already. Otherwise, it's being created by the listener and the pipes
182
// are yet to be created.
183
if (!in_pipe && !out_pipe) {
185
pipe_t *inbound = NULL;
186
pipe_t *outbound = NULL;
188
if (options.requires_out) {
189
inbound = new (std::nothrow) pipe_t (this, owner,
190
options.hwm, options.lwm);
191
zmq_assert (inbound);
192
in_pipe = &inbound->reader;
193
in_pipe->set_endpoint (this);
196
if (options.requires_in) {
197
outbound = new (std::nothrow) pipe_t (owner, this,
198
options.hwm, options.lwm);
199
zmq_assert (outbound);
200
out_pipe = &outbound->writer;
201
out_pipe->set_endpoint (this);
204
send_bind (owner, outbound ? &outbound->reader : NULL,
205
inbound ? &inbound->writer : NULL);
209
178
void zmq::session_t::process_unplug ()
211
// Unregister the session from the socket. There's nothing to do here
212
// for transient sessions.
180
// Unregister the session from the socket.
214
182
owner->unregister_session (ordinal);
215
else if (type == named)
216
owner->unregister_session (name.c_str ());
183
else if (!peer_identity.empty () && peer_identity [0] != 0)
184
owner->unregister_session (peer_identity);
218
186
// Ask associated pipes to terminate.
235
void zmq::session_t::process_attach (i_engine *engine_)
203
void zmq::session_t::process_attach (i_engine *engine_,
204
const blob_t &peer_identity_)
206
if (!peer_identity.empty ()) {
208
// If both IDs are temporary, no checking is needed.
209
// TODO: Old ID should be reused in this case...
210
if (peer_identity.empty () || peer_identity [0] != 0 ||
211
peer_identity_.empty () || peer_identity_ [0] != 0) {
213
// If we already know the peer name do nothing, just check whether
214
// it haven't changed.
215
zmq_assert (peer_identity == peer_identity_);
218
else if (!peer_identity_.empty ()) {
220
// Store the peer identity.
221
peer_identity = peer_identity_;
223
// If the session is not registered with the ordinal, let's register
224
// it using the peer name.
226
if (!owner->register_session (peer_identity, this)) {
228
// TODO: There's already a session with the specified
229
// identity. We should presumably syslog it and drop the
236
// Check whether the required pipes already exist. If not so, we'll
237
// create them and bind them to the socket object.
238
reader_t *socket_reader = NULL;
239
writer_t *socket_writer = NULL;
241
if (options.requires_in && !out_pipe) {
242
pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
243
options.hwm, options.lwm);
245
out_pipe = &pipe->writer;
246
out_pipe->set_endpoint (this);
247
socket_reader = &pipe->reader;
250
if (options.requires_out && !in_pipe) {
251
pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
252
options.hwm, options.lwm);
254
in_pipe = &pipe->reader;
255
in_pipe->set_endpoint (this);
256
socket_writer = &pipe->writer;
259
if (socket_reader || socket_writer)
260
send_bind (owner, socket_reader, socket_writer, peer_identity);
262
// Plug in the engine.
237
263
zmq_assert (!engine);
238
264
zmq_assert (engine_);
239
265
engine = engine_;
240
266
engine->plug (this);
268
// Once the initial handshaking is over tracerouting should trim prefixes
269
// from outbound messages.
270
if (options.traceroute)
271
engine->trim_prefix ();