~ignacio-nin/+junk/percona-xtradb-cluster-galera

« back to all changes in this revision

Viewing changes to gcs/src/gcs_gcomm.cpp

  • Committer: Alex Yurchenko
  • Date: 2011-11-13 20:37:10 UTC
  • mfrom: (87.1.2 1.x)
  • Revision ID: ayurchen@void-20111113203710-l2hjr20de14hk79j
References lp:884566 - synced with SVN branch 2.x r2509

Show diffs side-by-side

added added

removed removed

Lines of Context:
182
182
        mutex(),
183
183
        refcnt(0),
184
184
        terminated(false),
 
185
        error(0),
185
186
        recv_buf(),
186
187
        current_view(),
187
188
        prof("gcs_gcomm")
237
238
        log_info << "gcomm: connected";
238
239
    }
239
240
 
240
 
    void close()
 
241
    void close(bool join = true)
241
242
    {
242
243
        if (tp == 0)
243
244
        {
246
247
        }
247
248
        log_info << "gcomm: terminating thread";
248
249
        terminate();
249
 
        log_info << "gcomm: joining thread";
250
 
        pthread_join(thd, 0);
 
250
        if (join == true)
 
251
        {
 
252
            log_info << "gcomm: joining thread";
 
253
            pthread_join(thd, 0);
 
254
        }
251
255
        log_info << "gcomm: closing backend";
252
256
        tp->close();
253
 
 
254
257
        gcomm::disconnect(tp, this);
255
258
        delete tp;
256
259
        tp = 0;
295
298
    bool        get_use_prod_cons() const { return use_prod_cons; }
296
299
    Protonet&   get_pnet()                { return *net; }
297
300
    gu::Config& get_conf()                { return conf; }
298
 
 
 
301
    int         get_error() const         { return error; }
299
302
    class Ref
300
303
    {
301
304
    public:
353
356
    Mutex mutex;
354
357
    size_t refcnt;
355
358
    bool terminated;
 
359
    bool error;
356
360
    RecvBuf recv_buf;
357
361
    View current_view;
358
362
    Profile prof;
361
365
 
362
366
void GCommConn::handle_up(const void* id, const Datagram& dg, const ProtoUpMeta& um)
363
367
{
364
 
    if (um.has_view() == true)
 
368
    if (um.get_errno() != 0)
 
369
    {
 
370
        error = um.get_errno();
 
371
        recv_buf.push_back(RecvBufData(numeric_limits<size_t>::max(), dg, um));
 
372
    }
 
373
    else if (um.has_view() == true)
365
374
    {
366
375
        current_view = um.get_view();
367
 
        recv_buf.push_back(RecvBufData(numeric_limits<size_t>::max(),
368
 
                                       dg, um));
 
376
        recv_buf.push_back(RecvBufData(numeric_limits<size_t>::max(), dg, um));
369
377
        if (current_view.is_empty())
370
378
        {
371
379
            log_debug << "handle_up: self leave";
447
455
            }
448
456
        }
449
457
 
450
 
        net->event_loop(Sec);
 
458
        try
 
459
        {
 
460
            net->event_loop(Sec);
 
461
        }
 
462
        catch (gu::Exception& e)
 
463
        {
 
464
            log_error << "exception from gcomm, backend must be restarted:"
 
465
                      << e.what();
 
466
            gcomm::Critical<Protonet> crit(get_pnet());
 
467
            handle_up(0, Datagram(),
 
468
                      ProtoUpMeta(UUID::nil(),
 
469
                                  ViewId(V_NON_PRIM),
 
470
                                  0,
 
471
                                  0xff,
 
472
                                  O_DROP,
 
473
                                  -1,
 
474
                                  e.get_errno()));
 
475
            close(false);
 
476
            pthread_detach(thd);
 
477
            break;
 
478
        }
 
479
        catch (...)
 
480
        {
 
481
            log_error
 
482
                << "unknow exception from gcomm, backend must be restarted";
 
483
            gcomm::Critical<Protonet> crit(get_pnet());
 
484
            handle_up(0, Datagram(),
 
485
                      ProtoUpMeta(UUID::nil(),
 
486
                                  ViewId(V_NON_PRIM),
 
487
                                  0,
 
488
                                  0xff,
 
489
                                  O_DROP,
 
490
                                  -1,
 
491
                                  gu::Exception::E_UNSPEC));
 
492
            close(false);
 
493
            pthread_detach(thd);
 
494
            break;
 
495
        }
451
496
    }
452
497
}
453
498
 
496
541
                new Buffer(reinterpret_cast<const byte_t*>(buf),
497
542
                           reinterpret_cast<const byte_t*>(buf) + len)));
498
543
        gcomm::Critical<Protonet> crit(conn.get_pnet());
 
544
        if (gu_unlikely(conn.get_error() != 0))
 
545
        {
 
546
            return -ENOTCONN;
 
547
        }
499
548
        int err = conn.send_down(
500
549
            dg,
501
 
            ProtoDownMeta(msg_type, msg_type == GCS_MSG_CAUSAL ? O_LOCAL_CAUSAL : O_SAFE));
 
550
            ProtoDownMeta(msg_type, msg_type == GCS_MSG_CAUSAL ?
 
551
                          O_LOCAL_CAUSAL : O_SAFE));
502
552
        return (err == 0 ? len : -err);
503
553
    }
504
554
}
573
623
                msg->type = GCS_MSG_ERROR;
574
624
            }
575
625
        }
 
626
        else if (um.get_errno() != 0)
 
627
        {
 
628
            gcs_comp_msg_t* cm(gcs_comp_msg_leave());
 
629
            const ssize_t cm_size(gcs_comp_msg_size(cm));
 
630
            msg->size = cm_size;
 
631
            if (gu_likely(cm_size <= msg->buf_len))
 
632
            {
 
633
                memcpy(msg->buf, cm, cm_size);
 
634
                recv_buf.pop_front();
 
635
                msg->type = GCS_MSG_COMPONENT;
 
636
            }
 
637
            else
 
638
            {
 
639
                msg->type = GCS_MSG_ERROR;
 
640
            }
 
641
            gcs_comp_msg_delete(cm);
 
642
        }
576
643
        else
577
644
        {
578
645
            assert(um.has_view() == true);
665
732
    }
666
733
 
667
734
    GCommConn& conn(*ref.get());
668
 
 
 
735
    gcomm::Critical<Protonet> crit(conn.get_pnet());
669
736
    try
670
737
    {
671
738
        conn.close();