~ubuntu-branches/ubuntu/wily/zeromq3/wily-proposed

« back to all changes in this revision

Viewing changes to .pc/protocol-downgrade-attack.patch/src/session_base.cpp

  • Committer: Package Import Robot
  • Author(s): Laszlo Boszormenyi (GCS)
  • Date: 2015-05-05 21:06:02 UTC
  • Revision ID: package-import@ubuntu.com-20150505210602-7nmw5z31bei3puj6
Tags: 4.0.5+dfsg-3
V3 protocol handler vulnerable to downgrade attacks, use upstream
backported fix for this issue (closes: #784366).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
 
3
 
 
4
    This file is part of 0MQ.
 
5
 
 
6
    0MQ is free software; you can redistribute it and/or modify it under
 
7
    the terms of the GNU Lesser General Public License as published by
 
8
    the Free Software Foundation; either version 3 of the License, or
 
9
    (at your option) any later version.
 
10
 
 
11
    0MQ is distributed in the hope that it will be useful,
 
12
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
    GNU Lesser General Public License for more details.
 
15
 
 
16
    You should have received a copy of the GNU Lesser General Public License
 
17
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
*/
 
19
 
 
20
#include "session_base.hpp"
 
21
#include "i_engine.hpp"
 
22
#include "err.hpp"
 
23
#include "pipe.hpp"
 
24
#include "likely.hpp"
 
25
#include "tcp_connecter.hpp"
 
26
#include "ipc_connecter.hpp"
 
27
#include "pgm_sender.hpp"
 
28
#include "pgm_receiver.hpp"
 
29
#include "address.hpp"
 
30
 
 
31
#include "ctx.hpp"
 
32
#include "req.hpp"
 
33
 
 
34
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
 
35
    bool connect_, class socket_base_t *socket_, const options_t &options_,
 
36
    const address_t *addr_)
 
37
{
 
38
    session_base_t *s = NULL;
 
39
    switch (options_.type) {
 
40
    case ZMQ_REQ:
 
41
        s = new (std::nothrow) req_session_t (io_thread_, connect_,
 
42
            socket_, options_, addr_);
 
43
        break;
 
44
    case ZMQ_DEALER:
 
45
    case ZMQ_REP:
 
46
    case ZMQ_ROUTER:
 
47
    case ZMQ_PUB:
 
48
    case ZMQ_XPUB:
 
49
    case ZMQ_SUB:
 
50
    case ZMQ_XSUB:
 
51
    case ZMQ_PUSH:
 
52
    case ZMQ_PULL:
 
53
    case ZMQ_PAIR:
 
54
    case ZMQ_STREAM:
 
55
        s = new (std::nothrow) session_base_t (io_thread_, connect_,
 
56
            socket_, options_, addr_);
 
57
        break;
 
58
    default:
 
59
        errno = EINVAL;
 
60
        return NULL;
 
61
    }
 
62
    alloc_assert (s);
 
63
    return s;
 
64
}
 
65
 
 
66
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
 
67
      bool connect_, class socket_base_t *socket_, const options_t &options_,
 
68
      const address_t *addr_) :
 
69
    own_t (io_thread_, options_),
 
70
    io_object_t (io_thread_),
 
71
    connect (connect_),
 
72
    pipe (NULL),
 
73
    zap_pipe (NULL),
 
74
    incomplete_in (false),
 
75
    pending (false),
 
76
    engine (NULL),
 
77
    socket (socket_),
 
78
    io_thread (io_thread_),
 
79
    has_linger_timer (false),
 
80
    addr (addr_)
 
81
{
 
82
}
 
83
 
 
84
zmq::session_base_t::~session_base_t ()
 
85
{
 
86
    zmq_assert (!pipe);
 
87
    zmq_assert (!zap_pipe);
 
88
 
 
89
    //  If there's still a pending linger timer, remove it.
 
90
    if (has_linger_timer) {
 
91
        cancel_timer (linger_timer_id);
 
92
        has_linger_timer = false;
 
93
    }
 
94
 
 
95
    //  Close the engine.
 
96
    if (engine)
 
97
        engine->terminate ();
 
98
 
 
99
    delete addr;
 
100
}
 
