~ignacio-nin/+junk/percona-xtradb-cluster-galera

« back to all changes in this revision

Viewing changes to galera/src/ist.cpp

  • Committer: Alex Yurchenko
  • Date: 2011-11-13 20:37:10 UTC
  • mfrom: (87.1.2 1.x)
  • Revision ID: ayurchen@void-20111113203710-l2hjr20de14hk79j
References lp:884566 - synced with SVN branch 2.x r2509

Show diffs side-by-side

added

removed

Lines of Context:
93
93
                {
94
94
                    T_NONE = 0,
95
95
                    T_HANDSHAKE = 1,
96
 
                    T_HANDSHAKE_RESPONSE,
97
 
                    T_CTRL,
98
 
                    T_TRX
 
96
                    T_HANDSHAKE_RESPONSE = 2,
 
97
                    T_CTRL = 3,
 
98
                    T_TRX = 4
99
99
                } Type;
100
100
                Message(int version = -1, Type type = T_NONE,
101
101
                        int16_t ctrl = 0, uint64_t len = 0)
190
190
                { }
191
191
            };
192
192
 
193
 
 
194
 
 
195
 
            Proto() : version_(0) { }
 
193
            Proto(int version) : version_(version) { }
196
194
 
197
195
            template <class ST>
198
196
            void send_handshake(ST& socket)
210
208
            template <class ST>
211
209
            void recv_handshake(ST& socket)
212
210
            {
213
 
                Handshake hs;
214
 
                gu::Buffer buf(serial_size(hs));
 
211
                Message msg;
 
212
                gu::Buffer buf(serial_size(msg));
215
213
                size_t n(asio::read(socket, asio::buffer(&buf[0], buf.size())));
216
214
                if (n != buf.size())
217
215
                {
218
216
                    gu_throw_error(EPROTO) << "error receiving handshake";
219
217
                }
220
 
                unserialize(&buf[0], buf.size(), 0, hs);
221
 
                log_debug << "hs: " << hs.version() << " " << hs.type() << " " << hs.len();
 
218
                (void)unserialize(&buf[0], buf.size(), 0, msg);
 
219
                log_debug << "handshake msg: " << msg.version() << " "
 
220
                          << msg.type() << " " << msg.len();
 
221
                switch (msg.type())
 
222
                {
 
223
                case Message::T_HANDSHAKE:
 
224
                    break;
 
225
                case Message::T_CTRL:
 
226
                    switch (msg.ctrl())
 
227
                    {
 
228
                    case Ctrl::C_EOF:
 
229
                        gu_throw_error(EINTR);
 
230
                        throw;
 
231
                    default:
 
232
                        gu_throw_error(EPROTO) << "unexpected ctrl code: " <<
 
233
                            msg.ctrl();
 
234
                    }
 
235
                    break;
 
236
                default:
 
237
                    gu_throw_error(EPROTO)
 
238
                        << "unexpected message type: " << msg.type();
 
239
                    throw;
 
240
                }
 
241
                if (msg.version() != version_)
 
242
                {
 
243
                    gu_throw_error(EPROTO) << "mismatching protocol version: "
 
244
                                           << msg.version()
 
245
                                           << " required: "
 
246
                                           << version_;
 
247
                }
222
248
                // TODO: Figure out protocol versions to use
223
249
            }
224
250
 
231
257
                size_t n(asio::write(socket, asio::buffer(&buf[0], buf.size())));
232
258
                if (n != offset)
233
259
                {
234
 
                    gu_throw_error(EPROTO) << "error sending handshake response";
 
260
                    gu_throw_error(EPROTO)
 
261
                        << "error sending handshake response";
235
262
                }
236
263
            }
237
264
 
245
272
                {
246
273
                    gu_throw_error(EPROTO) << "error receiving handshake";
247
274
                }
248
 
                unserialize(&buf[0], buf.size(), 0, msg);
 
275
                (void)unserialize(&buf[0], buf.size(), 0, msg);
249
276
 
250
 
                log_debug << "msg: " << msg.version() << " " << msg.type()
 
277
                log_debug << "handshake response msg: " << msg.version()
 
278
                          << " " << msg.type()
251
279
                          << " " << msg.len();
252
280
                switch (msg.type())
