~ubuntu-branches/ubuntu/precise/corosync/precise-proposed

« back to all changes in this revision

Viewing changes to exec/totemsrp.c

  • Committer: Bazaar Package Importer
  • Author(s): Ante Karamatic
  • Date: 2009-08-21 09:29:56 UTC
  • mfrom: (1.1.2 upstream)
  • Revision ID: james.westby@ubuntu.com-20090821092956-w9qxxxx3zeoh8dem
Tags: 1.0.0-4ubuntu2
* debian/control:
  - 'Ubuntu Developers' instead of 'Ubuntu Core Developers'
    as maintainer
  - Bump debhelper dependency to 7

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/*
2
2
 * Copyright (c) 2003-2006 MontaVista Software, Inc.
3
 
 * Copyright (c) 2006-2008 Red Hat, Inc.
 
3
 * Copyright (c) 2006-2009 Red Hat, Inc.
4
4
 *
5
5
 * All rights reserved.
6
6
 *
7
7
 * Author: Steven Dake (sdake@redhat.com)
8
8
 *
9
9
 * This software licensed under BSD license, the text of which follows:
10
 
 * 
 
10
 *
11
11
 * Redistribution and use in source and binary forms, with or without
12
12
 * modification, are permitted provided that the following conditions are met:
13
13
 *
35
35
 
36
36
/*
37
37
 * The first version of this code was based upon Yair Amir's PhD thesis:
38
 
 *      http://www.cs.jhu.edu/~yairamir/phd.ps) (ch4,5). 
 
38
 *      http://www.cs.jhu.edu/~yairamir/phd.ps) (ch4,5).
39
39
 *
40
40
 * The current version of totemsrp implements the Totem protocol specified in:
41
41
 *      http://citeseer.ist.psu.edu/amir95totem.html
47
47
 *   usage on 1.6ghz xeon from 35% to less than .1 % as measured by top
48
48
 */
49
49
 
 
50
#include <config.h>
 
51
 
50
52
#include <assert.h>
 
53
#ifdef HAVE_ALLOCA_H
 
54
#include <alloca.h>
 
55
#endif
51
56
#include <sys/mman.h>
52
57
#include <sys/types.h>
53
58
#include <sys/stat.h>
63
68
#include <stdlib.h>
64
69
#include <stdio.h>
65
70
#include <errno.h>
66
 
#include <signal.h>
67
71
#include <sched.h>
68
72
#include <time.h>
69
73
#include <sys/time.h>
70
74
#include <sys/poll.h>
 
75
#include <limits.h>
71
76
 
72
77
#include <corosync/swab.h>
73
 
#include <corosync/queue.h>
 
78
#include <corosync/cs_queue.h>
74
79
#include <corosync/sq.h>
75
80
#include <corosync/list.h>
76
81
#include <corosync/hdb.h>
77
82
#include <corosync/totem/coropoll.h>
 
83
#define LOGSYS_UTILS_ONLY 1
 
84
#include <corosync/engine/logsys.h>
 
85
 
78
86
#include "totemsrp.h"
79
87
#include "totemrrp.h"
 
88
#include "totemnet.h"
80
89
#include "wthread.h"
81
90
 
82
91
#include "crypto.h"
85
94
#define QUEUE_RTR_ITEMS_SIZE_MAX                256 /* allow 256 retransmit items */
86
95
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX          500 /* allow 500 messages to be queued */
87
96
#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX         500 /* allow 500 messages to be queued */
88
 
#define MAXIOVS                                 5       
 
97
#define MAXIOVS                                 5
89
98
#define RETRANSMIT_ENTRIES_MAX                  30
90
99
#define TOKEN_SIZE_MAX                          64000 /* bytes */
 
100
#define LEAVE_DUMMY_NODEID                      0
91
101
 
92
102
/*
93
103
 * Rollover handling:
108
118
 
109
119
/*
110
120
 * These can be used to test different rollover points
111
 
 * #define SEQNO_START_MSG 0xfffffe00 
 
121
 * #define SEQNO_START_MSG 0xfffffe00
112
122
 * #define SEQNO_START_TOKEN 0xfffffe00
113
123
 */
114
124
 
142
152
        MESSAGE_NOT_ENCAPSULATED = 2
143
153
};
144
154
 
145
 
/* 
 
155
/*
146
156
 * New membership algorithm local variables
147
157
 */
148
158
struct srp_addr {
158
168
 
159
169
struct token_callback_instance {
160
170
        struct list_head list;
161
 
        int (*callback_fn) (enum totem_callback_token_type type, void *);
 
171
        int (*callback_fn) (enum totem_callback_token_type type, const void *);
162
172
        enum totem_callback_token_type callback_type;
163
173
        int delete;
164
174
        void *data;
211
221
        unsigned int token_seq;
212
222
        unsigned int aru;
213
223
        unsigned int aru_addr;
214
 
        struct memb_ring_id ring_id; 
 
224
        struct memb_ring_id ring_id;
215
225
        unsigned int backlog;
216
226
        unsigned int fcc;
217
227
        int retrans_flg;
234
244
 */
235
245
} __attribute__((packed));
236
246
 
237
 
 
 
247
 
238
248
struct memb_merge_detect {
239
249
        struct message_header header;
240
250
        struct srp_addr system_from;
274
284
 
275
285
struct message_item {
276
286
        struct mcast *mcast;
277
 
        struct iovec iovec[MAXIOVS];
278
 
        int iov_len;
 
287
        unsigned int msg_len;
279
288
};
280
289
 
281
290
struct sort_queue_item {
282
 
        struct iovec iovec[MAXIOVS];
283
 
        int iov_len;
 
291
        struct mcast *mcast;
 
292
        unsigned int msg_len;
284
293
};
285
294
 
286
295
struct orf_token_mcast_thread_state {
366
375
        int my_retrans_flg_count;
367
376
 
368
377
        unsigned int my_high_ring_delivered;
369
 
        
 
378
 
370
379
        int heartbeat_timeout;
371
380
 
372
381
        /*
373
382
         * Queues used to order, deliver, and recover messages
374
383
         */
375
 
        struct queue new_message_queue;
 
384
        struct cs_queue new_message_queue;
376
385
 
377
 
        struct queue retrans_message_queue;
 
386
        struct cs_queue retrans_message_queue;
378
387
 
379
388
        struct sq regular_sort_queue;
380
389
 
400
409
        /*
401
410
         * Timers
402
411
         */
 
412
        poll_timer_handle timer_pause_timeout;
 
413
 
403
414
        poll_timer_handle timer_orf_token_timeout;
404
415
 
405
416
        poll_timer_handle timer_orf_token_retransmit_timeout;
429
440
 
430
441
        int totemsrp_log_level_debug;
431
442
 
432
 
        void (*totemsrp_log_printf) (char *file, int line, int level, char *format, ...) __attribute__((format(printf, 4, 5)));
 
443
        int totemsrp_subsys_id;
 
444
 
 
445
        void (*totemsrp_log_printf) (
 
446
                unsigned int rec_ident,
 
447
                const char *function,
 
448
                const char *file,
 
449
                int line,
 
450
                const char *format, ...)__attribute__((format(printf, 5, 6)));;
433
451
 
434
452
        enum memb_state memb_state;
435
453
 
436
454
//TODO  struct srp_addr next_memb;
437
455
 
438
 
        char iov_buffer[FRAME_SIZE_MAX];
439
 
 
440
 
        struct iovec totemsrp_iov_recv;
441
 
 
442
 
        poll_handle totemsrp_poll_handle;
443
 
 
444
 
        /*
445
 
         * Function called when new message received
446
 
         */
447
 
        int (*totemsrp_recv) (char *group, struct iovec *iovec, int iov_len);
 
456
        hdb_handle_t totemsrp_poll_handle;
448
457
 
449
458
        struct totem_ip_address mcast_address;
450
459
 
451
460
        void (*totemsrp_deliver_fn) (
452
461
                unsigned int nodeid,
453
 
                struct iovec *iovec,
454
 
                int iov_len,
 
462
                const void *msg,
 
463
                unsigned int msg_len,
455
464
                int endian_conversion_required);
456
465
 
457
466
        void (*totemsrp_confchg_fn) (
458
467
                enum totem_configuration_type configuration_type,
459
 
                unsigned int *member_list, int member_list_entries,
460
 
                unsigned int *left_list, int left_list_entries,
461
 
                unsigned int *joined_list, int joined_list_entries,
462
 
                struct memb_ring_id *ring_id);
 
468
                const unsigned int *member_list, size_t member_list_entries,
 
469
                const unsigned int *left_list, size_t left_list_entries,
 
470
                const unsigned int *joined_list, size_t joined_list_entries,
 
471
                const struct memb_ring_id *ring_id);
463
472
 
464
473
        int global_seqno;
465
474
 
483
492
 
484
493
        struct timeval tv_old;
485
494
 
486
 
        totemrrp_handle totemrrp_handle;
 
495
        hdb_handle_t totemrrp_handle;
487
496
 
488
497
        struct totem_config *totem_config;
489
498
 
494
503
        unsigned int my_pbl;
495
504
 
496
505
        unsigned int my_cbl;
 
506
 
 
507
        struct timeval pause_timestamp;
497
508
};
498
509
 
499
510
struct message_handlers {
500
511
        int count;
501
512
        int (*handler_functions[6]) (
502
513
                struct totemsrp_instance *instance,
503
 
                void *msg,
504
 
                int msg_len,
 
514
                const void *msg,
 
515
                size_t msg_len,
505
516
                int endian_conversion_needed);
506
517
};
507
518
 
510
521
 */
511
522
static int message_handler_orf_token (
512
523
        struct totemsrp_instance *instance,
513
 
        void *msg,
514
 
        int msg_len,
 
524
        const void *msg,
 
525
        size_t msg_len,
515
526
        int endian_conversion_needed);
516
527
 
517
528
static int message_handler_mcast (
518
529
        struct totemsrp_instance *instance,
519
 
        void *msg,
520
 
        int msg_len,
 
530
        const void *msg,
 
531
        size_t msg_len,
521
532
        int endian_conversion_needed);
522
533
 
523
534
static int message_handler_memb_merge_detect (
524
535
        struct totemsrp_instance *instance,
525
 
        void *msg,
526
 
        int msg_len,
 
536
        const void *msg,
 
537
        size_t msg_len,
527
538
        int endian_conversion_needed);
528
539
 
529
540
static int message_handler_memb_join (
530
541
        struct totemsrp_instance *instance,
531
 
        void *msg,
532
 
        int msg_len,
 
542
        const void *msg,
 
543
        size_t msg_len,
533
544
        int endian_conversion_needed);
534
545
 
535
546
static int message_handler_memb_commit_token (
536
547
        struct totemsrp_instance *instance,
537
 
        void *msg,
538
 
        int msg_len,
 
548
        const void *msg,
 
549
        size_t msg_len,
539
550
        int endian_conversion_needed);
540
551
 
541
552
static int message_handler_token_hold_cancel (
542
553
        struct totemsrp_instance *instance,
543
 
        void *msg,
544
 
        int msg_len,
 
554
        const void *msg,
 
555
        size_t msg_len,
545
556
        int endian_conversion_needed);
546
557
 
 
558
static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
 
559
 
 
560
static unsigned int main_msgs_missing (void);
 
561
 
 
562
static void main_token_seqid_get (
 
563
        const void *msg,
 
564
        unsigned int *seqid,
 
565
        unsigned int *token_is);
 
566
 
 
567
static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);
 
568
 
 
569
static void srp_addr_to_nodeid (
 
570
        unsigned int *nodeid_out,
 
571
        struct srp_addr *srp_addr_in,
 
572
        unsigned int entries);
 
573
 
 
574
static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
 
575
 
 
576
static void memb_leave_message_send (struct totemsrp_instance *instance);
 
577
 
547
578
static void memb_ring_id_create_or_load (struct totemsrp_instance *, struct memb_ring_id *);
548
579
 
549
580
static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
554
585
static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
555
586
 
556
587
static void memb_ring_id_set_and_store (struct totemsrp_instance *instance,
557
 
        struct memb_ring_id *ring_id);
 
588
        const struct memb_ring_id *ring_id);
