274
274
char* const tmp(strndup(reinterpret_cast<const char*>(req), req_size));
275
275
std::string const req_str(tmp);
277
bool const trivial_sst(req_str == TRIVIAL_SST);
278
bool const skip_state_transfer (req_str == TRIVIAL_SST);
279
wsrep_seqno_t rcode (0);
281
if (!skip_state_transfer)
281
283
StateRequest* const streq (read_state_request (req, req_size));
282
285
if (streq->ist_len())
284
287
IST_request istr;
299
303
log_info << "IST first seqno " << istr.last_applied() + 1
300
304
<< " not found from cache, falling back to SST";
305
sst_donate_cb_(app_ctx_, recv_ctx,
306
streq->sst_req(), streq->sst_len(),
307
&istr.uuid(), istr.last_applied(), 0, 0, true);
308
trivial_sst_ = false;
311
ist_senders_.run(config_,
313
istr.last_applied() + 1,
316
catch (gu::Exception& e)
318
log_error << "failed to serve ist " << e.what();
319
gcs_.join(-e.get_errno());
321
local_monitor_.leave(lo);
305
// @todo: close IST channel explicitly
309
if (streq->sst_len()) // if joiner is waiting for SST, notify it
311
ist_sst_ = true; // gcs_.join() shall be called by IST
312
rcode = sst_donate_cb_(app_ctx_, recv_ctx,
316
istr.last_applied(), 0, 0, true);
317
// this must be reset only in sst_sent() call ist_sst_ = false;
324
ist_senders_.run(config_,
326
istr.last_applied() + 1,
330
catch (gu::Exception& e)
332
log_error << "IST failed: " << e.what();
333
rcode = -e.get_errno();
338
log_error << "Failed to bypass SST: " << -rcode
339
<< " (" << strerror (-rcode) << ')';
329
347
if (streq->sst_len())
331
350
sst_donate_cb_(app_ctx_, recv_ctx,
332
351
streq->sst_req(), streq->sst_len(),
333
352
&state_uuid_, donor_seq, 0, 0, false);
356
log_warn << "SST request is null, SST canceled.";
338
364
local_monitor_.leave(lo);
366
if (skip_state_transfer || rcode < 0)
342
gcs_.join(donor_seq);
368
gcs_.join(rcode < 0 ? rcode : donor_seq);
352
378
std::ostringstream os;
354
380
std::string recv_addr = ist_receiver_.prepare(
355
apply_monitor_.last_left() + 1, group_seqno);
381
apply_monitor_.last_left() + 1, group_seqno, protocol_version_);
357
383
os << IST_request(recv_addr,
358
384
state_uuid_, apply_monitor_.last_left(), group_seqno);
535
557
<< "\n\tLocal state: " << state_uuid_
536
558
<< ":" << apply_monitor_.last_left();
560
if (0 == sst_req_len && state_uuid_ != group_uuid)
562
log_fatal << "Local state UUID " << state_uuid_
563
<< "is different from group state UUID " << group_uuid
564
<< ", and SST request is null: restart required.";
568
StateRequest* const req(prepare_state_request(sst_req, sst_req_len,
538
570
gu::Lock lock(sst_mutex_);
540
572
send_state_request (group_uuid, group_seqno, req);
542
574
state_.shift_to(S_JOINING);
543
575
sst_state_ = SST_WAIT;
545
576
/* while waiting for state transfer to complete is a good point
546
577
* to reset gcache, since it may ivolve some IO too */
547
578
gcache_.seqno_reset();
549
lock.wait(sst_cond_);
551
if (sst_uuid_ != group_uuid)
580
if (sst_req_len != 0)
553
log_fatal << "Application received wrong state: "
554
<< "\n\tReceived: " << sst_uuid_
555
<< "\n\tRequired: " << group_uuid;
556
sst_state_ = SST_FAILED;
557
log_fatal << "Application state transfer failed. This is "
558
<< "unrecoverable condition, restart required.";
582
lock.wait(sst_cond_);
584
if (sst_uuid_ != group_uuid)
586
log_fatal << "Application received wrong state: "
587
<< "\n\tReceived: " << sst_uuid_
588
<< "\n\tRequired: " << group_uuid;
589
sst_state_ = SST_FAILED;
590
log_fatal << "Application state transfer failed. This is "
591
<< "unrecoverable condition, restart required.";
596
update_state_uuid (sst_uuid_);
597
apply_monitor_.set_initial_position(-1);
598
apply_monitor_.set_initial_position(sst_seqno_);
600
if (co_mode_ != CommitOrder::BYPASS)
602
commit_monitor_.set_initial_position(-1);
603
commit_monitor_.set_initial_position(sst_seqno_);
606
log_info << "SST finished: " << state_uuid_ << ":" << sst_seqno_;
563
update_state_uuid (sst_uuid_);
564
apply_monitor_.set_initial_position(-1);
565
apply_monitor_.set_initial_position(sst_seqno_);
567
if (co_mode_ != CommitOrder::BYPASS)
569
commit_monitor_.set_initial_position(-1);
570
commit_monitor_.set_initial_position(sst_seqno_);
573
log_info << "SST finished: " << state_uuid_ << ":" << sst_seqno_;
575
if (sst_seqno_ < group_seqno)
577
log_info << "Receiving IST: " << (group_seqno - sst_seqno_)
611
assert (state_uuid_ == group_uuid);
614
if (apply_monitor_.last_left() < group_seqno)
616
log_info << "Receiving IST: "
617
<< (group_seqno - apply_monitor_.last_left()) << " writesets.";
619
sst_seqno_ = group_seqno;
583
622
ist_receiver_.finished();