253
281
                {
254
282
                case Message::T_HANDSHAKE_RESPONSE:
255
283
                    break;
256
284
                case Message::T_CTRL:
257
 
                    gu_throw_error(EINTR) << "interrupted by ctrl";
258
 
                    throw;
 
285
                    switch (msg.ctrl())
 
286
                    {
 
287
                    case Ctrl::C_EOF:
 
288
                        gu_throw_error(EINTR) << "interrupted by ctrl";
 
289
                        throw;
 
290
                    default:
 
291
                        gu_throw_error(EPROTO) << "unexpected ctrl code: "
 
292
                                               << msg.ctrl();
 
293
                        throw;
 
294
                    }
259
295
                default:
260
296
                    gu_throw_error(EINVAL) << "unexpected message type: "
261
297
                                           << msg.type();
 
298
                    throw;
262
299
                }
263
 
 
264
300
            }
265
301
 
266
302
            template <class ST>
267
 
            void send_ctrl(ST& socket, int32_t code)
 
303
            void send_ctrl(ST& socket, int16_t code)
268
304
            {
269
305
                Ctrl ctrl(version_, code);
270
306
                gu::Buffer buf(serial_size(ctrl));
279
315
            template <class ST>
280
316
            int16_t recv_ctrl(ST& socket)
281
317
            {
282
 
                Handshake ctrl;
283
 
                gu::Buffer buf(serial_size(ctrl));
 
318
                Message msg;
 
319
                gu::Buffer buf(serial_size(msg));
284
320
                size_t n(asio::read(socket, asio::buffer(&buf[0], buf.size())));
285
321
                if (n != buf.size())
286
322
                {
287
323
                    gu_throw_error(EPROTO) << "error receiving handshake";
288
324
                }
289
 
                unserialize(&buf[0], buf.size(), 0, ctrl);
290
 
 
291
 
                log_debug << "ctrl: " << ctrl.version() << " " << ctrl.type()
292
 
                          << " " << ctrl.len();
293
 
                return ctrl.ctrl();
 
325
                (void)unserialize(&buf[0], buf.size(), 0, msg);
 
326
                log_debug << "msg: " << msg.version() << " " << msg.type()
 
327
                          << " " << msg.len();
 
328
                switch (msg.type())
 
329
                {
 
330
                case Message::T_CTRL:
 
331
                    break;
 
332
                default:
 
333
                    gu_throw_error(EPROTO) << "unexpected message type: "
 
334
                                           << msg.type();
 
335
                    throw;
 
336
                }
 
337
                return msg.ctrl();
294
338
            }
295
339
 
296
340
 
335
379
                {
336
380
                    gu_throw_error(EPROTO) << "error receiving trx header";
337
381
                }
338
 
                unserialize(&buf[0], buf.size(), 0, msg);
 
382
                (void)unserialize(&buf[0], buf.size(), 0, msg);
339
383
                log_debug << "received header: " << n << " bytes, type "
340
384
                          << msg.type() << " len " << msg.len();
341
385
                switch (msg.type())
394
438
                        }
395
439
                    }
396
440
                default:
397
 
                    gu_throw_error(EPROTO) << "unexpected message type " << msg.type();
 
441
                    gu_throw_error(EPROTO) << "unexpected message type: "
 
442
                                           << msg.type();
398
443
                    throw;
399
444
                }
400
445
                gu_throw_fatal;
413
458
                        const std::string& peer,
414
459
                        wsrep_seqno_t first,
415
460
                        wsrep_seqno_t last,
416
 
                        AsyncSenderMap& asmap)
 
461
                        AsyncSenderMap& asmap,
 
462
                        int version)
417
463
                :
418
 
                Sender(conf, asmap.gcache(), peer),
 
464
                Sender(conf, asmap.gcache(), peer, version),
419
465
                conf_(conf),
420
466
                peer_(peer),
421
467
                first_(first),
459
505
    error_code_(0),
460
506
    current_seqno_(-1),
461
507
    last_seqno_(-1),
462
 
    use_ssl_(false)
 
508
    use_ssl_(false),
 
509
    version_(-1)
463
510
{
464
511
    std::string recv_addr;
465
512
 
565
612
 
566
613
std::string
567
614
galera::ist::Receiver::prepare(wsrep_seqno_t first_seqno,
568
 
                               wsrep_seqno_t last_seqno)
 
615
                               wsrep_seqno_t last_seqno,
 
616
                               int           version)