101
 
 
102
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
 
103
{
 
104
    zmq_assert (!is_terminating ());
 
105
    zmq_assert (!pipe);
 
106
    zmq_assert (pipe_);
 
107
    pipe = pipe_;
 
108
    pipe->set_event_sink (this);
 
109
}
 
110
 
 
111
int zmq::session_base_t::pull_msg (msg_t *msg_)
 
112
{
 
113
    if (!pipe || !pipe->read (msg_)) {
 
114
        errno = EAGAIN;
 
115
        return -1;
 
116
    }
 
117
    incomplete_in = msg_->flags () & msg_t::more ? true : false;
 
118
 
 
119
    return 0;
 
120
}
 
121
 
 
122
int zmq::session_base_t::push_msg (msg_t *msg_)
 
123
{
 
124
    if (pipe && pipe->write (msg_)) {
 
125
        int rc = msg_->init ();
 
126
        errno_assert (rc == 0);
 
127
        return 0;
 
128
    }
 
129
 
 
130
    errno = EAGAIN;
 
131
    return -1;
 
132
}
 
133
 
 
134
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
 
135
{
 
136
    if (zap_pipe == NULL) {
 
137
        errno = ENOTCONN;
 
138
        return -1;
 
139
    }
 
140
 
 
141
    if (!zap_pipe->read (msg_)) {
 
142
        errno = EAGAIN;
 
143
        return -1;
 
144
    }
 
145
 
 
146
    return 0;
 
147
}
 
148
 
 
149
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
 
150
{
 
151
    if (zap_pipe == NULL) {
 
152
        errno = ENOTCONN;
 
153
        return -1;
 
154
    }
 
155
 
 
156
    const bool ok = zap_pipe->write (msg_);
 
157
    zmq_assert (ok);
 
158
 
 
159
    if ((msg_->flags () & msg_t::more) == 0)
 
160
        zap_pipe->flush ();
 
161
 
 
162
    const int rc = msg_->init ();
 
163
    errno_assert (rc == 0);
 
164
    return 0;
 
165
}
 
166
 
 
167
void zmq::session_base_t::reset ()
 
168
{
 
169
}
 
170
 
 
171
void zmq::session_base_t::flush ()
 
172
{
 
173
    if (pipe)
 
174
        pipe->flush ();
 
175
}
 
176
 
 
177
void zmq::session_base_t::clean_pipes ()
 
178
{
 
179
    if (pipe) {
 
180
 
 
181
        //  Get rid of half-processed messages in the out pipe. Flush any
 
182
        //  unflushed messages upstream.
 
183
        pipe->rollback ();
 
184
        pipe->flush ();
 
185
 
 
186
        //  Remove any half-read message from the in pipe.
 
187
        while (incomplete_in) {
 
188
            msg_t msg;
 
189
            int rc = msg.init ();
 
190
            errno_assert (rc == 0);
 
191
            rc = pull_msg (&msg);
 
192
            errno_assert (rc == 0);
 
193
            rc = msg.close ();
 
194
            errno_assert (rc == 0);
 
195
        }
 
196
    }
 
197
}
 
198
 
 
199
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
 
200
{
 
201
    // Drop the reference to the deallocated pipe if required.
 
202
    zmq_assert (pipe_ == pipe
 
203
             || pipe_ == zap_pipe
 
204
             || terminating_pipes.count (pipe_) == 1);
 
205
 
 
206
    if (pipe_ == pipe)
 
207
        // If this is our current pipe, remove it
 
208
        pipe = NULL;
 
209
    else
 
210
    if (pipe_ == zap_pipe) {
 
211
        zap_pipe = NULL;
 
212
    }
 
213
    else
 
214
        // Remove the pipe from the detached pipes set
 
215
        terminating_pipes.erase (pipe_);
 
216
 
 
217
    if (!is_terminating () && options.raw_sock) {
 
218
        if (engine) {
 
219
            engine->terminate ();
 
220
            engine = NULL;
 
221
        }
 
222
        terminate ();
 
223
    }
 
224
 
 
225
    //  If we are waiting for pending messages to be sent, at this point
 
226
    //  we are sure that there will be no more messages and we can proceed
 
227
    //  with termination safely.
 
228
    if (pending && !pipe && !zap_pipe && terminating_pipes.empty ())
 
229
        proceed_with_term ();
 
230
}
 
