~ignacio-nin/+junk/percona-xtradb-cluster-galera

« back to all changes in this revision

Viewing changes to galera/src/replicator_str.cpp

  • Committer: Alex Yurchenko
  • Date: 2011-11-13 20:37:10 UTC
  • mfrom: (87.1.2 1.x)
  • Revision ID: ayurchen@void-20111113203710-l2hjr20de14hk79j
References lp:884566 - synced with SVN branch 2.x r2509

Show diffs side-by-side

added added

removed removed

Lines of Context:
274
274
    char* const tmp(strndup(reinterpret_cast<const char*>(req), req_size));
275
275
    std::string const req_str(tmp);
276
276
    free (tmp);
277
 
    bool const trivial_sst(req_str == TRIVIAL_SST);
278
 
 
279
 
    if (!trivial_sst)
 
277
 
 
278
    bool const skip_state_transfer (req_str == TRIVIAL_SST);
 
279
    wsrep_seqno_t rcode (0);
 
280
 
 
281
    if (!skip_state_transfer)
280
282
    {
281
283
        StateRequest* const streq (read_state_request (req, req_size));
 
284
 
282
285
        if (streq->ist_len())
283
286
        {
284
287
            IST_request istr;
286
289
                                streq->ist_len());
287
290
            std::istringstream is(ist_str);
288
291
            is >> istr;
 
292
 
289
293
            if (istr.uuid() == state_uuid_)
290
294
            {
291
295
                log_info << "IST request: " << istr;
298
302
                {
299
303
                    log_info << "IST first seqno " << istr.last_applied() + 1
300
304
                             << " not found from cache, falling back to SST";
301
 
                    goto sst;
302
 
                }
303
 
 
304
 
                trivial_sst_ = true;
305
 
                sst_donate_cb_(app_ctx_, recv_ctx,
306
 
                               streq->sst_req(), streq->sst_len(),
307
 
                               &istr.uuid(), istr.last_applied(), 0, 0, true);
308
 
                trivial_sst_ = false;
309
 
                try
310
 
                {
311
 
                    ist_senders_.run(config_,
312
 
                                     istr.peer(),
313
 
                                     istr.last_applied() + 1,
314
 
                                     istr.group_seqno());
315
 
                }
316
 
                catch (gu::Exception& e)
317
 
                {
318
 
                    log_error << "failed to serve ist " << e.what();
319
 
                    gcs_.join(-e.get_errno());
320
 
                }
321
 
                local_monitor_.leave(lo);
322
 
 
323
 
                delete streq;
324
 
                return;
 
305
                    // @todo: close IST channel explicitly
 
306
                    goto full_sst;
 
307
                }
 
308
 
 
309
                if (streq->sst_len()) // if joiner is waiting for SST, notify it
 
310
                {
 
311
                    ist_sst_ = true; // gcs_.join() shall be called by IST
 
312
                    rcode = sst_donate_cb_(app_ctx_, recv_ctx,
 
313
                                           streq->sst_req(),
 
314
                                           streq->sst_len(),
 
315
                                           &istr.uuid(),
 
316
                                           istr.last_applied(), 0, 0, true);
 
317
// this must be reset only in sst_sent() call            ist_sst_ = false;
 
318
                }
 
319
 
 
320
                if (rcode >= 0)
 
321
                {
 
322
                    try
 
323
                    {
 
324
                        ist_senders_.run(config_,
 
325
                                         istr.peer(),
 
326
                                         istr.last_applied() + 1,
 
327
                                         istr.group_seqno(),
 
328
                                         protocol_version_);
 
329
                    }
 
330
                    catch (gu::Exception& e)
 
331
                    {
 
332
                        log_error << "IST failed: " << e.what();
 
333
                        rcode = -e.get_errno();
 
334
                    }
 
335
                }
 
336
                else
 
337
                {
 
338
                    log_error << "Failed to bypass SST: " << -rcode
 
339
                              << " (" << strerror (-rcode) << ')';
 
340
                }
 
341
 
 
342
                goto out;
325
343
            }
326
344
        }
327
345
 
328
 
    sst:
 
346
    full_sst:
329
347
        if (streq->sst_len())
330
348
        {
 
349
            assert(0 == rcode);
331
350
            sst_donate_cb_(app_ctx_, recv_ctx,
332
351
                           streq->sst_req(), streq->sst_len(),
333
352
                           &state_uuid_, donor_seq, 0, 0, false);
334
353
        }
 
354
        else
 
355
        {
 
356
            log_warn << "SST request is null, SST canceled.";
 
357
            rcode = -ECANCELED;
 
358
        }
 
359
 
 
360
    out:
335
361
        delete streq;
336
362
    }
337
363
 
338
364
    local_monitor_.leave(lo);
339
365
 
340
 
    if (trivial_sst)
 
366
    if (skip_state_transfer || rcode < 0)
341
367
    {
342
 
        gcs_.join(donor_seq);
 
368
        gcs_.join(rcode < 0 ? rcode : donor_seq);
343
369
    }
344
370
}
345
371
 
352
378
    std::ostringstream os;
353
379
 