569
617
{
 
618
    version_ = version;
570
619
    recv_addr_ = IST_determine_recv_addr(conf_);
571
620
    gu::URI     const uri(recv_addr_);
572
621
    try
632
681
    int ec(0);
633
682
    try
634
683
    {
635
 
        Proto p;
 
684
        Proto p(version_);
636
685
        if (use_ssl_ == true)
637
686
        {
638
687
            p.send_handshake(ssl_stream);
685
734
    }
686
735
    catch (asio::system_error& e)
687
736
    {
688
 
        log_fatal << "got error while reading ist stream: " << e.code();
 
737
        log_error << "got error while reading ist stream: " << e.code();
689
738
        ec = e.code().value();
690
739
    }
691
740
    catch (gu::Exception& e)
692
741
    {
693
742
        ec = e.get_errno();
 
743
        if (ec != EINTR)
 
744
        {
 
745
            log_error << "got exception while reading ist stream: " << e.what();
 
746
        }
694
747
    }
695
748
 
696
749
err:
786
839
                ssl_stream(io_service_, ssl_ctx_);
787
840
            ssl_stream.lowest_layer().connect(*i);
788
841
            ssl_stream.handshake(asio::ssl::stream<asio::ip::tcp::socket>::client);
789
 
            Proto p;
 
842
            Proto p(version_);
790
843
            p.recv_handshake(ssl_stream);
791
844
            p.send_ctrl(ssl_stream, Proto::Ctrl::C_EOF);
792
845
            p.recv_ctrl(ssl_stream);
795
848
        {
796
849
            asio::ip::tcp::socket socket(io_service_);
797
850
            socket.connect(*i);
798
 
            Proto p;
 
851
            Proto p(version_);
799
852
            p.recv_handshake(socket);
800
853
            p.send_ctrl(socket, Proto::Ctrl::C_EOF);
801
854
            p.recv_ctrl(socket);
810
863
 
811
864
galera::ist::Sender::Sender(const gu::Config& conf,
812
865
                            gcache::GCache&    gcache,
813
 
                            const std::string& peer)
 
866
                            const std::string& peer,
 
867
                            int version)
814
868
    :
815
869
    io_service_(),
816
870
    socket_(io_service_),
817
871
    ssl_ctx_(io_service_, asio::ssl::context::sslv23),
818
872
    ssl_stream_(io_service_, ssl_ctx_),
819
873
    use_ssl_(false),
820
 
    gcache_(gcache)
 
874
    gcache_(gcache),
 
875
    version_(version)
821
876
{
822
877
    gu::URI uri(peer);
823
878
    asio::ip::tcp::resolver resolver(io_service_);
859
914
{
860
915
    try
861
916
    {
862
 
        Proto p;
 
917
        Proto p(version_);
863
918
        int32_t ctrl;
864
919
        if (use_ssl_ == true)
865
920
        {
969
1024
    {
970
1025
        as->asmap().remove(as, join_seqno);
971
1026
        pthread_detach(as->thread());
 
1027
        delete as;
972
1028
    }
973
1029
    catch (gu::NotFound& nf)
974
1030
    {
975
 
        log_info << "async IST sender already removed";
 
1031
        log_debug << "async IST sender already removed";
976
1032
    }
977
 
    log_info << "async IST sender served: " << as->peer();
 
1033
    log_info << "async IST sender served";
978
1034
 
979
1035
    return 0;
980
1036
}
981
1037
 
982
1038
 
983
 
void galera::ist::AsyncSenderMap::run(const gu::Config& conf,
 
1039
void galera::ist::AsyncSenderMap::run(const gu::Config&  conf,
984
1040
                                      const std::string& peer,
985
1041
                                      wsrep_seqno_t      first,
986
 
                                      wsrep_seqno_t      last)
 
1042
                                      wsrep_seqno_t      last,
 
1043
                                      int                version)
987
1044
{
988
1045
    gu::Critical crit(monitor_);
989
 
    AsyncSender* as(new AsyncSender(conf, peer, first, last, *this));
 
1046
    AsyncSender* as(new AsyncSender(conf, peer, first, last, *this, version));
990
1047
    int err(pthread_create(&as->thread_, 0, &run_async_sender, as));
991
1048
    if (err != 0)
992
1049
    {
1025
1082
            log_warn << "pthread_join() failed: " << err;
1026
1083
        }
1027
1084
        monitor_.enter();
1028
 
 
1029
1085
        delete as;
1030
1086
    }
1031
1087