231
 
 
232
void zmq::session_base_t::read_activated (pipe_t *pipe_)
 
233
{
 
234
    // Skip activating if we're detaching this pipe
 
235
    if (unlikely(pipe_ != pipe && pipe_ != zap_pipe)) {
 
236
        zmq_assert (terminating_pipes.count (pipe_) == 1);
 
237
        return;
 
238
    }
 
239
 
 
240
    if (unlikely (engine == NULL)) {
 
241
        pipe->check_read ();
 
242
        return;
 
243
    }
 
244
 
 
245
    if (likely (pipe_ == pipe))
 
246
        engine->restart_output ();
 
247
    else
 
248
        engine->zap_msg_available ();
 
249
}
 
250
 
 
251
void zmq::session_base_t::write_activated (pipe_t *pipe_)
 
252
{
 
253
    // Skip activating if we're detaching this pipe
 
254
    if (pipe != pipe_) {
 
255
        zmq_assert (terminating_pipes.count (pipe_) == 1);
 
256
        return;
 
257
    }
 
258
 
 
259
    if (engine)
 
260
        engine->restart_input ();
 
261
}
 
262
 
 
263
void zmq::session_base_t::hiccuped (pipe_t *)
 
264
{
 
265
    //  Hiccups are always sent from session to socket, not the other
 
266
    //  way round.
 
267
    zmq_assert (false);
 
268
}
 
269
 
 
270
zmq::socket_base_t *zmq::session_base_t::get_socket ()
 
271
{
 
272
    return socket;
 
273
}
 
274
 
 
275
void zmq::session_base_t::process_plug ()
 
276
{
 
277
    if (connect)
 
278
        start_connecting (false);
 
279
}
 
280
 
 
281
int zmq::session_base_t::zap_connect ()
 
282
{
 
283
    zmq_assert (zap_pipe == NULL);
 
284
 
 
285
    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
 
286
    if (peer.socket == NULL) {
 
287
        errno = ECONNREFUSED;
 
288
        return -1;
 
289
    }
 
290
    if (peer.options.type != ZMQ_REP
 
291
    &&  peer.options.type != ZMQ_ROUTER) {
 
292
        errno = ECONNREFUSED;
 
293
        return -1;
 
294
    }
 
295
 
 
296
    //  Create a bi-directional pipe that will connect
 
297
    //  session with zap socket.
 
298
    object_t *parents [2] = {this, peer.socket};
 
299
    pipe_t *new_pipes [2] = {NULL, NULL};
 
300
    int hwms [2] = {0, 0};
 
301
    bool conflates [2] = {false, false};
 
302
    int rc = pipepair (parents, new_pipes, hwms, conflates);
 
303
    errno_assert (rc == 0);
 
304
 
 
305
    //  Attach local end of the pipe to this socket object.
 
306
    zap_pipe = new_pipes [0];
 
307
    zap_pipe->set_nodelay ();
 
308
    zap_pipe->set_event_sink (this);
 
309
 
 
310
    send_bind (peer.socket, new_pipes [1], false);
 
311
 
 
312
    //  Send empty identity if required by the peer.
 
313
    if (peer.options.recv_identity) {
 
314
        msg_t id;
 
315
        rc = id.init ();
 
316
        errno_assert (rc == 0);
 
317
        id.set_flags (msg_t::identity);
 
318
        bool ok = zap_pipe->write (&id);
 
319
        zmq_assert (ok);
 
320
        zap_pipe->flush ();
 
321
    }
 
322
 
 
323
    return 0;
 
324
}
 
325
 
 
326
void zmq::session_base_t::process_attach (i_engine *engine_)
 