558
589
static void memb_state_commit_token_update (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
559
590
static void memb_state_commit_token_target_set (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
560
591
static int memb_state_commit_token_send (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
561
592
static void memb_state_commit_token_create (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
562
593
static int token_hold_cancel_send (struct totemsrp_instance *instance);
563
 
static void orf_token_endian_convert (struct orf_token *in, struct orf_token *out);
564
 
static void memb_commit_token_endian_convert (struct memb_commit_token *in, struct memb_commit_token *out);
565
 
static void memb_join_endian_convert (struct memb_join *in, struct memb_join *out);
566
 
static void mcast_endian_convert (struct mcast *in, struct mcast *out);
 
594
static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
 
595
static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
 
596
static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
 
597
static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
567
598
static void memb_merge_detect_endian_convert (
568
 
        struct memb_merge_detect *in,
 
599
        const struct memb_merge_detect *in,
569
600
        struct memb_merge_detect *out);
570
 
static void srp_addr_copy_endian_convert (struct srp_addr *out, struct srp_addr *in);
 
601
static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
571
602
static void timer_function_orf_token_timeout (void *data);
 
603
static void timer_function_pause_timeout (void *data);
572
604
static void timer_function_heartbeat_timeout (void *data);
573
605
static void timer_function_token_retransmit_timeout (void *data);
574
606
static void timer_function_token_hold_retransmit_timeout (void *data);
576
608
 
577
609
void main_deliver_fn (
578
610
        void *context,
579
 
        void *msg,
580
 
        int msg_len);
 
611
        const void *msg,
 
612
        unsigned int msg_len);
581
613
 
582
614
void main_iface_change_fn (
583
615
        void *context,
584
 
        struct totem_ip_address *iface_address,
 
616
        const struct totem_ip_address *iface_address,
585
617
        unsigned int iface_no);
586
618
 
587
619
/*
588
620
 * All instances in one database
589
621
 */
590
 
static struct hdb_handle_database totemsrp_instance_database = {
591
 
        .handle_count   = 0,
592
 
        .handles        = 0,
593
 
        .iterator       = 0,
594
 
        .mutex          = PTHREAD_MUTEX_INITIALIZER
595
 
};
 
622
DECLARE_HDB_DATABASE (totemsrp_instance_database,NULL);
 
623
 
596
624
struct message_handlers totemsrp_message_handlers = {
597
625
        6,
598
626
        {
605
633
        }
606
634
};
607
635
 
608
 
static char *rundir = NULL;
609
 
 
610
 
#define log_printf(level, format, args...) \
611
 
    instance->totemsrp_log_printf (__FILE__, __LINE__, level, format, ##args)
612
 
 
613
 
void totemsrp_instance_initialize (struct totemsrp_instance *instance)
 
636
static const char *rundir = NULL;
 
637
 
 
638
#define log_printf(level, format, args...)                              \
 
639
do {                                                                    \
 
640
        instance->totemsrp_log_printf (                                 \
 
641
                LOGSYS_ENCODE_RECID(level,                              \
 
642
                                   instance->totemsrp_subsys_id,        \
 
643
                                   LOGSYS_RECID_LOG),                   \
 
644
                __FUNCTION__, __FILE__, __LINE__,                       \
 
645
                format, ##args);                                        \
 
646
} while (0);
 
647
 
 
648
static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
614
649
{
615
650
        memset (instance, 0, sizeof (struct totemsrp_instance));
616
651
 
618
653
 
619
654
        list_init (&instance->token_callback_sent_listhead);
620
655
 
621
 
        instance->my_received_flg = 0;
 
656
        instance->my_received_flg = 1;
622
657
 
623
658
        instance->my_token_seq = SEQNO_START_TOKEN - 1;
624
659
 
633
668
        instance->my_high_delivered = SEQNO_START_MSG;
634
669
}
635
670
 
636
 
void main_token_seqid_get (
637
 
        void *msg,
 
671
static void main_token_seqid_get (
 
672
        const void *msg,
638
673
        unsigned int *seqid,
639
674
        unsigned int *token_is)
640
675
{
641
 
        struct orf_token *token = (struct orf_token *)msg;
 
676
        const struct orf_token *token = msg;
642
677
 
643
678
        *seqid = 0;
644
679
        *token_is = 0;
648
683
        }
649
684
}
650
685
 
651
 
unsigned int main_msgs_missing (void)
 
686
static unsigned int main_msgs_missing (void)
652
687
{
653
688
// TODO
654
689
        return (0);
655
690
}
656
691
 
 
692
static int pause_flush (struct totemsrp_instance *instance)
 
693
{
 
694
        struct timeval now;
 
695
        uint64_t now_msec;
 
696
        uint64_t timestamp_msec;
 
697
        int res = 0;
 
698
 
 
699
        gettimeofday (&now, NULL);
 
700
        now_msec = ((now.tv_sec * 1000ULL) + (now.tv_usec / 1000ULL));
 
701
        timestamp_msec = ((instance->pause_timestamp.tv_sec * 1000ULL) + (instance->pause_timestamp.tv_usec/1000ULL));
 
702
 
 
703
        if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
 
704
                log_printf (instance->totemsrp_log_level_notice,
 
705
                        "Process pause detected for %d ms, flushing membership messages.\n", (unsigned int)(now_msec - timestamp_msec));
 
706
                /*
 
707
                 * -1 indicates an error from recvmsg
 
708
                 */
 
709
                do {
 
710
                        res = totemrrp_mcast_recv_empty (instance->totemrrp_handle);
 
711
                } while (res == -1);
 
712
        }
 
713
        return (res);
 
714
}
 
715
 
657
716
/*
658
717
 * Exported interfaces
659
718
 */
660
719
int totemsrp_initialize (
661
 
        poll_handle poll_handle,
662
 
        totemsrp_handle *handle,
 
720
        hdb_handle_t poll_handle,
 
721
        hdb_handle_t *handle,
663
722
        struct totem_config *totem_config,
664
723
 
665
724
        void (*deliver_fn) (
666
725
                unsigned int nodeid,
667
 
                struct iovec *iovec,
668
 
                int iov_len,
 
726
                const void *msg,
 
727
                unsigned int msg_len,
669
728
                int endian_conversion_required),
670
729
 
671
730
        void (*confchg_fn) (
672
731
                enum totem_configuration_type configuration_type,
673
 
                unsigned int *member_list, int member_list_entries,
674
 
                unsigned int *left_list, int left_list_entries,
675
 
                unsigned int *joined_list, int joined_list_entries,
676
 
                struct memb_ring_id *ring_id))
 
732
                const unsigned int *member_list, size_t member_list_entries,
 
733
                const unsigned int *left_list, size_t left_list_entries,
 
734
                const unsigned int *joined_list, size_t joined_list_entries,
 
735
                const struct memb_ring_id *ring_id))
677
736
{
678
737
        struct totemsrp_instance *instance;
679
738
        unsigned int res;
691
750
 
692
751
        rundir = getenv ("COROSYNC_RUN_DIR");
693
752
        if (rundir == NULL) {
694
 
                rundir = "/var/lib/corosync";
 
753
                rundir = LOCALSTATEDIR "/lib/corosync";
695
754
        }
696
 
        
 
755
 
697
756
        res = mkdir (rundir, 0700);
698
 
        chdir (rundir);
 
757
        if (res == -1 && errno != EEXIST) {
 
758
                goto error_put;
 
759
        }
 
760
 
 
761
        res = chdir (rundir);
 
762
        if (res == -1) {
 
763
                goto error_put;
 
764
        }
699
765
 
700
766
        totemsrp_instance_initialize (instance);
701
767
 
709
775
        instance->totemsrp_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
710
776
        instance->totemsrp_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
711
777
        instance->totemsrp_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
 
778
        instance->totemsrp_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
712
779
        instance->totemsrp_log_printf = totem_config->totem_logging_configuration.log_printf;
713
780
 
714
781
        /*
716
783
         */
717
784
        totemip_copy (&instance->mcast_address, &totem_config->interfaces[0].mcast_addr);
718
785
 
719
 
        memset (instance->iov_buffer, 0, FRAME_SIZE_MAX);
720
 
 
721
786
        /*
722
787
         * Display totem configuration
723
788
         */
724
 
        log_printf (instance->totemsrp_log_level_notice,
 
789
        log_printf (instance->totemsrp_log_level_debug,
725
790
                "Token Timeout (%d ms) retransmit timeout (%d ms)\n",
726
791
                totem_config->token_timeout, totem_config->token_retransmit_timeout);
727
 
        log_printf (instance->totemsrp_log_level_notice,
 
792
        log_printf (instance->totemsrp_log_level_debug,
728
793
                "token hold (%d ms) retransmits before loss (%d retrans)\n",
729
794
                totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
730
 
        log_printf (instance->totemsrp_log_level_notice,
 
795
        log_printf (instance->totemsrp_log_level_debug,
731
796
                "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)\n",
732
797
                totem_config->join_timeout,
733
798
                totem_config->send_join_timeout,
734
799
                totem_config->consensus_timeout,
735
800
 
736
801
                totem_config->merge_timeout);
737
 
        log_printf (instance->totemsrp_log_level_notice,
 
802
        log_printf (instance->totemsrp_log_level_debug,
738
803
                "downcheck (%d ms) fail to recv const (%d msgs)\n",
739
804
                totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
740
 
        log_printf (instance->totemsrp_log_level_notice,
 
805
        log_printf (instance->totemsrp_log_level_debug,
741
806
                "seqno unchanged const (%d rotations) Maximum network MTU %d\n", totem_config->seqno_unchanged_const, totem_config->net_mtu);
742
807
 
743
 
        log_printf (instance->totemsrp_log_level_notice,
 
808
        log_printf (instance->totemsrp_log_level_debug,
744
809
                "window size per rotation (%d messages) maximum messages per rotation (%d messages)\n",
745
810
                totem_config->window_size, totem_config->max_messages);
746
811
 
747
 
        log_printf (instance->totemsrp_log_level_notice,
 
812
        log_printf (instance->totemsrp_log_level_debug,
748
813
                "send threads (%d threads)\n", totem_config->threads);
749
 
        log_printf (instance->totemsrp_log_level_notice,
 
814
        log_printf (instance->totemsrp_log_level_debug,
750
815
                "RRP token expired timeout (%d ms)\n",
751
816
                totem_config->rrp_token_expired_timeout);
752
 
        log_printf (instance->totemsrp_log_level_notice,
 
817
        log_printf (instance->totemsrp_log_level_debug,
753
818
                "RRP token problem counter (%d ms)\n",
754
819
                totem_config->rrp_problem_count_timeout);
755
 
        log_printf (instance->totemsrp_log_level_notice,
 
820
        log_printf (instance->totemsrp_log_level_debug,
756
821
                "RRP threshold (%d problem count)\n",
757
822
                totem_config->rrp_problem_count_threshold);
758
 
        log_printf (instance->totemsrp_log_level_notice,
 
823
        log_printf (instance->totemsrp_log_level_debug,
759
824
                "RRP mode set to %s.\n", instance->totem_config->rrp_mode);
760
825
 
761
 
        log_printf (instance->totemsrp_log_level_notice,
 
826
        log_printf (instance->totemsrp_log_level_debug,
762
827
                "heartbeat_failures_allowed (%d)\n", totem_config->heartbeat_failures_allowed);
763
 
        log_printf (instance->totemsrp_log_level_notice,
 
828
        log_printf (instance->totemsrp_log_level_debug,
764
829
                "max_network_delay (%d ms)\n", totem_config->max_network_delay);
765
830
 
766
831
 
767
 
        queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
 
832
        cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
768
833
                sizeof (struct message_item));
769
834
 
770
835
        sq_init (&instance->regular_sort_queue,
780
845
        instance->totemsrp_confchg_fn = confchg_fn;
781
846
        instance->use_heartbeat = 1;
782
847
 
 
848
        gettimeofday (&instance->pause_timestamp, NULL);
 
849
 
783
850
        if ( totem_config->heartbeat_failures_allowed == 0 ) {
784
 
                log_printf (instance->totemsrp_log_level_notice,
 
851
                log_printf (instance->totemsrp_log_level_debug,
785
852
                        "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0\n");
786
853
                instance->use_heartbeat = 0;
787
854
        }
788
855
 
789
856
        if (instance->use_heartbeat) {
790
 
                instance->heartbeat_timeout 
791
 
                        = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout 
 
857
                instance->heartbeat_timeout
 
858
                        = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
792
859
                                + totem_config->max_network_delay;
793
860
 
794
861
                if (instance->heartbeat_timeout >= totem_config->token_timeout) {
795
 
                        log_printf (instance->totemsrp_log_level_notice,
796
 
                                "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)\n", 
 
862
                        log_printf (instance->totemsrp_log_level_debug,
 
863
                                "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)\n",
797
864
                                instance->heartbeat_timeout,
798
865
                                totem_config->token_timeout);
799
 
                        log_printf (instance->totemsrp_log_level_notice,
 
866
                        log_printf (instance->totemsrp_log_level_debug,
800
867
                                "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay\n");
801
 
                        log_printf (instance->totemsrp_log_level_notice,
 
868
                        log_printf (instance->totemsrp_log_level_debug,
802
869
                                "heartbeat timeout should be less than the token timeout. HeartBeat is Diabled !!\n");
803
870
                        instance->use_heartbeat = 0;
804
871
                }
805
872
                else {
806
 
                        log_printf (instance->totemsrp_log_level_notice,
 
873
                        log_printf (instance->totemsrp_log_level_debug,
807
874
                                "total heartbeat_timeout (%d ms)\n", instance->heartbeat_timeout);
808
875
                }
809
876
        }
810
 
        
 
877
 
811
878
        totemrrp_initialize (
812
879
                poll_handle,
813
880
                &instance->totemrrp_handle,
821
888
        /*
822
889
         * Must have net_mtu adjusted by totemrrp_initialize first
823
890
         */
824
 
        queue_init (&instance->new_message_queue,
 
891
        cs_queue_init (&instance->new_message_queue,
825
892
                MESSAGE_QUEUE_MAX,
826
893
                sizeof (struct message_item));
827
894
 
 
895
        hdb_handle_put (&totemsrp_instance_database, *handle);
 
896
 
828
897
        return (0);
829
898
 
 
899
error_put:
 
900
        hdb_handle_put (&totemsrp_instance_database, *handle);
 
901
 
830
902
error_destroy:
831
903
        hdb_handle_destroy (&totemsrp_instance_database, *handle);
832
904
 
835
907
}
836
908
 
837
909
void totemsrp_finalize (
838
 
        totemsrp_handle handle)
 
910
        hdb_handle_t handle)
839
911
{
840
912
        struct totemsrp_instance *instance;
841
913
        unsigned int res;
845
917
        if (res != 0) {
846
918
                return;
847
919
        }
 
920
        memb_leave_message_send (instance);
848
921
 
849
922
        hdb_handle_put (&totemsrp_instance_database, handle);
850
923
}
851
924
 
852
925
int totemsrp_ifaces_get (
853
 
        totemsrp_handle handle,
 
926
        hdb_handle_t handle,
854
927
        unsigned int nodeid,
855
928
        struct totem_ip_address *interfaces,
856
929
        char ***status,
880
953
                *iface_count = instance->totem_config->interface_count;
881
954
                goto finish;
882
955
        }
883
 
       
 
956
 
884
957
        for (i = 0; i < instance->my_left_memb_entries; i++) {
885
958
                if (instance->my_left_memb_list[i].addr[0].nodeid == nodeid) {
886
959
                        found = 1;
904
977
        return (res);
905
978
}
906
979
 
907
 
int totemsrp_my_nodeid_get (
908
 
        totemsrp_handle handle)
 
980
int totemsrp_crypto_set (
 
981
        hdb_handle_t handle,
 
982
        unsigned int type)
909
983
{
910
 
        struct totemsrp_instance *instance;
911
984
        int res;
 
985
        struct totemsrp_instance *instance;
 
986
 
 
987
        res = hdb_handle_get (&totemsrp_instance_database, handle,
 
988
                (void *)&instance);
 
989
        if (res != 0) {
 
990
                return (0);
 
991
        }
 
992
 
 
993
        res = totemrrp_crypto_set(instance->totemrrp_handle, type);
 
994
 
 
995
        hdb_handle_put (&totemsrp_instance_database, handle);
 
996
        return (res);
 
997
}
 
998
 
 
999
 
 
1000
unsigned int totemsrp_my_nodeid_get (
 
1001
        hdb_handle_t handle)
 
1002
{
 
1003
        struct totemsrp_instance *instance;
 
1004
        unsigned int res;
912
1005
 
913
1006
        res = hdb_handle_get (&totemsrp_instance_database, handle,
914
1007
                (void *)&instance);
923
1016
}
924
1017
 
925
1018
int totemsrp_my_family_get (
926
 
        totemsrp_handle handle)
 
1019
        hdb_handle_t handle)
927
1020
{
928
1021
        struct totemsrp_instance *instance;
929
1022
        int res;
942
1035
 
943
1036
 
944
1037
int totemsrp_ring_reenable (
945
 
        totemsrp_handle handle)
 
1038
        hdb_handle_t handle)
946
1039
{
947
1040
        struct totemsrp_instance *instance;
948
1041
        int res;
964
1057
/*
965
1058
 * Set operations for use by the membership algorithm
966
1059
 */
967
 
int srp_addr_equal (struct srp_addr *a, struct srp_addr *b)
 
1060
static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
968
1061
{
969
1062
        unsigned int i;
970
1063
        unsigned int res;
978
1071
        return (1);
979
1072
}
980
1073
 
981
 
void srp_addr_copy (struct srp_addr *dest, struct srp_addr *src)
 
1074
static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src)
982
1075
{
983
1076
        unsigned int i;
984
1077
 
987
1080
        }
988
1081
}
989
1082
 
990
 
void srp_addr_to_nodeid (
 
1083
static void srp_addr_to_nodeid (
991
1084
        unsigned int *nodeid_out,
992
1085
        struct srp_addr *srp_addr_in,
993
1086
        unsigned int entries)
999
1092
        }
1000
1093
}
1001
1094
 
1002
 
static void srp_addr_copy_endian_convert (struct srp_addr *out, struct srp_addr *in)
 
1095
static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in)
1003
1096
{
1004
1097
        int i;
1005
1098
 
1044
1137
 */
1045
1138
static void memb_consensus_set (
1046
1139
        struct totemsrp_instance *instance,
1047
 
        struct srp_addr *addr)
 
1140
        const struct srp_addr *addr)
1048
1141
{
1049
1142
        int found = 0;
1050
1143
        int i;
1051
1144
 
 
1145
        if (addr->addr[0].nodeid == LEAVE_DUMMY_NODEID)
 
1146
                return;
 
1147
 
1052
1148
        for (i = 0; i < instance->consensus_list_entries; i++) {
1053
1149
                if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1054
1150
                        found = 1;
1068
1164
 */
1069
1165
static int memb_consensus_isset (
1070
1166
        struct totemsrp_instance *instance,
1071
 
        struct srp_addr *addr)
 
1167
        const struct srp_addr *addr)
1072
1168
{
1073
1169
        int i;
1074
1170
 
1159
1255
 * Is subset fully contained in fullset
1160
1256
 */
1161
1257
static int memb_set_subset (
1162
 
        struct srp_addr *subset, int subset_entries,
1163
 
        struct srp_addr *fullset, int fullset_entries)
 
1258
        const struct srp_addr *subset, int subset_entries,
 
1259
        const struct srp_addr *fullset, int fullset_entries)
1164
1260
{
1165
1261
        int i;
1166
1262
        int j;
1186
1282
 * merge subset into fullset taking care not to add duplicates
1187
1283
 */
1188
1284
static void memb_set_merge (
1189
 
        struct srp_addr *subset, int subset_entries,
 
1285
        const struct srp_addr *subset, int subset_entries,
1190
1286
        struct srp_addr *fullset, int *fullset_entries)
1191
1287
{
1192
1288
        int found = 0;
1198
1294
                        if (srp_addr_equal (&fullset[j], &subset[i])) {
1199
1295
                                found = 1;
1200
1296
                                break;
1201
 
                        }       
 
1297
                        }
1202
1298
                }
1203
1299
                if (found == 0) {
1204
1300
                        srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1297
1393
                instance->old_ring_state_saved = 1;
1298
1394
                instance->old_ring_state_aru = instance->my_aru;
1299
1395
                instance->old_ring_state_high_seq_received = instance->my_high_seq_received;
1300
 
                log_printf (instance->totemsrp_log_level_notice,
 
1396
                log_printf (instance->totemsrp_log_level_debug,
1301
1397
                        "Saving state aru %x high seq received %x\n",
1302
1398
                        instance->my_aru, instance->my_high_seq_received);
1303
1399
        }
1323
1419
                totemip_zero_set(&instance->my_ring_id.rep);
1324
1420
                instance->my_aru = instance->old_ring_state_aru;
1325
1421
                instance->my_high_seq_received = instance->old_ring_state_high_seq_received;
1326
 
                log_printf (instance->totemsrp_log_level_notice,
 
1422
                log_printf (instance->totemsrp_log_level_debug,
1327
1423
                        "Restoring instance->my_aru %x my high seq received %x\n",
1328
1424
                        instance->my_aru, instance->my_high_seq_received);
1329
1425
        }
1334
1430
        instance->old_ring_state_saved = 0;
1335
1431
}
1336
1432
 
 
1433
/*
 * Re-arm the pause timer for this instance: the currently registered
 * timer_pause_timeout is deleted and a new one is added that invokes
 * timer_function_pause_timeout with the instance as its data argument.
 * token_timeout / 5 is passed as the timer value (presumably the delay;
 * its units are not visible here - confirm against poll_timer_add).
 */
static void reset_pause_timeout (struct totemsrp_instance *instance)
 
1434
{
 
1435
        poll_timer_delete (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
 
1436
        poll_timer_add (instance->totemsrp_poll_handle,
 
1437
                instance->totem_config->token_timeout / 5,
 
1438
                (void *)instance,
 
1439
                timer_function_pause_timeout,
 
1440
                &instance->timer_pause_timeout);
 
1441
}
 
1442
 
1337
1443
static void reset_token_timeout (struct totemsrp_instance *instance) {
1338
1444
        poll_timer_delete (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1339
1445
        poll_timer_add (instance->totemsrp_poll_handle,
1414
1520
/*
1415
1521
 * Timers used for various states of the membership algorithm
1416
1522
 */
 
1523
/*
 * Pause timer callback.  data is the struct totemsrp_instance pointer that
 * was registered with the timer.  Stores the current time of day in
 * instance->pause_timestamp and then re-arms the pause timer, so the
 * timestamp keeps being refreshed for as long as the timer keeps firing.
 */
static void timer_function_pause_timeout (void *data)
 
1524
{
 
1525
        struct totemsrp_instance *instance = data;
 
1526
 
 
1527
        gettimeofday (&instance->pause_timestamp, NULL);
 
1528
        reset_pause_timeout (instance);
 
1529
}
 
1530
 
1417
1531
static void timer_function_orf_token_timeout (void *data)
1418
1532
{
1419
 
        struct totemsrp_instance *instance = (struct totemsrp_instance *)data;
 
1533
        struct totemsrp_instance *instance = data;
1420
1534
 
1421
1535
        switch (instance->memb_state) {
1422
1536
                case MEMB_STATE_OPERATIONAL:
1423
 
                        log_printf (instance->totemsrp_log_level_notice,
 
1537
                        log_printf (instance->totemsrp_log_level_debug,
1424
1538
                                "The token was lost in the OPERATIONAL state.\n");
 
1539
                        log_printf (instance->totemsrp_log_level_notice,
 
1540
                                "A processor failed, forming new configuration.\n");
1425
1541
                        totemrrp_iface_check (instance->totemrrp_handle);
1426
1542
                        memb_state_gather_enter (instance, 2);
1427
1543
                        break;
1428
1544
 
1429
1545
                case MEMB_STATE_GATHER:
1430
 
                        log_printf (instance->totemsrp_log_level_notice,
 
1546
                        log_printf (instance->totemsrp_log_level_debug,
1431
1547
                                "The consensus timeout expired.\n");
1432
1548
                        memb_state_consensus_timeout_expired (instance);
1433
1549
                        memb_state_gather_enter (instance, 3);
1434
1550
                        break;
1435
1551
 
1436
1552
                case MEMB_STATE_COMMIT:
1437
 
                        log_printf (instance->totemsrp_log_level_notice,
 
1553
                        log_printf (instance->totemsrp_log_level_debug,
1438
1554
                                "The token was lost in the COMMIT state.\n");
1439
1555
                        memb_state_gather_enter (instance, 4);
1440
1556
                        break;
1441
 
                
 
1557
 
1442
1558
                case MEMB_STATE_RECOVERY:
1443
 
                        log_printf (instance->totemsrp_log_level_notice,
 
1559
                        log_printf (instance->totemsrp_log_level_debug,
1444
1560
                                "The token was lost in the RECOVERY state.\n");
1445
1561
                        ring_state_restore (instance);
1446
1562
                        memb_state_gather_enter (instance, 5);
1450
1566
 
1451
1567
/*
 * Heartbeat timer callback.  Logs that the heartbeat timer expired, together
 * with the current memb_state, and then runs the token-loss handler
 * timer_function_orf_token_timeout on the same data pointer.
 *
 * This diff view shows both revisions of the changed lines: the explicit
 * cast of data was dropped and the log level went from notice to debug.
 */
static void timer_function_heartbeat_timeout (void *data)
1452
1568
{
1453
 
        struct totemsrp_instance *instance = (struct totemsrp_instance *)data;
1454
 
        log_printf (instance->totemsrp_log_level_notice,
 
1569
        struct totemsrp_instance *instance = data;
 
1570
        log_printf (instance->totemsrp_log_level_debug,
1455
1571
                "HeartBeat Timer expired Invoking token loss mechanism in state %d \n", instance->memb_state);
1456
1572
        timer_function_orf_token_timeout(data);
1457
1573
}
1458
1574
 
1459
1575
static void memb_timer_function_state_gather (void *data)
1460
1576
{
1461
 
        struct totemsrp_instance *instance = (struct totemsrp_instance *)data;
 
1577
        struct totemsrp_instance *instance = data;
1462
1578
 
1463
1579
        switch (instance->memb_state) {
1464
1580
        case MEMB_STATE_OPERATIONAL:
1473
1589
                 * Restart the join timeout
1474
1590
                 */
1475
1591
                poll_timer_delete (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1476
 
        
 
1592
 
1477
1593
                poll_timer_add (instance->totemsrp_poll_handle,
1478
1594
                        instance->totem_config->join_timeout,
1479
1595
                        (void *)instance,
1485
1601
 
1486
1602
/*
 * Consensus timer callback.  data is the struct totemsrp_instance pointer;
 * the function only forwards it to memb_state_consensus_timeout_expired.
 *
 * This diff view shows both revisions of the instance declaration: the
 * explicit cast of data was dropped in the newer one.
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
1487
1603
{
1488
 
        struct totemsrp_instance *instance = (struct totemsrp_instance *)data;
 
1604
        struct totemsrp_instance *instance = data;
1489
1605
        memb_state_consensus_timeout_expired (instance);
1490
1606
}
1491
1607
 
1513
1629
                if (res != 0) {
1514
1630
                        continue;
1515
1631
                }
1516
 
                recovery_message_item = (struct sort_queue_item *)ptr;
 
1632
                recovery_message_item = ptr;
1517
1633
 
1518
1634
                /*
1519
1635
                 * Convert recovery message into regular message
1520
1636
                 */
1521
 
                if (recovery_message_item->iov_len > 1) {
1522
 
                        mcast = recovery_message_item->iovec[1].iov_base;
1523
 
                        memcpy (&regular_message_item.iovec[0],
1524
 
                                &recovery_message_item->iovec[1],
1525
 
                                sizeof (struct iovec) * recovery_message_item->iov_len);
 
1637
                mcast = recovery_message_item->mcast;
 
1638
                if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
 
1639
                        /*
 
1640
                         * Message is a recovery message encapsulated
 
1641
                         * in a new ring message
 
1642
                         */
 
1643
                        regular_message_item.mcast =
 
1644
                                (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
 
1645
                        regular_message_item.msg_len =
 
1646
                        recovery_message_item->msg_len - sizeof (struct mcast);
 
1647
                        mcast = regular_message_item.mcast;
1526
1648
                } else {
1527
 
                        mcast = recovery_message_item->iovec[0].iov_base;
1528
 
                        if (mcast->header.encapsulated == 1) {
1529
 
                                /*
1530
 
                                 * Message is a recovery message encapsulated
1531
 
                                 * in a new ring message
1532
 
                                 */
1533
 
                                regular_message_item.iovec[0].iov_base =
1534
 
                                        recovery_message_item->iovec[0].iov_base + sizeof (struct mcast);
1535
 
                                regular_message_item.iovec[0].iov_len =
1536
 
                                recovery_message_item->iovec[0].iov_len - sizeof (struct mcast);
1537
 
                                regular_message_item.iov_len = 1;
1538
 
                                mcast = regular_message_item.iovec[0].iov_base;
1539
 
                        } else {
1540
 
                                continue; /* TODO this case shouldn't happen */
1541
 
                                /*
1542
 
                                 * Message is originated on new ring and not
1543
 
                                 * encapsulated
1544
 
                                 */
1545
 
                                regular_message_item.iovec[0].iov_base =
1546
 
                                        recovery_message_item->iovec[0].iov_base;
1547
 
                                regular_message_item.iovec[0].iov_len =
1548
 
                                recovery_message_item->iovec[0].iov_len;
1549
 
                        }
 
1649
                        /*
 
1650
                         * TODO this case shouldn't happen
 
1651
                         */
 
1652
                        continue;
1550
1653
                }
1551
1654
 
1552
1655
                log_printf (instance->totemsrp_log_level_debug,
1561
1664
                if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1562
1665
                        sizeof (struct memb_ring_id)) == 0) {
1563
1666
 
1564
 
                        regular_message_item.iov_len = recovery_message_item->iov_len;
 
1667
                        regular_message_item.msg_len = recovery_message_item->msg_len;
1565
1668
                        res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1566
1669
                        if (res == 0) {
1567
1670
                                sq_item_add (&instance->regular_sort_queue,
1571
1674
                                }
1572
1675
                        }
1573
1676
                } else {
1574
 
                        log_printf (instance->totemsrp_log_level_notice,
 
1677
                        log_printf (instance->totemsrp_log_level_debug,
1575
1678
                                "-not adding msg with seq no %x\n", mcast->seq);
1576
1679
                }
1577
1680
        }
1637
1740
                trans_memb_list_totemip, instance->my_trans_memb_entries,
1638
1741
                left_list, instance->my_left_memb_entries,
1639
1742
                0, 0, &instance->my_ring_id);
1640
 
                
 
1743
 
1641
1744
// TODO we need to filter to ensure we only deliver those
1642
1745
// messages which are part of instance->my_deliver_memb
1643
1746
        messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1682
1785
        instance->my_high_delivered = instance->my_aru;
1683
1786
// TODO the recovery messages are leaked
1684
1787
 
1685
 
        log_printf (instance->totemsrp_log_level_notice,
 
1788
        log_printf (instance->totemsrp_log_level_debug,
1686
1789
                "entering OPERATIONAL state.\n");
 
1790
        log_printf (instance->totemsrp_log_level_notice,
 
1791
                "A processor joined or left the membership and a new membership was formed.\n");
1687
1792
        instance->memb_state = MEMB_STATE_OPERATIONAL;
1688
1793
 
1689
 
        instance->my_received_flg = 0;
 
1794
        instance->my_received_flg = 1;
 
1795
 
 
1796
        reset_pause_timeout (instance);
1690
1797
 
1691
1798
        return;
1692
1799
}
1736
1843
 
1737
1844
        memb_consensus_set (instance, &instance->my_id);
1738
1845
 
1739
 
        log_printf (instance->totemsrp_log_level_notice,
 
1846
        log_printf (instance->totemsrp_log_level_debug,
1740
1847
                "entering GATHER state from %d.\n", gather_from);
1741
1848
 
1742
1849
        instance->memb_state = MEMB_STATE_GATHER;
1752
1859
{
1753
1860
        ring_save (instance);
1754
1861
 
1755
 
        old_ring_state_save (instance); 
 
1862
        old_ring_state_save (instance);
1756
1863
 
1757
1864
        memb_state_commit_token_update (instance, commit_token);
1758
1865
 
1775
1882
        reset_token_timeout (instance); // REVIEWED
1776
1883
        reset_token_retransmit_timeout (instance); // REVIEWED
1777
1884
 
1778
 
        log_printf (instance->totemsrp_log_level_notice,
 
1885
        log_printf (instance->totemsrp_log_level_debug,
1779
1886
                "entering COMMIT state.\n");
1780
1887
 
1781
1888
        instance->memb_state = MEMB_STATE_COMMIT;
1801
1908
        char is_originated[4096];
1802
1909
        char not_originated[4096];
1803
1910
        char seqno_string_hex[10];
1804
 
        struct srp_addr *addr;
 
1911
        const struct srp_addr *addr;
1805
1912
        struct memb_commit_token_memb_entry *memb_list;
1806
1913
 
1807
 
        addr = (struct srp_addr *)commit_token->end_of_commit_token;
 
1914
        addr = (const struct srp_addr *)commit_token->end_of_commit_token;
1808
1915
        memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
1809
1916
 
1810
 
        log_printf (instance->totemsrp_log_level_notice,
 
1917
        log_printf (instance->totemsrp_log_level_debug,
1811
1918
                "entering RECOVERY state.\n");
1812
1919
 
1813
1920
        instance->my_high_ring_delivered = 0;
1814
1921
 
1815
1922
        sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
1816
 
        queue_reinit (&instance->retrans_message_queue);
 
1923
        cs_queue_reinit (&instance->retrans_message_queue);
1817
1924
 
1818
1925
        low_ring_aru = instance->old_ring_state_high_seq_received;
1819
1926
 
1836
1943
                instance->my_trans_memb_list, &instance->my_trans_memb_entries);
1837
1944
 
1838
1945
        for (i = 0; i < instance->my_new_memb_entries; i++) {
1839
 
                log_printf (instance->totemsrp_log_level_notice,
 
1946
                log_printf (instance->totemsrp_log_level_debug,
1840
1947
                        "position [%d] member %s:\n", i, totemip_print (&addr[i].addr[0]));
1841
 
                log_printf (instance->totemsrp_log_level_notice,
 
1948
                log_printf (instance->totemsrp_log_level_debug,
1842
1949
                        "previous ring seq %lld rep %s\n",
1843
1950
                        memb_list[i].ring_id.seq,
1844
1951
                        totemip_print (&memb_list[i].ring_id.rep));
1845
1952
 
1846
 
                log_printf (instance->totemsrp_log_level_notice,
 
1953
                log_printf (instance->totemsrp_log_level_debug,
1847
1954
                        "aru %x high delivered %x received flag %d\n",
1848
1955
                        memb_list[i].aru,
1849
1956
                        memb_list[i].high_delivered,
1904
2011
        }
1905
2012
        assert (range < 1024);
1906
2013
 
1907
 
        log_printf (instance->totemsrp_log_level_notice,
 
2014
        log_printf (instance->totemsrp_log_level_debug,
1908
2015
                "copying all old ring messages from %x-%x.\n",
1909
2016
                low_ring_aru + 1, instance->old_ring_state_high_seq_received);
1910
2017
        strcpy (not_originated, "Not Originated for recovery: ");
1911
2018
        strcpy (is_originated, "Originated for recovery: ");
1912
 
                
 
2019
 
1913
2020
        for (i = 1; i <= range; i++) {
1914
2021
                struct sort_queue_item *sort_queue_item;
1915
2022
                struct message_item message_item;
1921
2028
                        low_ring_aru + i, &ptr);
1922
2029
                if (res != 0) {
1923
2030
                        strcat (not_originated, seqno_string_hex);
1924
 
                continue;
1925
 
        }
1926
 
        strcat (is_originated, seqno_string_hex);
1927
 
        sort_queue_item = ptr;
1928
 
        assert (sort_queue_item->iov_len > 0);
1929
 
        assert (sort_queue_item->iov_len <= MAXIOVS);
1930
 
        messages_originated++;
1931
 
        memset (&message_item, 0, sizeof (struct message_item));
1932
 
// TODO  LEAK
1933
 
        message_item.mcast = malloc (sizeof (struct mcast));
1934
 
        assert (message_item.mcast);
1935
 
        message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
1936
 
        srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
1937
 
        message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
1938
 
        message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
1939
 
        assert (message_item.mcast->header.nodeid);
1940
 
        message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
1941
 
        memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
1942
 
                sizeof (struct memb_ring_id));
1943
 
        message_item.iov_len = sort_queue_item->iov_len;
1944
 
        memcpy (&message_item.iovec, &sort_queue_item->iovec,
1945
 
                sizeof (struct iovec) * sort_queue_item->iov_len);
1946
 
                queue_item_add (&instance->retrans_message_queue, &message_item);
1947
 
        }
1948
 
        log_printf (instance->totemsrp_log_level_notice,
 
2031
                        continue;
 
2032
                }
 
2033
                strcat (is_originated, seqno_string_hex);
 
2034
                sort_queue_item = ptr;
 
2035
                messages_originated++;
 
2036
                memset (&message_item, 0, sizeof (struct message_item));
 
2037
        // TODO  LEAK
 
2038
                message_item.mcast = malloc (10000);
 
2039
                assert (message_item.mcast);
 
2040
                message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
 
2041
                srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
 
2042
                message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
 
2043
                message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
 
2044
                assert (message_item.mcast->header.nodeid);
 
2045
                message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
 
2046
                memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
 
2047
                        sizeof (struct memb_ring_id));
 
2048
                message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
 
2049
                memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
 
2050
                        sort_queue_item->mcast,
 
2051
                        sort_queue_item->msg_len);
 
2052
                cs_queue_item_add (&instance->retrans_message_queue, &message_item);
 
2053
        }
 
2054
        log_printf (instance->totemsrp_log_level_debug,
1949
2055
                "Originated %d messages in RECOVERY.\n", messages_originated);
1950
2056
        strcat (not_originated, "\n");
1951
2057
        strcat (is_originated, "\n");
1952
 
        log_printf (instance->totemsrp_log_level_notice, is_originated);
1953
 
        log_printf (instance->totemsrp_log_level_notice, not_originated);
 
2058
        log_printf (instance->totemsrp_log_level_debug, "%s", is_originated);
 
2059
        log_printf (instance->totemsrp_log_level_debug, "%s", not_originated);
1954
2060
        goto originated;
1955
2061
 
1956
2062
no_originate:
1957
 
        log_printf (instance->totemsrp_log_level_notice,
 
2063
        log_printf (instance->totemsrp_log_level_debug,
1958
2064
                "Did not need to originate any messages in recovery.\n");
1959
2065
 
1960
2066
originated:
1972
2078
        return;
1973
2079
}
1974
2080
 
1975
 
int totemsrp_new_msg_signal (totemsrp_handle handle)
 
2081
int totemsrp_new_msg_signal (hdb_handle_t handle)
1976
2082
{
1977
2083
        struct totemsrp_instance *instance;
1978
2084
        unsigned int res;
1992
2098
}
1993
2099
 
1994
2100
int totemsrp_mcast (
1995
 
        totemsrp_handle handle,
 
2101
        hdb_handle_t handle,
1996
2102
        struct iovec *iovec,
1997
 
        int iov_len,
 
2103
        unsigned int iov_len,
1998
2104
        int guarantee)
1999
2105
{
2000
2106
        int i;
2001
 
        int j;
2002
2107
        struct message_item message_item;
2003
2108
        struct totemsrp_instance *instance;
 
2109
        char *addr;
 
2110
        unsigned int addr_idx;
2004
2111
        unsigned int res;
2005
2112
 
2006
2113
        res = hdb_handle_get (&totemsrp_instance_database, handle,
2008
2115
        if (res != 0) {
2009
2116
                goto error_exit;
2010
2117
        }
2011
 
        
2012
 
        if (queue_is_full (&instance->new_message_queue)) {
2013
 
                log_printf (instance->totemsrp_log_level_warning, "queue full\n");
 
2118
 
 
2119
        if (cs_queue_is_full (&instance->new_message_queue)) {
 
2120
                log_printf (instance->totemsrp_log_level_debug, "queue full\n");
2014
2121
                return (-1);
2015
2122
        }
2016
 
        for (j = 0, i = 0; i < iov_len; i++) {
2017
 
                j+= iovec[i].iov_len;
2018
 
        }
2019
2123
 
2020
2124
        memset (&message_item, 0, sizeof (struct message_item));
2021
2125
 
2022
2126
        /*
2023
2127
         * Allocate pending item
2024
2128
         */
2025
 
// TODO LEAK
2026
 
        message_item.mcast = malloc (sizeof (struct mcast));
 
2129
        message_item.mcast = malloc (10000);
2027
2130
        if (message_item.mcast == 0) {
2028
2131
                goto error_mcast;
2029
2132
        }
2040
2143
        message_item.mcast->guarantee = guarantee;
2041
2144
        srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2042
2145
 
 
2146
        addr = (char *)message_item.mcast;
 
2147
        addr_idx = sizeof (struct mcast);
2043
2148
        for (i = 0; i < iov_len; i++) {
2044
 
// TODO LEAK
2045
 
                message_item.iovec[i].iov_base = malloc (iovec[i].iov_len);
2046
 
 
2047
 
                if (message_item.iovec[i].iov_base == 0) {
2048
 
                        goto error_iovec;
2049
 
                }
2050
 
 
2051
 
                memcpy (message_item.iovec[i].iov_base, iovec[i].iov_base,
2052
 
                        iovec[i].iov_len);
2053
 
 
2054
 
                message_item.iovec[i].iov_len = iovec[i].iov_len;
 
2149
                memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
 
2150
                addr_idx += iovec[i].iov_len;
2055
2151
        }
2056
2152
 
2057
 
        message_item.iov_len = iov_len;
 
2153
        message_item.msg_len = addr_idx;
2058
2154
 
2059
2155
        log_printf (instance->totemsrp_log_level_debug, "mcasted message added to pending queue\n");
2060
 
        queue_item_add (&instance->new_message_queue, &message_item);
 
2156
        cs_queue_item_add (&instance->new_message_queue, &message_item);
2061
2157
 
2062
2158
        hdb_handle_put (&totemsrp_instance_database, handle);
2063
2159
        return (0);
2064
2160
 
2065
 
error_iovec:
2066
 
        for (j = 0; j < i; j++) {
2067
 
                free (message_item.iovec[j].iov_base);
2068
 
        }
2069
 
        
2070
 
        free(message_item.mcast);
2071
 
 
2072
2161
error_mcast:
2073
2162
        hdb_handle_put (&totemsrp_instance_database, handle);
2074
2163
 
2079
2168
/*
2080
2169
 * Determine if there is room to queue a new message
2081
2170
 */
2082
 
int totemsrp_avail (totemsrp_handle handle)
 
2171
int totemsrp_avail (hdb_handle_t handle)
2083
2172
{
2084
2173
        int avail;
2085
2174
        struct totemsrp_instance *instance;
2091
2180
                goto error_exit;
2092
2181
        }
2093
2182
 
2094
 
        queue_avail (&instance->new_message_queue, &avail);
 
2183
        cs_queue_avail (&instance->new_message_queue, &avail);
2095
2184
 
2096
2185
        hdb_handle_put (&totemsrp_instance_database, handle);
2097
2186
 
2104
2193
/*
2105
2194
 * ORF Token Management
2106
2195
 */
2107
 
/* 
 
2196
/*
2108
2197
 * Recast message to mcast group if it is available
2109
2198
 */
2110
2199
static int orf_token_remcast (
2128
2217
                log_printf (instance->totemsrp_log_level_debug, "sq not in range\n");
2129
2218
                return (-1);
2130
2219
        }
2131
 
        
 
2220
 
2132
2221
        /*
2133
2222
         * Get RTR item at seq, if not available, return
2134
2223
         */
2139
2228
 
2140
2229
        sort_queue_item = ptr;
2141
2230
 
2142
 
        totemrrp_mcast_noflush_send (instance->totemrrp_handle,
2143
 
                sort_queue_item->iovec,
2144
 
                sort_queue_item->iov_len);
 
2231
        totemrrp_mcast_noflush_send (
 
2232
                instance->totemrrp_handle,
 
2233
                sort_queue_item->mcast,
 
2234
                sort_queue_item->msg_len);
2145
2235
 
2146
2236
        return (0);
2147
2237
}
2155
2245
        unsigned int token_aru)
2156
2246
{
2157
2247
        struct sort_queue_item *regular_message;
2158
 
        unsigned int i, j;
 
2248
        unsigned int i;
2159
2249
        int res;
2160
2250
        int log_release = 0;
2161
2251
        unsigned int release_to;
2189
2279
                        instance->last_released + i, &ptr);
2190
2280
                if (res == 0) {
2191
2281
                        regular_message = ptr;
2192
 
                        for (j = 0; j < regular_message->iov_len; j++) {
2193
 
                                free (regular_message->iovec[j].iov_base);
2194
 
                        }
 
2282
                        free (regular_message->mcast);
2195
2283
                }
2196
2284
                sq_items_release (&instance->regular_sort_queue,
2197
2285
                        instance->last_released + i);
2251
2339
        int fcc_mcasts_allowed)
2252
2340
{
2253
2341
        struct message_item *message_item = 0;
2254
 
        struct queue *mcast_queue;
 
2342
        struct cs_queue *mcast_queue;
2255
2343
        struct sq *sort_queue;
2256
2344
        struct sort_queue_item sort_queue_item;
2257
2345
        struct sort_queue_item *sort_queue_item_ptr;
2268
2356
        }
2269
2357
 
2270
2358
        for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2271
 
                if (queue_is_empty (mcast_queue)) {
 
2359
                if (cs_queue_is_empty (mcast_queue)) {
2272
2360
                        break;
2273
2361
                }
2274
 
                message_item = (struct message_item *)queue_item_get (mcast_queue);
 
2362
                message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2275
2363
                /* preincrement required by algo */
2276
2364
                if (instance->old_ring_state_saved &&
2277
2365
                        (instance->memb_state == MEMB_STATE_GATHER ||
2290
2378
                 * Build IO vector
2291
2379
                 */
2292
2380
                memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2293
 
                sort_queue_item.iovec[0].iov_base = message_item->mcast;
2294
 
                sort_queue_item.iovec[0].iov_len = sizeof (struct mcast);
2295
 
        
2296
 
                mcast = sort_queue_item.iovec[0].iov_base;
2297
 
        
2298
 
                memcpy (&sort_queue_item.iovec[1], message_item->iovec,
2299
 
                        message_item->iov_len * sizeof (struct iovec));
 
2381
                sort_queue_item.mcast = message_item->mcast;
 
2382
                sort_queue_item.msg_len = message_item->msg_len;
 
2383
 
 
2384
                mcast = sort_queue_item.mcast;
2300
2385
 
2301
2386
                memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2302
2387
 
2303
 
                sort_queue_item.iov_len = message_item->iov_len + 1;
2304
 
 
2305
 
                assert (sort_queue_item.iov_len < 16);
2306
 
 
2307
2388
                /*
2308
2389
                 * Add message to retransmit queue
2309
2390
                 */
2310
2391
                sort_queue_item_ptr = sq_item_add (sort_queue,
2311
2392
                        &sort_queue_item, message_item->mcast->seq);
2312
2393
 
2313
 
                totemrrp_mcast_noflush_send (instance->totemrrp_handle,
2314
 
                        sort_queue_item_ptr->iovec,
2315
 
                        sort_queue_item_ptr->iov_len);
2316
 
                
 
2394
                totemrrp_mcast_noflush_send (
 
2395
                        instance->totemrrp_handle,
 
2396
                        message_item->mcast,
 
2397
                        message_item->msg_len);
 
2398
 
2317
2399
                /*
2318
2400
                 * Delete item from pending queue
2319
2401
                 */
2320
 
                queue_item_remove (mcast_queue);
 
2402
                cs_queue_item_remove (mcast_queue);
2321
2403
 
2322
2404
                /*
2323
2405
                 * If messages mcasted, deliver any new messages to totempg
2359
2441
        }
2360
2442
 
2361
2443
        rtr_list = &orf_token->rtr_list[0];
2362
 
        
 
2444
 
2363
2445
        strcpy (retransmit_msg, "Retransmit List: ");
2364
2446
        if (orf_token->rtr_list_entries) {
2365
2447
                log_printf (instance->totemsrp_log_level_debug,
2369
2451
                        strcat (retransmit_msg, value);
2370
2452
                }
2371
2453
                strcat (retransmit_msg, "\n");
2372
 
                log_printf (instance->totemsrp_log_level_notice,
 
2454
                log_printf (instance->totemsrp_log_level_debug,
2373
2455
                        "%s", retransmit_msg);
2374
2456
        }
2375
2457
 
2458
2540
 
2459
2541
/*
 * Send the contents of instance->orf_token_retransmit
 * (orf_token_retransmit_size bytes) through totemrrp_token_send.
 *
 * This diff view shows both revisions: the older one wrapped the buffer in
 * a single struct iovec and passed (&iovec, 1); the newer one passes the
 * buffer pointer and its size directly.
 */
static void token_retransmit (struct totemsrp_instance *instance)
2460
2542
{
2461
 
        struct iovec iovec;
2462
 
 
2463
 
        iovec.iov_base = instance->orf_token_retransmit;
2464
 
        iovec.iov_len = instance->orf_token_retransmit_size;
2465
 
 
2466
2543
        totemrrp_token_send (instance->totemrrp_handle,
2467
 
                &iovec,
2468
 
                1);
 
2544
                instance->orf_token_retransmit,
 
2545
                instance->orf_token_retransmit_size);
2469
2546
}
2470
2547
 
2471
2548
/*
2475
2552
 */
2476
2553
static void timer_function_token_retransmit_timeout (void *data)
2477
2554
{
2478
 
        struct totemsrp_instance *instance = (struct totemsrp_instance *)data;
 
2555
        struct totemsrp_instance *instance = data;
2479
2556
 
2480
2557
        switch (instance->memb_state) {
2481
2558
        case MEMB_STATE_GATHER:
2491
2568
 
2492
2569
static void timer_function_token_hold_retransmit_timeout (void *data)
2493
2570
{
2494
 
        struct totemsrp_instance *instance = (struct totemsrp_instance *)data;
 
2571
        struct totemsrp_instance *instance = data;
2495
2572
 
2496
2573
        switch (instance->memb_state) {
2497
2574
        case MEMB_STATE_GATHER:
2507
2584
 
2508
2585
static void timer_function_merge_detect_timeout(void *data)
2509
2586
{
2510
 
        struct totemsrp_instance *instance = (struct totemsrp_instance *)data;
 
2587
        struct totemsrp_instance *instance = data;
2511
2588
 
2512
2589
        instance->my_merge_detect_timeout_outstanding = 0;
2513
2590
 
2532
2609
        struct orf_token *orf_token,
2533
2610
        int forward_token)
2534
2611
{
2535
 
        struct iovec iovec;
2536
2612
        int res = 0;
2537
 
        int iov_len = sizeof (struct orf_token) +
 
2613
        unsigned int orf_token_size;
 
2614
 
 
2615
        orf_token_size = sizeof (struct orf_token) +
2538
2616
                (orf_token->rtr_list_entries * sizeof (struct rtr_item));
2539
2617
 
2540
 
        memcpy (instance->orf_token_retransmit, orf_token, iov_len);
2541
 
        instance->orf_token_retransmit_size = iov_len;
 
2618
        memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
 
2619
        instance->orf_token_retransmit_size = orf_token_size;
2542
2620
        orf_token->header.nodeid = instance->my_id.addr[0].nodeid;
2543
2621
        assert (orf_token->header.nodeid);
2544
2622
 
2546
2624
                return (0);
2547
2625
        }
2548
2626
 
2549
 
        iovec.iov_base = orf_token;
2550
 
        iovec.iov_len = iov_len;
2551
 
 
2552
2627
        totemrrp_token_send (instance->totemrrp_handle,
2553
 
                &iovec,
2554
 
                1);
 
2628
                orf_token,
 
2629
                orf_token_size);
2555
2630
 
2556
2631
        return (res);
2557
2632
}
2559
2634
static int token_hold_cancel_send (struct totemsrp_instance *instance)
2560
2635
{
2561
2636
        struct token_hold_cancel token_hold_cancel;
2562
 
        struct iovec iovec[2];
2563
2637
 
2564
2638
        /*
2565
2639
         * Only cancel if the token is currently held
2575
2649
        token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
2576
2650
        token_hold_cancel.header.endian_detector = ENDIAN_LOCAL;
2577
2651
        token_hold_cancel.header.nodeid = instance->my_id.addr[0].nodeid;
 
2652
        memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
 
2653
                sizeof (struct memb_ring_id));
2578
2654
        assert (token_hold_cancel.header.nodeid);
2579
2655
 
2580
 
        iovec[0].iov_base = &token_hold_cancel;
2581
 
        iovec[0].iov_len = sizeof (struct token_hold_cancel) -
2582
 
                sizeof (struct memb_ring_id);
2583
 
        iovec[1].iov_base = &instance->my_ring_id;
2584
 
        iovec[1].iov_len = sizeof (struct memb_ring_id);
2585
 
 
2586
 
        totemrrp_mcast_flush_send (instance->totemrrp_handle, iovec, 2);
 
2656
        totemrrp_mcast_flush_send (instance->totemrrp_handle, &token_hold_cancel,
 
2657
                sizeof (struct token_hold_cancel));
2587
2658
 
2588
2659
        return (0);
2589
2660
}
2590
 
//AAA
2591
2661
 
2592
2662
static int orf_token_send_initial (struct totemsrp_instance *instance)
2593
2663
{
2599
2669
        orf_token.header.encapsulated = 0;
2600
2670
        orf_token.header.nodeid = instance->my_id.addr[0].nodeid;
2601
2671
        assert (orf_token.header.nodeid);
2602
 
        orf_token.seq = 0;
2603
2672
        orf_token.seq = SEQNO_START_MSG;
2604
2673
        orf_token.token_seq = SEQNO_START_TOKEN;
2605
2674
        orf_token.retrans_flg = 1;
2606
2675
        instance->my_set_retrans_flg = 1;
2607
2676
 
2608
 
        if (queue_is_empty (&instance->retrans_message_queue) == 1) {
 
2677
        if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
2609
2678
                orf_token.retrans_flg = 0;
2610
2679
                instance->my_set_retrans_flg = 0;
2611
2680
        } else {
2612
2681
                orf_token.retrans_flg = 1;
2613
2682
                instance->my_set_retrans_flg = 1;
2614
2683
        }
2615
 
                
 
2684
 
2616
2685
        orf_token.aru = 0;
2617
2686
        orf_token.aru = SEQNO_START_MSG - 1;
2618
2687
        orf_token.aru_addr = instance->my_id.addr[0].nodeid;
2634
2703
{
2635
2704
        struct srp_addr *addr;
2636
2705
        struct memb_commit_token_memb_entry *memb_list;
 
2706
        unsigned int high_aru;
 
2707
        unsigned int i;
2637
2708
 
2638
2709
        addr = (struct srp_addr *)commit_token->end_of_commit_token;
2639
2710
        memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2655
2726
        instance->my_received_flg =
2656
2727
                (instance->my_aru == instance->my_high_seq_received);
2657
2728
 
 
2729
        memb_list[commit_token->memb_index].received_flg = instance->my_received_flg;
 
2730
 
2658
2731
        memb_list[commit_token->memb_index].high_delivered = instance->my_high_delivered;
2659
 
        memb_list[commit_token->memb_index].received_flg = instance->my_received_flg;
 
2732
        /*
 
2733
         * find high aru up to current memb_index for all matching ring ids
 
2734
         * if any ring id matching memb_index has aru less than high aru set
 
2735
         * received flag for that entry to false
 
2736
         */
 
2737
        high_aru = memb_list[commit_token->memb_index].aru;
 
2738
        for (i = 0; i <= commit_token->memb_index; i++) {
 
2739
                if (memcmp (&memb_list[commit_token->memb_index].ring_id,
 
2740
                        &memb_list[i].ring_id,
 
2741
                        sizeof (struct memb_ring_id)) == 0) {
 
2742
 
 
2743
                        if (sq_lt_compare (high_aru, memb_list[i].aru)) {
 
2744
                                high_aru = memb_list[i].aru;
 
2745
                        }
 
2746
                }
 
2747
        }
 
2748
 
 
2749
        for (i = 0; i <= commit_token->memb_index; i++) {
 
2750
                if (memcmp (&memb_list[commit_token->memb_index].ring_id,
 
2751
                        &memb_list[i].ring_id,
 
2752
                        sizeof (struct memb_ring_id)) == 0) {
 
2753
 
 
2754
                        if (sq_lt_compare (memb_list[i].aru, high_aru)) {
 
2755
                                memb_list[i].received_flg = 0;
 
2756
                                if (i == commit_token->memb_index) {
 
2757
                                        instance->my_received_flg = 0;
 
2758
                                }
 
2759
                        }
 
2760
                }
 
2761
        }
2660
2762
 
2661
2763
        commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
2662
2764
        commit_token->memb_index += 1;
2686
2788
        struct totemsrp_instance *instance,
2687
2789
        struct memb_commit_token *commit_token)
2688
2790
{
2689
 
        struct iovec iovec;
2690
2791
        struct srp_addr *addr;
2691
2792
        struct memb_commit_token_memb_entry *memb_list;
 
2793
        unsigned int commit_token_size;
2692
2794
 
2693
2795
        addr = (struct srp_addr *)commit_token->end_of_commit_token;
2694
2796
        memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2695
2797
 
2696
2798
        commit_token->token_seq++;
2697
 
        iovec.iov_base = commit_token;
2698
 
        iovec.iov_len = sizeof (struct memb_commit_token) +
 
2799
        commit_token_size = sizeof (struct memb_commit_token) +
2699
2800
                ((sizeof (struct srp_addr) +
2700
2801
                        sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
2701
2802
        /*
2702
2803
         * Make a copy for retransmission if necessary
2703
2804
         */
2704
 
        memcpy (instance->orf_token_retransmit, commit_token, iovec.iov_len);
2705
 
        instance->orf_token_retransmit_size = iovec.iov_len;
 
2805
        memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
 
2806
        instance->orf_token_retransmit_size = commit_token_size;
2706
2807
 
2707
2808
        totemrrp_token_send (instance->totemrrp_handle,
2708
 
                &iovec,
2709
 
                1);
 
2809
                commit_token,
 
2810
                commit_token_size);
2710
2811
 
2711
2812
        /*
2712
2813
         * Request retransmission of the commit token in case it is lost
2730
2831
        /*
2731
2832
         * find representative by searching for smallest identifier
2732
2833
         */
2733
 
        
 
2834
 
2734
2835
        lowest_addr = &token_memb[0].addr[0];
2735
2836
        for (i = 1; i < token_memb_entries; i++) {
2736
2837
                if (totemip_compare(lowest_addr, &token_memb[i].addr[0]) > 0) {
2742
2843
 
2743
2844
static int srp_addr_compare (const void *a, const void *b)
2744
2845
{
2745
 
        struct srp_addr *srp_a = (struct srp_addr *)a;
2746
 
        struct srp_addr *srp_b = (struct srp_addr *)b;
 
2846
        const struct srp_addr *srp_a = (const struct srp_addr *)a;
 
2847
        const struct srp_addr *srp_b = (const struct srp_addr *)b;
2747
2848
 
2748
2849
        return (totemip_compare (&srp_a->addr[0], &srp_b->addr[0]));
2749
2850
}
2757
2858
        struct memb_commit_token_memb_entry *memb_list;
2758
2859
        int token_memb_entries = 0;
2759
2860
 
2760
 
        log_printf (instance->totemsrp_log_level_notice,
 
2861
        log_printf (instance->totemsrp_log_level_debug,
2761
2862
                "Creating commit token because I am the rep.\n");
2762
2863
 
2763
2864
        memb_set_subtract (token_memb, &token_memb_entries,
2796
2897
 
2797
2898
static void memb_join_message_send (struct totemsrp_instance *instance)
2798
2899
{
2799
 
        struct memb_join memb_join;
2800
 
        struct iovec iovec[3];
2801
 
        unsigned int iovs;
 
2900
        char memb_join_data[10000];
 
2901
        struct memb_join *memb_join = (struct memb_join *)memb_join_data;
 
2902
        char *addr;
 
2903
        unsigned int addr_idx;
2802
2904
 
2803
 
        memb_join.header.type = MESSAGE_TYPE_MEMB_JOIN;
2804
 
        memb_join.header.endian_detector = ENDIAN_LOCAL;
2805
 
        memb_join.header.encapsulated = 0;
2806
 
        memb_join.header.nodeid = instance->my_id.addr[0].nodeid;
2807
 
        assert (memb_join.header.nodeid);
 
2905
        memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
 
2906
        memb_join->header.endian_detector = ENDIAN_LOCAL;
 
2907
        memb_join->header.encapsulated = 0;
 
2908
        memb_join->header.nodeid = instance->my_id.addr[0].nodeid;
 
2909
        assert (memb_join->header.nodeid);
2808
2910
 
2809
2911
        assert (srp_addr_equal (&instance->my_proc_list[0], &instance->my_proc_list[1]) == 0);
2810
 
        memb_join.ring_seq = instance->my_ring_id.seq;
2811
 
        memb_join.proc_list_entries = instance->my_proc_list_entries;
2812
 
        memb_join.failed_list_entries = instance->my_failed_list_entries;
2813
 
        srp_addr_copy (&memb_join.system_from, &instance->my_id);
2814
 
                
2815
 
        iovec[0].iov_base = &memb_join;
2816
 
        iovec[0].iov_len = sizeof (struct memb_join);
2817
 
        iovec[1].iov_base = &instance->my_proc_list;
2818
 
        iovec[1].iov_len = instance->my_proc_list_entries *
2819
 
                sizeof (struct srp_addr);
2820
 
        if (instance->my_failed_list_entries == 0) {
2821
 
                iovs = 2;
2822
 
        } else {
2823
 
                iovs = 3;
2824
 
                iovec[2].iov_base = instance->my_failed_list;
2825
 
                iovec[2].iov_len = instance->my_failed_list_entries *
2826
 
                        sizeof (struct srp_addr);
2827
 
        }
2828
 
 
2829
 
        if (instance->totem_config->send_join_timeout) {
2830
 
                usleep (random() % (instance->totem_config->send_join_timeout * 1000));
2831
 
        }
2832
 
 
2833
 
        totemrrp_mcast_flush_send (
2834
 
                instance->totemrrp_handle,
2835
 
                iovec,
2836
 
                iovs);
2837
 
}
2838
 
 
2839
 
static void memb_merge_detect_transmit (struct totemsrp_instance *instance) 
 
2912
        memb_join->ring_seq = instance->my_ring_id.seq;
 
2913
        memb_join->proc_list_entries = instance->my_proc_list_entries;
 
2914
        memb_join->failed_list_entries = instance->my_failed_list_entries;
 
2915
        srp_addr_copy (&memb_join->system_from, &instance->my_id);
 
2916
 
 
2917
        /*
 
2918
         * This mess adds the joined and failed processor lists into the join
 
2919
         * message
 
2920
         */
 
2921
        addr = (char *)memb_join;
 
2922
        addr_idx = sizeof (struct memb_join);
 
2923
        memcpy (&addr[addr_idx],
 
2924
                instance->my_proc_list,
 
2925
                instance->my_proc_list_entries *
 
2926
                        sizeof (struct srp_addr));
 
2927
        addr_idx +=
 
2928
                instance->my_proc_list_entries *
 
2929
                sizeof (struct srp_addr);
 
2930
        memcpy (&addr[addr_idx],
 
2931
                instance->my_failed_list,
 
2932
                instance->my_failed_list_entries *
 
2933
                sizeof (struct srp_addr));
 
2934
        addr_idx +=
 
2935
                instance->my_failed_list_entries *
 
2936
                sizeof (struct srp_addr);
 
2937
 
 
2938
 
 
2939
        if (instance->totem_config->send_join_timeout) {
 
2940
                usleep (random() % (instance->totem_config->send_join_timeout * 1000));
 
2941
        }
 
2942
 
 
2943
        totemrrp_mcast_flush_send (
 
2944
                instance->totemrrp_handle,
 
2945
                memb_join,
 
2946
                addr_idx);
 
2947
}
 
2948
 
 
2949
static void memb_leave_message_send (struct totemsrp_instance *instance)
 
2950
{
 
2951
        char memb_join_data[10000];
 
2952
        struct memb_join *memb_join = (struct memb_join *)memb_join_data;
 
2953
        char *addr;
 
2954
        unsigned int addr_idx;
 
2955
        int active_memb_entries;
 
2956
        struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
 
2957
 
 
2958
        log_printf (instance->totemsrp_log_level_debug,
 
2959
                "sending join/leave message\n");
 
2960
 
 
2961
        /*
 
2962
         * add us to the failed list, and remove us from
 
2963
         * the members list
 
2964
         */
 
2965
        memb_set_merge(
 
2966
                       &instance->my_id, 1,
 
2967
                       instance->my_failed_list, &instance->my_failed_list_entries);
 
2968
 
 
2969
        memb_set_subtract (active_memb, &active_memb_entries,
 
2970
                           instance->my_proc_list, instance->my_proc_list_entries,
 
2971
                           &instance->my_id, 1);
 
2972
 
 
2973
 
 
2974
        memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
 
2975
        memb_join->header.endian_detector = ENDIAN_LOCAL;
 
2976
        memb_join->header.encapsulated = 0;
 
2977
        memb_join->header.nodeid = LEAVE_DUMMY_NODEID;
 
2978
 
 
2979
        memb_join->ring_seq = instance->my_ring_id.seq;
 
2980
        memb_join->proc_list_entries = active_memb_entries;
 
2981
        memb_join->failed_list_entries = instance->my_failed_list_entries;
 
2982
        srp_addr_copy (&memb_join->system_from, &instance->my_id);
 
2983
        memb_join->system_from.addr[0].nodeid = LEAVE_DUMMY_NODEID;
 
2984
 
 
2985
        // TODO: CC Maybe use the actual join send routine.
 
2986
        /*
 
2987
         * This mess adds the joined and failed processor lists into the join
 
2988
         * message
 
2989
         */
 
2990
        addr = (char *)memb_join;
 
2991
        addr_idx = sizeof (struct memb_join);
 
2992
        memcpy (&addr[addr_idx],
 
2993
                active_memb,
 
2994
                active_memb_entries *
 
2995
                        sizeof (struct srp_addr));
 
2996
        addr_idx +=
 
2997
                active_memb_entries *
 
2998
                sizeof (struct srp_addr);
 
2999
        memcpy (&addr[addr_idx],
 
3000
                instance->my_failed_list,
 
3001
                instance->my_failed_list_entries *
 
3002
                sizeof (struct srp_addr));
 
3003
        addr_idx +=
 
3004
                instance->my_failed_list_entries *
 
3005
                sizeof (struct srp_addr);
 
3006
 
 
3007
 
 
3008
        if (instance->totem_config->send_join_timeout) {
 
3009
                usleep (random() % (instance->totem_config->send_join_timeout * 1000));
 
3010
        }
 
3011
 
 
3012
        totemrrp_mcast_flush_send (
 
3013
                instance->totemrrp_handle,
 
3014
                memb_join,
 
3015
                addr_idx);
 
3016
}
 
3017
 
 
3018
static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
2840
3019
{
2841
3020
        struct memb_merge_detect memb_merge_detect;
2842
 
        struct iovec iovec[2];
2843
3021
 
2844
3022
        memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
2845
3023
        memb_merge_detect.header.endian_detector = ENDIAN_LOCAL;
2846
3024
        memb_merge_detect.header.encapsulated = 0;
2847
3025
        memb_merge_detect.header.nodeid = instance->my_id.addr[0].nodeid;
2848
3026
        srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
 
3027
        memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
 
3028
                sizeof (struct memb_ring_id));
2849
3029
        assert (memb_merge_detect.header.nodeid);
2850
3030
 
2851
 
        iovec[0].iov_base = &memb_merge_detect;
2852
 
        iovec[0].iov_len = sizeof (struct memb_merge_detect) -
2853
 
                sizeof (struct memb_ring_id);
2854
 
        iovec[1].iov_base = &instance->my_ring_id;
2855
 
        iovec[1].iov_len = sizeof (struct memb_ring_id);
2856
 
 
2857
 
        totemrrp_mcast_flush_send (instance->totemrrp_handle, iovec, 2);
 
3031
        totemrrp_mcast_flush_send (instance->totemrrp_handle,
 
3032
                &memb_merge_detect,
 
3033
                sizeof (struct memb_merge_detect));
2858
3034
}
2859
3035
 
2860
3036
static void memb_ring_id_create_or_load (
2865
3041
        int res;
2866
3042
        char filename[256];
2867
3043
 
2868
 
        sprintf (filename, "%s/ringid_%s",
 
3044
        snprintf (filename, sizeof(filename), "%s/ringid_%s",
2869
3045
                rundir, totemip_print (&instance->my_id.addr[0]));
2870
3046
        fd = open (filename, O_RDONLY, 0700);
2871
3047
        if (fd > 0) {
2888
3064
                log_printf (instance->totemsrp_log_level_warning,
2889
3065
                        "Couldn't open %s %s\n", filename, strerror (errno));
2890
3066
        }
2891
 
        
 
3067
 
2892
3068
        totemip_copy(&memb_ring_id->rep, &instance->my_id.addr[0]);
2893
3069
        assert (!totemip_zero_check(&memb_ring_id->rep));
2894
3070
        instance->token_ring_id_seq = memb_ring_id->seq;
2896
3072
 
2897
3073
static void memb_ring_id_set_and_store (
2898
3074
        struct totemsrp_instance *instance,
2899
 
        struct memb_ring_id *ring_id)
 
3075
        const struct memb_ring_id *ring_id)
2900
3076
{
2901
3077
        char filename[256];
2902
3078
        int fd;
2904
3080
 
2905
3081
        memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
2906
3082
 
2907
 
        sprintf (filename, "%s/ringid_%s",
 
3083
        snprintf (filename, sizeof(filename), "%s/ringid_%s",
2908
3084
                rundir, totemip_print (&instance->my_id.addr[0]));
2909
3085
 
2910
3086
        fd = open (filename, O_WRONLY, 0777);
2918
3094
                assert (0);
2919
3095
                return;
2920
3096
        }
2921
 
        log_printf (instance->totemsrp_log_level_notice,
 
3097
        log_printf (instance->totemsrp_log_level_debug,
2922
3098
                "Storing new sequence id for ring %llx\n", instance->my_ring_id.seq);
2923
3099
        //assert (fd > 0);
2924
3100
        res = write (fd, &instance->my_ring_id.seq, sizeof (unsigned long long));
2927
3103
}
2928
3104
 
2929
3105
int totemsrp_callback_token_create (
2930
 
        totemsrp_handle handle,
 
3106
        hdb_handle_t handle,
2931
3107
        void **handle_out,
2932
3108
        enum totem_callback_token_type type,
2933
3109
        int delete,
2934
 
        int (*callback_fn) (enum totem_callback_token_type type, void *),
2935
 
        void *data)
 
3110
        int (*callback_fn) (enum totem_callback_token_type type, const void *),
 
3111
        const void *data)
2936
3112
{
2937
3113
        struct token_callback_instance *callback_handle;
2938
3114
        struct totemsrp_instance *instance;
2944
3120
                goto error_exit;
2945
3121
        }
2946
3122
 
2947
 
        callback_handle = (struct token_callback_instance *)malloc (sizeof (struct token_callback_instance));
 
3123
        token_hold_cancel_send (instance);
 
3124
 
 
3125
        callback_handle = malloc (sizeof (struct token_callback_instance));
2948
3126
        if (callback_handle == 0) {
2949
3127
                return (-1);
2950
3128
        }
2951
3129
        *handle_out = (void *)callback_handle;
2952
3130
        list_init (&callback_handle->list);
2953
3131
        callback_handle->callback_fn = callback_fn;
2954
 
        callback_handle->data = data;
 
3132
        callback_handle->data = (void *) data;
2955
3133
        callback_handle->callback_type = type;
2956
3134
        callback_handle->delete = delete;
2957
3135
        switch (type) {
2969
3147
        return (0);
2970
3148
}
2971
3149
 
2972
 
void totemsrp_callback_token_destroy (totemsrp_handle handle, void **handle_out)
 
3150
void totemsrp_callback_token_destroy (hdb_handle_t handle, void **handle_out)
2973
3151
{
2974
3152
        struct token_callback_instance *h;
2975
3153
 
2982
3160
        }
2983
3161
}
2984
3162
 
2985
 
void totem_callback_token_type (struct totemsrp_instance *instance, void *handle)
2986
 
{
2987
 
        struct token_callback_instance *token_callback_instance = (struct token_callback_instance *)handle;
2988
 
 
2989
 
        list_del (&token_callback_instance->list);
2990
 
        free (token_callback_instance);
2991
 
}
2992
 
 
2993
3163
static void token_callbacks_execute (
2994
3164
        struct totemsrp_instance *instance,
2995
3165
        enum totem_callback_token_type type)
3011
3181
        default:
3012
3182
                assert (0);
3013
3183
        }
3014
 
        
 
3184
 
3015
3185
        for (list = callback_listhead->next; list != callback_listhead;
3016
3186
                list = list_next) {
3017
3187
 
3045
3215
        unsigned int backlog = 0;
3046
3216
 
3047
3217
        if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3048
 
                backlog = queue_used (&instance->new_message_queue);
 
3218
                backlog = cs_queue_used (&instance->new_message_queue);
3049
3219
        } else
3050
3220
        if (instance->memb_state == MEMB_STATE_RECOVERY) {
3051
 
                backlog = queue_used (&instance->retrans_message_queue);
 
3221
                backlog = cs_queue_used (&instance->retrans_message_queue);
3052
3222
        }
3053
3223
        return (backlog);
3054
3224
}
3124
3294
 */
3125
3295
static int message_handler_orf_token (
3126
3296
        struct totemsrp_instance *instance,
3127
 
        void *msg,
3128
 
        int msg_len,
 
3297
        const void *msg,
 
3298
        size_t msg_len,
3129
3299
        int endian_conversion_needed)
3130
3300
{
3131
3301
        char token_storage[1500];
3145
3315
        timersub (&tv_current, &tv_old, &tv_diff);
3146
3316
        memcpy (&tv_old, &tv_current, sizeof (struct timeval));
3147
3317
 
3148
 
        log_printf (instance->totemsrp_log_level_notice,
 
3318
        log_printf (instance->totemsrp_log_level_debug,
3149
3319
        "Time since last token %0.4f ms\n",
3150
3320
                (((float)tv_diff.tv_sec) * 1000) + ((float)tv_diff.tv_usec)
3151
3321
                        / 1000.0);
3169
3339
         */
3170
3340
        token = (struct orf_token *)token_storage;
3171
3341
        memcpy (token, msg, sizeof (struct orf_token));
3172
 
        memcpy (&token->rtr_list[0], msg + sizeof (struct orf_token),
 
3342
        memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3173
3343
                sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3174
3344
 
3175
3345
 
3215
3385
        forward_token = 1;
3216
3386
        if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
3217
3387
                if (instance->my_token_held) {
3218
 
                        forward_token = 0;                      
 
3388
                        forward_token = 0;
3219
3389
                }
3220
3390
        }
3221
3391
 
3246
3416
                        if ((forward_token)
3247
3417
                                && instance->use_heartbeat) {
3248
3418
                                reset_heartbeat_timeout(instance);
3249
 
                        } 
 
3419
                        }
3250
3420
                        else {
3251
3421
                                cancel_heartbeat_timeout(instance);
3252
3422
                        }
3276
3446
                        }
3277
3447
 
3278
3448
                        return (0); /* discard token */
3279
 
                }               
 
3449
                }
3280
3450
 
3281
3451
                transmits_allowed = fcc_calculate (instance, token);
3282
3452
                mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3285
3455
                mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3286
3456
                fcc_token_update (instance, token, mcasted_retransmit +
3287
3457
                        mcasted_regular);
3288
 
                        
 
3458
 
3289
3459
                if (sq_lt_compare (instance->my_aru, token->aru) ||
3290
3460
                        instance->my_id.addr[0].nodeid ==  token->aru_addr ||
3291
3461
                        token->aru_addr == 0) {
3292
 
                        
 
3462
 
3293
3463
                        token->aru = instance->my_aru;
3294
3464
                        if (token->aru == token->seq) {
3295
3465
                                token->aru_addr = 0;
3305
3475
 
3306
3476
                if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
3307
3477
                        token->aru_addr != instance->my_id.addr[0].nodeid) {
3308
 
                        
 
3478
 
3309
3479
                        log_printf (instance->totemsrp_log_level_error,
3310
3480
                                "FAILED TO RECEIVE\n");
3311
3481
// TODO if we fail to receive, it may be possible to end with a gather
3329
3499
                                 * has recovered all messages it can recover
3330
3500
                                 * (ie: its retrans queue is empty)
3331
3501
                                 */
3332
 
                                if (queue_is_empty (&instance->retrans_message_queue) == 0) {
 
3502
                                if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
3333
3503
 
3334
3504
                                        if (token->retrans_flg == 0) {
3335
3505
                                                token->retrans_flg = 1;
3340
3510
                                        token->retrans_flg = 0;
3341
3511
                                }
3342
3512
                                log_printf (instance->totemsrp_log_level_debug,
3343
 
                                        "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x\n", 
 
3513
                                        "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x\n",
3344
3514
                                        token->retrans_flg, instance->my_set_retrans_flg,
3345
 
                                        queue_is_empty (&instance->retrans_message_queue),
 
3515
                                        cs_queue_is_empty (&instance->retrans_message_queue),
3346
3516
                                        instance->my_retrans_flg_count, token->aru);
3347
 
                                if (token->retrans_flg == 0) { 
 
3517
                                if (token->retrans_flg == 0) {
3348
3518
                                        instance->my_retrans_flg_count += 1;
3349
3519
                                } else {
3350
3520
                                        instance->my_retrans_flg_count = 0;
3380
3550
                                        instance->my_retrans_flg_count = 0;
3381
3551
                                }
3382
3552
                        }
3383
 
        
 
3553
 
3384
3554
                        totemrrp_send_flush (instance->totemrrp_handle);
3385
 
                        token_send (instance, token, forward_token); 
 
3555
                        token_send (instance, token, forward_token);
3386
3556
 
3387
3557
#ifdef GIVEINFO
3388
3558
                        gettimeofday (&tv_current, NULL);
3389
3559
                        timersub (&tv_current, &tv_old, &tv_diff);
3390
3560
                        memcpy (&tv_old, &tv_current, sizeof (struct timeval));
3391
 
                        log_printf (instance->totemsrp_log_level_notice,
 
3561
                        log_printf (instance->totemsrp_log_level_debug,
3392
3562
                                "I held %0.4f ms\n",
3393
3563
                                ((float)tv_diff.tv_usec) / 1000.0);
3394
3564
#endif
3484
3654
 
3485
3655
                sort_queue_item_p = ptr;
3486
3656
 
3487
 
                mcast_in = sort_queue_item_p->iovec[0].iov_base;
 
3657
                mcast_in = sort_queue_item_p->mcast;
3488
3658
                assert (mcast_in != (struct mcast *)0xdeadbeef);
3489
3659
 
3490
3660
                endian_conversion_required = 0;
3519
3689
                /*
3520
3690
                 * Message is locally originated multicast
3521
3691
                 */
3522
 
                if (sort_queue_item_p->iov_len > 1 &&
3523
 
                        sort_queue_item_p->iovec[0].iov_len == sizeof (struct mcast)) {
3524
 
                        instance->totemsrp_deliver_fn (
3525
 
                                mcast_header.header.nodeid,
3526
 
                                &sort_queue_item_p->iovec[1],
3527
 
                                sort_queue_item_p->iov_len - 1,
3528
 
                                endian_conversion_required);
3529
 
                } else {
3530
 
                        sort_queue_item_p->iovec[0].iov_len -= sizeof (struct mcast);
3531
 
                        sort_queue_item_p->iovec[0].iov_base += sizeof (struct mcast);
3532
 
 
3533
 
                        instance->totemsrp_deliver_fn (
3534
 
                                mcast_header.header.nodeid,
3535
 
                                sort_queue_item_p->iovec,
3536
 
                                sort_queue_item_p->iov_len,
3537
 
                                endian_conversion_required);
3538
 
 
3539
 
                        sort_queue_item_p->iovec[0].iov_len += sizeof (struct mcast);
3540
 
                        sort_queue_item_p->iovec[0].iov_base -= sizeof (struct mcast);
3541
 
                }
3542
 
//TODO  instance->stats_delv += 1;
 
3692
                instance->totemsrp_deliver_fn (
 
3693
                        mcast_header.header.nodeid,
 
3694
                        ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
 
3695
                        sort_queue_item_p->msg_len - sizeof (struct mcast),
 
3696
                        endian_conversion_required);
3543
3697
        }
3544
3698
}
3545
3699
 
3548
3702
 */
3549
3703
static int message_handler_mcast (
3550
3704
        struct totemsrp_instance *instance,
3551
 
        void *msg,
3552
 
        int msg_len,
 
3705
        const void *msg,
 
3706
        size_t msg_len,
3553
3707
        int endian_conversion_needed)
3554
3708
{
3555
3709
        struct sort_queue_item sort_queue_item;
3556
3710
        struct sq *sort_queue;
3557
3711
        struct mcast mcast_header;
3558
 
        
 
3712
 
3559
3713
 
3560
3714
        if (endian_conversion_needed) {
3561
3715
                mcast_endian_convert (msg, &mcast_header);
3634
3788
         * otherwise free io vectors
3635
3789
         */
3636
3790
        if (msg_len > 0 && msg_len < FRAME_SIZE_MAX &&
3637
 
                sq_in_range (sort_queue, mcast_header.seq) && 
 
3791
                sq_in_range (sort_queue, mcast_header.seq) &&
3638
3792
                sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
3639
3793
 
3640
3794
                /*
3641
3795
                 * Allocate new multicast memory block
3642
3796
                 */
3643
3797
// TODO LEAK
3644
 
                sort_queue_item.iovec[0].iov_base = malloc (msg_len);
3645
 
                if (sort_queue_item.iovec[0].iov_base == 0) {
 
3798
                sort_queue_item.mcast = malloc (msg_len);
 
3799
                if (sort_queue_item.mcast == NULL) {
3646
3800
                        return (-1); /* error here is corrected by the algorithm */
3647
3801
                }
3648
 
                memcpy (sort_queue_item.iovec[0].iov_base, msg, msg_len);
3649
 
                sort_queue_item.iovec[0].iov_len = msg_len;
3650
 
                assert (sort_queue_item.iovec[0].iov_len > 0);
3651
 
                assert (sort_queue_item.iovec[0].iov_len < FRAME_SIZE_MAX);
3652
 
                sort_queue_item.iov_len = 1;
3653
 
                
 
3802
                memcpy (sort_queue_item.mcast, msg, msg_len);
 
3803
                sort_queue_item.msg_len = msg_len;
 
3804
 
3654
3805
                if (sq_lt_compare (instance->my_high_seq_received,
3655
3806
                        mcast_header.seq)) {
3656
3807
                        instance->my_high_seq_received = mcast_header.seq;
3670
3821
 
3671
3822
static int message_handler_memb_merge_detect (
3672
3823
        struct totemsrp_instance *instance,
3673
 
        void *msg,
3674
 
        int msg_len,
 
3824
        const void *msg,
 
3825
        size_t msg_len,
3675
3826
        int endian_conversion_needed)
3676
3827
{
3677
 
        struct memb_merge_detect *memb_merge_detect = (struct memb_merge_detect *)msg;
 
3828
        struct memb_merge_detect memb_merge_detect;
 
3829
 
3678
3830
 
3679
3831
        if (endian_conversion_needed) {
3680
 
                memb_merge_detect_endian_convert (msg, msg);
 
3832
                memb_merge_detect_endian_convert (msg, &memb_merge_detect);
 
3833
        } else {
 
3834
                memcpy (&memb_merge_detect, msg,
 
3835
                        sizeof (struct memb_merge_detect));
3681
3836
        }
3682
3837
 
3683
3838
        /*
3684
3839
         * do nothing if this is a merge detect from this configuration
3685
3840
         */
3686
 
        if (memcmp (&instance->my_ring_id, &memb_merge_detect->ring_id,
 
3841
        if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
3687
3842
                sizeof (struct memb_ring_id)) == 0) {
3688
3843
 
3689
3844
                return (0);
3694
3849
         */
3695
3850
        switch (instance->memb_state) {
3696
3851
        case MEMB_STATE_OPERATIONAL:
3697
 
                memb_set_merge (&memb_merge_detect->system_from, 1,
 
3852
                memb_set_merge (&memb_merge_detect.system_from, 1,
3698
3853
                        instance->my_proc_list, &instance->my_proc_list_entries);
3699
3854
                memb_state_gather_enter (instance, 9);
3700
3855
                break;
3701
3856
 
3702
3857
        case MEMB_STATE_GATHER:
3703
3858
                if (!memb_set_subset (
3704
 
                        &memb_merge_detect->system_from,
 
3859
                        &memb_merge_detect.system_from,
3705
3860
                        1,
3706
3861
                        instance->my_proc_list,
3707
3862
                        instance->my_proc_list_entries)) {
3708
3863
 
3709
 
                        memb_set_merge (&memb_merge_detect->system_from, 1,
 
3864
                        memb_set_merge (&memb_merge_detect.system_from, 1,
3710
3865
                                instance->my_proc_list, &instance->my_proc_list_entries);
3711
3866
                        memb_state_gather_enter (instance, 10);
3712
3867
                        return (0);
3726
3881
 
3727
3882
static int memb_join_process (
3728
3883
        struct totemsrp_instance *instance,
3729
 
        struct memb_join *memb_join)
 
3884
        const struct memb_join *memb_join)
3730
3885
{
3731
3886
        unsigned char *commit_token_storage[TOKEN_SIZE_MAX];
3732
3887
        struct memb_commit_token *my_commit_token =
3748
3903
                instance->my_failed_list_entries)) {
3749
3904
 
3750
3905
                memb_consensus_set (instance, &memb_join->system_from);
3751
 
        
 
3906
 
3752
3907
                if (memb_consensus_agreed (instance) &&
3753
3908
                        memb_lowest_in_config (instance)) {
3754
3909
 
3755
3910
                        memb_state_commit_token_create (instance, my_commit_token);
3756
 
        
 
3911
 
3757
3912
                        memb_state_commit_enter (instance, my_commit_token);
3758
3913
                } else {
3759
3914
                        return (0);
3798
3953
        return (0); /* gather not entered */
3799
3954
}
3800
3955
 
3801
 
static void memb_join_endian_convert (struct memb_join *in, struct memb_join *out)
 
3956
static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
3802
3957
{
3803
3958
        int i;
3804
3959
        struct srp_addr *in_proc_list;
3827
3982
        }
3828
3983
}
3829
3984
 
3830
 
static void memb_commit_token_endian_convert (struct memb_commit_token *in, struct memb_commit_token *out)
 
3985
static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
3831
3986
{
3832
3987
        int i;
3833
3988
        struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
3866
4021
        }
3867
4022
}
3868
4023
 
3869
 
static void orf_token_endian_convert (struct orf_token *in, struct orf_token *out)
 
4024
static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
3870
4025
{
3871
4026
        int i;
3872
4027
 
3890
4045
        }
3891
4046
}
3892
4047
 
3893
 
static void mcast_endian_convert (struct mcast *in, struct mcast *out)
 
4048
static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
3894
4049
{
3895
4050
        out->header.type = in->header.type;
3896
4051
        out->header.endian_detector = ENDIAN_LOCAL;
3907
4062
}
3908
4063
 
3909
4064
static void memb_merge_detect_endian_convert (
3910
 
        struct memb_merge_detect *in,
 
4065
        const struct memb_merge_detect *in,
3911
4066
        struct memb_merge_detect *out)
3912
4067
{
3913
4068
        out->header.type = in->header.type;
3920
4075
 
3921
4076
static int message_handler_memb_join (
3922
4077
        struct totemsrp_instance *instance,
3923
 
        void *msg,
3924
 
        int msg_len,
 
4078
        const void *msg,
 
4079
        size_t msg_len,
3925
4080
        int endian_conversion_needed)
3926
4081
{
3927
 
        struct memb_join *memb_join;
 
4082
        const struct memb_join *memb_join;
3928
4083
        struct memb_join *memb_join_convert = alloca (msg_len);
3929
4084
        int gather_entered;
3930
4085
 
3933
4088
                memb_join_endian_convert (msg, memb_join_convert);
3934
4089
 
3935
4090
        } else {
3936
 
                memb_join = (struct memb_join *)msg;
 
4091
                memb_join = msg;
 
4092
        }
 
4093
        /*
 
4094
         * If the process paused because it wasn't scheduled in a timely
 
4095
         * fashion, flush the join messages because they may be queued
 
4096
         * entries
 
4097
         */
 
4098
        if (pause_flush (instance)) {
 
4099
                return (0);
3937
4100
        }
3938
4101
 
3939
4102
        if (instance->token_ring_id_seq < memb_join->ring_seq) {
3951
4114
                case MEMB_STATE_GATHER:
3952
4115
                        memb_join_process (instance, memb_join);
3953
4116
                        break;
3954
 
        
 
4117
 
3955
4118
                case MEMB_STATE_COMMIT:
3956
4119
                        if (memb_set_subset (&memb_join->system_from,
3957
4120
                                1,
3985
4148
 
3986
4149
static int message_handler_memb_commit_token (
3987
4150
        struct totemsrp_instance *instance,
3988
 
        void *msg,
3989
 
        int msg_len,
 
4151
        const void *msg,
 
4152
        size_t msg_len,
3990
4153
        int endian_conversion_needed)
3991
4154
{
3992
4155
        struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4001
4164
                "got commit token\n");
4002
4165
 
4003
4166
        if (endian_conversion_needed) {
4004
 
                memb_commit_token = memb_commit_token_convert;
4005
 
                memb_commit_token_endian_convert (msg, memb_commit_token);
 
4167
                memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4006
4168
        } else {
4007
 
                memb_commit_token = (struct memb_commit_token *)msg;
 
4169
                memcpy (memb_commit_token_convert, msg, msg_len);
4008
4170
        }
 
4171
        memb_commit_token = memb_commit_token_convert;
4009
4172
        addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;
4010
4173
        memb_list = (struct memb_commit_token_memb_entry *)(addr + memb_commit_token->addr_entries);
4011
4174
 
4023
4186
                        memb_set_subtract (sub, &sub_entries,
4024
4187
                                instance->my_proc_list, instance->my_proc_list_entries,
4025
4188
                                instance->my_failed_list, instance->my_failed_list_entries);
4026
 
                        
 
4189
 
4027
4190
                        if (memb_set_equal (addr,
4028
4191
                                memb_commit_token->addr_entries,
4029
4192
                                sub,
4051
4214
 
4052
4215
                case MEMB_STATE_RECOVERY:
4053
4216
                        if (totemip_equal (&instance->my_id.addr[0], &instance->my_ring_id.rep)) {
4054
 
                                log_printf (instance->totemsrp_log_level_notice,
 
4217
                                log_printf (instance->totemsrp_log_level_debug,
4055
4218
                                        "Sending initial ORF token\n");
4056
4219
 
4057
4220
                                // TODO convert instead of initiate
4066
4229
 
4067
4230
static int message_handler_token_hold_cancel (
4068
4231
        struct totemsrp_instance *instance,
4069
 
        void *msg,
4070
 
        int msg_len,
 
4232
        const void *msg,
 
4233
        size_t msg_len,
4071
4234
        int endian_conversion_needed)
4072
4235
{
4073
 
        struct token_hold_cancel *token_hold_cancel = (struct token_hold_cancel *)msg;
 
4236
        const struct token_hold_cancel *token_hold_cancel = msg;
4074
4237
 
4075
4238
        if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4076
4239
                sizeof (struct memb_ring_id)) == 0) {
4085
4248
 
4086
4249
void main_deliver_fn (
4087
4250
        void *context,
4088
 
        void *msg,
4089
 
        int msg_len)
 
4251
        const void *msg,
 
4252
        unsigned int msg_len)
4090
4253
{
4091
 
        struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4092
 
        struct message_header *message_header = (struct message_header *)msg;
 
4254
        struct totemsrp_instance *instance = context;
 
4255
        const struct message_header *message_header = msg;
4093
4256
 
4094
4257
        if (msg_len < sizeof (struct message_header)) {
4095
 
                log_printf (instance->totemsrp_log_level_security, "Received message is too short...  ignoring %d.\n", msg_len);
 
4258
                log_printf (instance->totemsrp_log_level_security,
 
4259
                            "Received message is too short...  ignoring %u.\n",
 
4260
                            (unsigned int)msg_len);
4096
4261
                return;
4097
4262
        }
4098
 
        
 
4263
 
4099
4264
        if ((int)message_header->type >= totemsrp_message_handlers.count) {
4100
4265
                log_printf (instance->totemsrp_log_level_security, "Type of received message is wrong...  ignoring %d.\n", (int)message_header->type);
4101
4266
                return;
4102
4267
        }
4103
 
        
 
4268
 
4104
4269
        /*
4105
4270
         * Handle incoming message
4106
4271
         */
4113
4278
 
4114
4279
void main_iface_change_fn (
4115
4280
        void *context,
4116
 
        struct totem_ip_address *iface_addr,
 
4281
        const struct totem_ip_address *iface_addr,
4117
4282
        unsigned int iface_no)
4118
4283
{
4119
 
        struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
 
4284
        struct totemsrp_instance *instance = context;
4120
4285
 
4121
4286
        totemip_copy (&instance->my_id.addr[iface_no], iface_addr);
4122
4287
        assert (instance->my_id.addr[iface_no].nodeid);
4126
4291
        if (instance->iface_changes++ == 0) {
4127
4292
                memb_ring_id_create_or_load (instance, &instance->my_ring_id);
4128
4293
                log_printf (
4129
 
                        instance->totemsrp_log_level_notice,
 
4294
                        instance->totemsrp_log_level_debug,
4130
4295
                        "Created or loaded sequence id %lld.%s for this ring.\n",
4131
4296
                        instance->my_ring_id.seq,
4132
4297
                        totemip_print (&instance->my_ring_id.rep));
4140
4305
/*
 * Shrink the configured network MTU by sizeof (struct mcast).
 *
 * totem_config->net_mtu is modified in place; nothing is returned.
 * NOTE(review): presumably this reserves room for the struct mcast that
 * precedes each multicast payload -- confirm against the callers.
 */
void totemsrp_net_mtu_adjust (struct totem_config *totem_config) {
4141
4306
        totem_config->net_mtu -= sizeof (struct mcast);
4142
4307
}
4143
 
 
4144
 
 
4145