354
380
    std::string recv_addr = ist_receiver_.prepare(
355
 
        apply_monitor_.last_left() + 1, group_seqno);
 
381
        apply_monitor_.last_left() + 1, group_seqno, protocol_version_);
356
382
 
357
383
    os << IST_request(recv_addr,
358
384
                      state_uuid_, apply_monitor_.last_left(), group_seqno);
523
549
                                       ssize_t       const sst_req_len)
524
550
    throw()
525
551
{
526
 
    assert(sst_req != 0);
527
 
    assert(sst_req_len > 0);
528
 
 
529
 
    StateRequest* const req(prepare_state_request(sst_req, sst_req_len,
530
 
                                                  group_seqno));
 
552
    assert(sst_req_len >= 0);
531
553
 
532
554
    log_debug << "State transfer required: "
533
555
              << "\n\tGroup state: "
535
557
              << "\n\tLocal state: " << state_uuid_
536
558
              << ":" << apply_monitor_.last_left();
537
559
 
 
560
    if (0 == sst_req_len && state_uuid_ != group_uuid)
 
561
    {
 
562
        log_fatal << "Local state UUID " << state_uuid_
 
563
                  << "is different from group state UUID " << group_uuid
 
564
                  << ", and SST request is null: restart required.";
 
565
        abort();
 
566
    }
 
567
 
 
568
    StateRequest* const req(prepare_state_request(sst_req, sst_req_len,
 
569
                                                  group_seqno));
538
570
    gu::Lock lock(sst_mutex_);
539
571
 
540
572
    send_state_request (group_uuid, group_seqno, req);
541
573
 
542
574
    state_.shift_to(S_JOINING);
543
575
    sst_state_ = SST_WAIT;
544
 
 
545
576
    /* while waiting for state transfer to complete is a good point
546
577
     * to reset gcache, since it may involve some IO too */
547
578
    gcache_.seqno_reset();
548
579
 
549
 
    lock.wait(sst_cond_);
550
 
 
551
 
    if (sst_uuid_ != group_uuid)
 
580
    if (sst_req_len != 0)
552
581
    {
553
 
        log_fatal << "Application received wrong state: "
554
 
                  << "\n\tReceived: " << sst_uuid_
555
 
                  << "\n\tRequired: " << group_uuid;
556
 
        sst_state_ = SST_FAILED;
557
 
        log_fatal << "Application state transfer failed. This is "
558
 
                  << "unrecoverable condition, restart required.";
559
 
        abort();
 
582
        lock.wait(sst_cond_);
 
583
 
 
584
        if (sst_uuid_ != group_uuid)
 
585
        {
 
586
            log_fatal << "Application received wrong state: "
 
587
                      << "\n\tReceived: " << sst_uuid_
 
588
                      << "\n\tRequired: " << group_uuid;
 
589
            sst_state_ = SST_FAILED;
 
590
            log_fatal << "Application state transfer failed. This is "
 
591
                      << "unrecoverable condition, restart required.";
 
592
            abort();
 
593
        }
 
594
        else
 
595
        {
 
596
            update_state_uuid (sst_uuid_);
 
597
            apply_monitor_.set_initial_position(-1);
 
598
            apply_monitor_.set_initial_position(sst_seqno_);
 
599
 
 
600
            if (co_mode_ != CommitOrder::BYPASS)
 
601
            {
 
602
                commit_monitor_.set_initial_position(-1);
 
603
                commit_monitor_.set_initial_position(sst_seqno_);
 
604
            }
 
605
 
 
606
            log_info << "SST finished: " << state_uuid_ << ":" << sst_seqno_;
 
607
        }
560
608
    }
561
609
    else
562
610
    {
563
 
        update_state_uuid (sst_uuid_);
564
 
        apply_monitor_.set_initial_position(-1);
565
 
        apply_monitor_.set_initial_position(sst_seqno_);
566
 
 
567
 
        if (co_mode_ != CommitOrder::BYPASS)
568
 
        {
569
 
            commit_monitor_.set_initial_position(-1);
570
 
            commit_monitor_.set_initial_position(sst_seqno_);
571
 
        }
572
 
 
573
 
        log_info << "SST finished: " << state_uuid_ << ":" << sst_seqno_;
574
 
 
575
 
        if (sst_seqno_ < group_seqno)
576
 
        {
577
 
            log_info << "Receiving IST: " << (group_seqno - sst_seqno_)
578
 
                     << " writesets.";
579
 
            // go to receive IST
580
 
            recv_IST(recv_ctx);
581
 
        }
582
 
    }
 
611
        assert (state_uuid_ == group_uuid);
 
612
    }
 
613
 
 
614
    if (apply_monitor_.last_left() < group_seqno)
 
615
    {
 
616
        log_info << "Receiving IST: "
 
617
                 << (group_seqno - apply_monitor_.last_left()) << " writesets.";
 
618
        recv_IST(recv_ctx);
 
619
        sst_seqno_ = group_seqno;
 
620
    }
 
621
 
583
622
    ist_receiver_.finished();
 
623
 
584
624
    delete req;
585
625
}
586
626