327
{
 
328
    zmq_assert (engine_ != NULL);
 
329
 
 
330
    //  Create the pipe if it does not exist yet.
 
331
    if (!pipe && !is_terminating ()) {
 
332
        object_t *parents [2] = {this, socket};
 
333
        pipe_t *pipes [2] = {NULL, NULL};
 
334
 
 
335
        bool conflate = options.conflate &&
 
336
            (options.type == ZMQ_DEALER ||
 
337
             options.type == ZMQ_PULL ||
 
338
             options.type == ZMQ_PUSH ||
 
339
             options.type == ZMQ_PUB ||
 
340
             options.type == ZMQ_SUB);
 
341
 
 
342
        int hwms [2] = {conflate? -1 : options.rcvhwm,
 
343
            conflate? -1 : options.sndhwm};
 
344
        bool conflates [2] = {conflate, conflate};
 
345
        int rc = pipepair (parents, pipes, hwms, conflates);
 
346
        errno_assert (rc == 0);
 
347
 
 
348
        //  Plug the local end of the pipe.
 
349
        pipes [0]->set_event_sink (this);
 
350
 
 
351
        //  Remember the local end of the pipe.
 
352
        zmq_assert (!pipe);
 
353
        pipe = pipes [0];
 
354
 
 
355
        //  Ask socket to plug into the remote end of the pipe.
 
356
        send_bind (socket, pipes [1]);
 
357
    }
 
358
 
 
359
    //  Plug in the engine.
 
360
    zmq_assert (!engine);
 
361
    engine = engine_;
 
362
    engine->plug (io_thread, this);
 
363
}
 
364
 
 
365
void zmq::session_base_t::detach ()
 
366
{
 
367
    //  Engine is dead. Let's forget about it.
 
368
    engine = NULL;
 
369
 
 
370
    //  Remove any half-done messages from the pipes.
 
371
    clean_pipes ();
 
372
 
 
373
    //  Send the event to the derived class.
 
374
    detached ();
 
375
 
 
376
    //  Just in case there's only a delimiter in the pipe.
 
377
    if (pipe)
 
378
        pipe->check_read ();
 
379
 
 
380
    if (zap_pipe)
 
381
        zap_pipe->check_read ();
 
382
}
 
383
 
 
384
void zmq::session_base_t::process_term (int linger_)
 
385
{
 
386
    zmq_assert (!pending);
 
387
 
 
388
    //  If the termination of the pipe happens before the term command is
 
389
    //  delivered there's nothing much to do. We can proceed with the
 
390
    //  standard termination immediately.
 
391
    if (!pipe && !zap_pipe) {
 
392
        proceed_with_term ();
 
393
        return;
 
394
    }
 
395
 
 
396
    pending = true;
 
397
 
 
398
    if (pipe != NULL) {
 
399
        //  If there's finite linger value, delay the termination.
 
400
        //  If linger is infinite (negative) we don't even have to set
 
401
        //  the timer.
 
402
        if (linger_ > 0) {
 
403
            zmq_assert (!has_linger_timer);
 
404
            add_timer (linger_, linger_timer_id);
 
405
            has_linger_timer = true;
 
406
        }
 
407
 
 
408
        //  Start pipe termination process. Delay the termination till all messages
 
409
        //  are processed in case the linger time is non-zero.
 
410
        pipe->terminate (linger_ != 0);
 
411
 
 
412
        //  TODO: Should this go into pipe_t::terminate ?
 
413
        //  In case there's no engine and there's only delimiter in the
 
414
        //  pipe it wouldn't be ever read. Thus we check for it explicitly.
 
415
        pipe->check_read ();
 
416
    }
 
417
 
 
418
    if (zap_pipe != NULL)
 
419
        zap_pipe->terminate (false);
 
420
}
 
421
 
 
422
void zmq::session_base_t::proceed_with_term ()
 
423
{
 
424
    //  The pending phase has just ended.
 
425
    pending = false;
 
426
 
 
427
    //  Continue with standard termination.
 
428
    own_t::process_term (0);
 
429
}
 
430
 
 
431
void zmq::session_base_t::timer_event (int id_)
 
432
{
 
433
 
 
434
    //  Linger period expired. We can proceed with termination even though
 
435
    //  there are still pending messages to be sent.
 
436
    zmq_assert (id_ == linger_timer_id);
 
437
    has_linger_timer = false;
 
438
 
 
439
    //  Ask pipe to terminate even though there may be pending messages in it.
 
440
    zmq_assert (pipe);
 
441
    pipe->terminate (false);
 
442
}
 
443
 
 
444
void zmq::session_base_t::detached ()
 
445
{
 
446
    //  Transient session self-destructs after peer disconnects.
 
447
    if (!connect) {
 
448
        terminate ();
 
449
        return;
 
450
    }
 
451
 
 
452
    //  For delayed connect situations, terminate the pipe
 
453
    //  and reestablish later on
 
454
    if (pipe && options.immediate == 1
 
455
        && addr->protocol != "pgm" && addr->protocol != "epgm") {
 
456
        pipe->hiccup ();
 
457
        pipe->terminate (false);
 
458
        terminating_pipes.insert (pipe);
 
459
        pipe = NULL;
 
460
    }
 
461
 
 
462
    reset ();
 
463
 
 
464
    //  Reconnect.
 
465
    if (options.reconnect_ivl != -1)
 
466
        start_connecting (true);
 
467
 
 
468
    //  For subscriber sockets we hiccup the inbound pipe, which will cause
 
469
    //  the socket object to resend all the subscriptions.
 
470
    if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
 
471
        pipe->hiccup ();
 
472
}
 
473
 
 
474
void zmq::session_base_t::start_connecting (bool wait_)
 
475
{
 
476
    zmq_assert (connect);
 
477
 
 
478
    //  Choose I/O thread to run connecter in. Given that we are already
 
479
    //  running in an I/O thread, there must be at least one available.
 
480
    io_thread_t *io_thread = choose_io_thread (options.affinity);
 
481
    zmq_assert (io_thread);
 
482
 
 
483
    //  Create the connecter object.
 
484
 
 
485
    if (addr->protocol == "tcp") {
 
486
        tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
 
487
            io_thread, this, options, addr, wait_);
 
488
        alloc_assert (connecter);
 
489
        launch_child (connecter);
 
490
        return;
 
491
    }
 
492
 
 
493
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
 
494
    if (addr->protocol == "ipc") {
 
495
        ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
 
496
            io_thread, this, options, addr, wait_);
 
497
        alloc_assert (connecter);
 
498
        launch_child (connecter);
 
499
        return;
 
500
    }
 
501
#endif
 
502
 
 
503
#ifdef ZMQ_HAVE_OPENPGM
 
504
 
 
505
    //  Both PGM and EPGM transports are using the same infrastructure.
 
506
    if (addr->protocol == "pgm" || addr->protocol == "epgm") {
 
507
 
 
508
        zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
 
509
                 || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
 
510
 
 
511
        //  For EPGM transport with UDP encapsulation of PGM is used.
 
512
        bool const udp_encapsulation = addr->protocol == "epgm";
 
513
 
 
514
        //  At this point we'll create message pipes to the session straight
 
515
        //  away. There's no point in delaying it as no concept of 'connect'
 
516
        //  exists with PGM anyway.
 
517
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
 
518
 
 
519
            //  PGM sender.
 
520
            pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
 
521
                io_thread, options);
 
522
            alloc_assert (pgm_sender);
 
523
 
 
524
            int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
 
525
            errno_assert (rc == 0);
 
526
 
 
527
            send_attach (this, pgm_sender);
 
528
        }
 
529
        else {
 
530
 
 
531
            //  PGM receiver.
 
532
            pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
 
533
                io_thread, options);
 
534
            alloc_assert (pgm_receiver);
 
535
 
 
536
            int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
 
537
            errno_assert (rc == 0);
 
538
 
 
539
            send_attach (this, pgm_receiver);
 
540
        }
 
541
 
 
542
        return;
 
543
    }
 
544
#endif
 
545
 
 
546
    zmq_assert (false);
 
547
}
 
548