430
441
int totemsrp_log_level_debug;
432
void (*totemsrp_log_printf) (char *file, int line, int level, char *format, ...) __attribute__((format(printf, 4, 5)));
443
int totemsrp_subsys_id;
445
void (*totemsrp_log_printf) (
446
unsigned int rec_ident,
447
const char *function,
450
const char *format, ...)__attribute__((format(printf, 5, 6)));;
434
452
enum memb_state memb_state;
436
454
//TODO struct srp_addr next_memb;
438
char iov_buffer[FRAME_SIZE_MAX];
440
struct iovec totemsrp_iov_recv;
442
poll_handle totemsrp_poll_handle;
445
* Function called when new message received
447
int (*totemsrp_recv) (char *group, struct iovec *iovec, int iov_len);
456
hdb_handle_t totemsrp_poll_handle;
449
458
struct totem_ip_address mcast_address;
451
460
void (*totemsrp_deliver_fn) (
452
461
unsigned int nodeid,
463
unsigned int msg_len,
455
464
int endian_conversion_required);
457
466
void (*totemsrp_confchg_fn) (
458
467
enum totem_configuration_type configuration_type,
459
unsigned int *member_list, int member_list_entries,
460
unsigned int *left_list, int left_list_entries,
461
unsigned int *joined_list, int joined_list_entries,
462
struct memb_ring_id *ring_id);
468
const unsigned int *member_list, size_t member_list_entries,
469
const unsigned int *left_list, size_t left_list_entries,
470
const unsigned int *joined_list, size_t joined_list_entries,
471
const struct memb_ring_id *ring_id);
464
473
int global_seqno;
511
522
static int message_handler_orf_token (
512
523
struct totemsrp_instance *instance,
515
526
int endian_conversion_needed);
517
528
static int message_handler_mcast (
518
529
struct totemsrp_instance *instance,
521
532
int endian_conversion_needed);
523
534
static int message_handler_memb_merge_detect (
524
535
struct totemsrp_instance *instance,
527
538
int endian_conversion_needed);
529
540
static int message_handler_memb_join (
530
541
struct totemsrp_instance *instance,
533
544
int endian_conversion_needed);
535
546
static int message_handler_memb_commit_token (
536
547
struct totemsrp_instance *instance,
539
550
int endian_conversion_needed);
541
552
static int message_handler_token_hold_cancel (
542
553
struct totemsrp_instance *instance,
545
556
int endian_conversion_needed);
558
static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
560
/*
 * Forward declaration. Declared static here so that it agrees with the
 * static definition of main_msgs_missing() later in this file.
 */
static unsigned int main_msgs_missing (void);
562
static void main_token_seqid_get (
565
unsigned int *token_is);
567
/*
 * Forward declaration. src is const-qualified and dest is not, so only
 * dest may be written through. Callers in this file pass
 * &instance->my_id as src to fill in a message's system_from field.
 */
static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);
569
static void srp_addr_to_nodeid (
570
unsigned int *nodeid_out,
571
struct srp_addr *srp_addr_in,
572
unsigned int entries);
574
/*
 * Forward declaration. Both arguments are const, so neither address is
 * modified. NOTE(review): the return convention is not visible from the
 * prototype; the join-message sender asserts that comparing
 * my_proc_list[0] with my_proc_list[1] yields 0.
 */
static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
576
/*
 * Forward declaration of the leave-message sender. Its definition further
 * down builds a MESSAGE_TYPE_MEMB_JOIN message whose header nodeid and
 * system_from nodeid are set to LEAVE_DUMMY_NODEID, and hands it to
 * totemrrp_mcast_flush_send().
 */
static void memb_leave_message_send (struct totemsrp_instance *instance);
547
578
static void memb_ring_id_create_or_load (struct totemsrp_instance *, struct memb_ring_id *);
549
580
static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
554
585
static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
556
587
static void memb_ring_id_set_and_store (struct totemsrp_instance *instance,
557
struct memb_ring_id *ring_id);
588
const struct memb_ring_id *ring_id);
558
589
static void memb_state_commit_token_update (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
559
590
static void memb_state_commit_token_target_set (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
560
591
static int memb_state_commit_token_send (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
561
592
static void memb_state_commit_token_create (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
562
593
static int token_hold_cancel_send (struct totemsrp_instance *instance);
563
static void orf_token_endian_convert (struct orf_token *in, struct orf_token *out);
564
static void memb_commit_token_endian_convert (struct memb_commit_token *in, struct memb_commit_token *out);
565
static void memb_join_endian_convert (struct memb_join *in, struct memb_join *out);
566
static void mcast_endian_convert (struct mcast *in, struct mcast *out);
594
static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
595
static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
596
static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
597
static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
567
598
static void memb_merge_detect_endian_convert (
568
struct memb_merge_detect *in,
599
const struct memb_merge_detect *in,
569
600
struct memb_merge_detect *out);
570
static void srp_addr_copy_endian_convert (struct srp_addr *out, struct srp_addr *in);
601
/*
 * Forward declaration. Note the parameter order: the output address comes
 * first and the (const) input second, which is the reverse of the
 * (in, out) order used by the other *_endian_convert prototypes here.
 */
static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
571
602
/*
 * Forward declaration of the token-timeout timer callback. The definition
 * treats data as the struct totemsrp_instance pointer, switches on
 * instance->memb_state and, for each state handled, logs the event and
 * re-enters the gather state via memb_state_gather_enter().
 */
static void timer_function_orf_token_timeout (void *data);
603
/*
 * Forward declaration of the pause timer callback. The definition treats
 * data as the struct totemsrp_instance pointer, refreshes
 * instance->pause_timestamp with gettimeofday() and then calls
 * reset_pause_timeout().
 */
static void timer_function_pause_timeout (void *data);
572
604
static void timer_function_heartbeat_timeout (void *data);
573
605
static void timer_function_token_retransmit_timeout (void *data);
574
606
static void timer_function_token_hold_retransmit_timeout (void *data);
651
unsigned int main_msgs_missing (void)
686
static unsigned int main_msgs_missing (void)
692
static int pause_flush (struct totemsrp_instance *instance)
696
uint64_t timestamp_msec;
699
gettimeofday (&now, NULL);
700
now_msec = ((now.tv_sec * 1000ULL) + (now.tv_usec / 1000ULL));
701
timestamp_msec = ((instance->pause_timestamp.tv_sec * 1000ULL) + (instance->pause_timestamp.tv_usec/1000ULL));
703
if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
704
log_printf (instance->totemsrp_log_level_notice,
705
"Process pause detected for %d ms, flushing membership messages.\n", (unsigned int)(now_msec - timestamp_msec));
707
* -1 indicates an error from recvmsg
710
res = totemrrp_mcast_recv_empty (instance->totemrrp_handle);
658
717
* Exported interfaces
660
719
int totemsrp_initialize (
661
poll_handle poll_handle,
662
totemsrp_handle *handle,
720
hdb_handle_t poll_handle,
721
hdb_handle_t *handle,
663
722
struct totem_config *totem_config,
665
724
void (*deliver_fn) (
666
725
unsigned int nodeid,
727
unsigned int msg_len,
669
728
int endian_conversion_required),
671
730
void (*confchg_fn) (
672
731
enum totem_configuration_type configuration_type,
673
unsigned int *member_list, int member_list_entries,
674
unsigned int *left_list, int left_list_entries,
675
unsigned int *joined_list, int joined_list_entries,
676
struct memb_ring_id *ring_id))
732
const unsigned int *member_list, size_t member_list_entries,
733
const unsigned int *left_list, size_t left_list_entries,
734
const unsigned int *joined_list, size_t joined_list_entries,
735
const struct memb_ring_id *ring_id))
678
737
struct totemsrp_instance *instance;
679
738
unsigned int res;
717
784
totemip_copy (&instance->mcast_address, &totem_config->interfaces[0].mcast_addr);
719
memset (instance->iov_buffer, 0, FRAME_SIZE_MAX);
722
787
* Display totem configuration
724
log_printf (instance->totemsrp_log_level_notice,
789
log_printf (instance->totemsrp_log_level_debug,
725
790
"Token Timeout (%d ms) retransmit timeout (%d ms)\n",
726
791
totem_config->token_timeout, totem_config->token_retransmit_timeout);
727
log_printf (instance->totemsrp_log_level_notice,
792
log_printf (instance->totemsrp_log_level_debug,
728
793
"token hold (%d ms) retransmits before loss (%d retrans)\n",
729
794
totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
730
log_printf (instance->totemsrp_log_level_notice,
795
log_printf (instance->totemsrp_log_level_debug,
731
796
"join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)\n",
732
797
totem_config->join_timeout,
733
798
totem_config->send_join_timeout,
734
799
totem_config->consensus_timeout,
736
801
totem_config->merge_timeout);
737
log_printf (instance->totemsrp_log_level_notice,
802
log_printf (instance->totemsrp_log_level_debug,
738
803
"downcheck (%d ms) fail to recv const (%d msgs)\n",
739
804
totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
740
log_printf (instance->totemsrp_log_level_notice,
805
log_printf (instance->totemsrp_log_level_debug,
741
806
"seqno unchanged const (%d rotations) Maximum network MTU %d\n", totem_config->seqno_unchanged_const, totem_config->net_mtu);
743
log_printf (instance->totemsrp_log_level_notice,
808
log_printf (instance->totemsrp_log_level_debug,
744
809
"window size per rotation (%d messages) maximum messages per rotation (%d messages)\n",
745
810
totem_config->window_size, totem_config->max_messages);
747
log_printf (instance->totemsrp_log_level_notice,
812
log_printf (instance->totemsrp_log_level_debug,
748
813
"send threads (%d threads)\n", totem_config->threads);
749
log_printf (instance->totemsrp_log_level_notice,
814
log_printf (instance->totemsrp_log_level_debug,
750
815
"RRP token expired timeout (%d ms)\n",
751
816
totem_config->rrp_token_expired_timeout);
752
log_printf (instance->totemsrp_log_level_notice,
817
log_printf (instance->totemsrp_log_level_debug,
753
818
"RRP token problem counter (%d ms)\n",
754
819
totem_config->rrp_problem_count_timeout);
755
log_printf (instance->totemsrp_log_level_notice,
820
log_printf (instance->totemsrp_log_level_debug,
756
821
"RRP threshold (%d problem count)\n",
757
822
totem_config->rrp_problem_count_threshold);
758
log_printf (instance->totemsrp_log_level_notice,
823
log_printf (instance->totemsrp_log_level_debug,
759
824
"RRP mode set to %s.\n", instance->totem_config->rrp_mode);
761
log_printf (instance->totemsrp_log_level_notice,
826
log_printf (instance->totemsrp_log_level_debug,
762
827
"heartbeat_failures_allowed (%d)\n", totem_config->heartbeat_failures_allowed);
763
log_printf (instance->totemsrp_log_level_notice,
828
log_printf (instance->totemsrp_log_level_debug,
764
829
"max_network_delay (%d ms)\n", totem_config->max_network_delay);
767
queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
832
cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
768
833
sizeof (struct message_item));
770
835
sq_init (&instance->regular_sort_queue,
780
845
instance->totemsrp_confchg_fn = confchg_fn;
781
846
instance->use_heartbeat = 1;
848
gettimeofday (&instance->pause_timestamp, NULL);
783
850
if ( totem_config->heartbeat_failures_allowed == 0 ) {
784
log_printf (instance->totemsrp_log_level_notice,
851
log_printf (instance->totemsrp_log_level_debug,
785
852
"HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0\n");
786
853
instance->use_heartbeat = 0;
789
856
if (instance->use_heartbeat) {
790
instance->heartbeat_timeout
791
= (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
857
instance->heartbeat_timeout
858
= (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
792
859
+ totem_config->max_network_delay;
794
861
if (instance->heartbeat_timeout >= totem_config->token_timeout) {
795
log_printf (instance->totemsrp_log_level_notice,
796
"total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)\n",
862
log_printf (instance->totemsrp_log_level_debug,
863
"total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)\n",
797
864
instance->heartbeat_timeout,
798
865
totem_config->token_timeout);
799
log_printf (instance->totemsrp_log_level_notice,
866
log_printf (instance->totemsrp_log_level_debug,
800
867
"heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay\n");
801
log_printf (instance->totemsrp_log_level_notice,
868
log_printf (instance->totemsrp_log_level_debug,
802
869
"heartbeat timeout should be less than the token timeout. HeartBeat is Diabled !!\n");
803
870
instance->use_heartbeat = 0;
806
log_printf (instance->totemsrp_log_level_notice,
873
log_printf (instance->totemsrp_log_level_debug,
807
874
"total heartbeat_timeout (%d ms)\n", instance->heartbeat_timeout);
811
878
totemrrp_initialize (
813
880
&instance->totemrrp_handle,
1415
1521
* Timers used for various states of the membership algorithm
1523
static void timer_function_pause_timeout (void *data)
1525
struct totemsrp_instance *instance = data;
1527
gettimeofday (&instance->pause_timestamp, NULL);
1528
reset_pause_timeout (instance);
1417
1531
static void timer_function_orf_token_timeout (void *data)
1419
struct totemsrp_instance *instance = (struct totemsrp_instance *)data;
1533
struct totemsrp_instance *instance = data;
1421
1535
switch (instance->memb_state) {
1422
1536
case MEMB_STATE_OPERATIONAL:
1423
log_printf (instance->totemsrp_log_level_notice,
1537
log_printf (instance->totemsrp_log_level_debug,
1424
1538
"The token was lost in the OPERATIONAL state.\n");
1539
log_printf (instance->totemsrp_log_level_notice,
1540
"A processor failed, forming new configuration.\n");
1425
1541
totemrrp_iface_check (instance->totemrrp_handle);
1426
1542
memb_state_gather_enter (instance, 2);
1429
1545
case MEMB_STATE_GATHER:
1430
log_printf (instance->totemsrp_log_level_notice,
1546
log_printf (instance->totemsrp_log_level_debug,
1431
1547
"The consensus timeout expired.\n");
1432
1548
memb_state_consensus_timeout_expired (instance);
1433
1549
memb_state_gather_enter (instance, 3);
1436
1552
case MEMB_STATE_COMMIT:
1437
log_printf (instance->totemsrp_log_level_notice,
1553
log_printf (instance->totemsrp_log_level_debug,
1438
1554
"The token was lost in the COMMIT state.\n");
1439
1555
memb_state_gather_enter (instance, 4);
1442
1558
case MEMB_STATE_RECOVERY:
1443
log_printf (instance->totemsrp_log_level_notice,
1559
log_printf (instance->totemsrp_log_level_debug,
1444
1560
"The token was lost in the RECOVERY state.\n");
1445
1561
ring_state_restore (instance);
1446
1562
memb_state_gather_enter (instance, 5);
1513
1629
if (res != 0) {
1516
recovery_message_item = (struct sort_queue_item *)ptr;
1632
recovery_message_item = ptr;
1519
1635
* Convert recovery message into regular message
1521
if (recovery_message_item->iov_len > 1) {
1522
mcast = recovery_message_item->iovec[1].iov_base;
1523
memcpy (®ular_message_item.iovec[0],
1524
&recovery_message_item->iovec[1],
1525
sizeof (struct iovec) * recovery_message_item->iov_len);
1637
mcast = recovery_message_item->mcast;
1638
if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
1640
* Message is a recovery message encapsulated
1641
* in a new ring message
1643
regular_message_item.mcast =
1644
(struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1645
regular_message_item.msg_len =
1646
recovery_message_item->msg_len - sizeof (struct mcast);
1647
mcast = regular_message_item.mcast;
1527
mcast = recovery_message_item->iovec[0].iov_base;
1528
if (mcast->header.encapsulated == 1) {
1530
* Message is a recovery message encapsulated
1531
* in a new ring message
1533
regular_message_item.iovec[0].iov_base =
1534
recovery_message_item->iovec[0].iov_base + sizeof (struct mcast);
1535
regular_message_item.iovec[0].iov_len =
1536
recovery_message_item->iovec[0].iov_len - sizeof (struct mcast);
1537
regular_message_item.iov_len = 1;
1538
mcast = regular_message_item.iovec[0].iov_base;
1540
continue; /* TODO this case shouldn't happen */
1542
* Message is originated on new ring and not
1545
regular_message_item.iovec[0].iov_base =
1546
recovery_message_item->iovec[0].iov_base;
1547
regular_message_item.iovec[0].iov_len =
1548
recovery_message_item->iovec[0].iov_len;
1650
* TODO this case shouldn't happen
1552
1655
log_printf (instance->totemsrp_log_level_debug,
1921
2028
low_ring_aru + i, &ptr);
1922
2029
if (res != 0) {
1923
2030
strcat (not_originated, seqno_string_hex);
1926
strcat (is_originated, seqno_string_hex);
1927
sort_queue_item = ptr;
1928
assert (sort_queue_item->iov_len > 0);
1929
assert (sort_queue_item->iov_len <= MAXIOVS);
1930
messages_originated++;
1931
memset (&message_item, 0, sizeof (struct message_item));
1933
message_item.mcast = malloc (sizeof (struct mcast));
1934
assert (message_item.mcast);
1935
message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
1936
srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
1937
message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
1938
message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
1939
assert (message_item.mcast->header.nodeid);
1940
message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
1941
memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
1942
sizeof (struct memb_ring_id));
1943
message_item.iov_len = sort_queue_item->iov_len;
1944
memcpy (&message_item.iovec, &sort_queue_item->iovec,
1945
sizeof (struct iovec) * sort_queue_item->iov_len);
1946
queue_item_add (&instance->retrans_message_queue, &message_item);
1948
log_printf (instance->totemsrp_log_level_notice,
2033
strcat (is_originated, seqno_string_hex);
2034
sort_queue_item = ptr;
2035
messages_originated++;
2036
memset (&message_item, 0, sizeof (struct message_item));
2038
message_item.mcast = malloc (10000);
2039
assert (message_item.mcast);
2040
message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2041
srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2042
message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
2043
message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2044
assert (message_item.mcast->header.nodeid);
2045
message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
2046
memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2047
sizeof (struct memb_ring_id));
2048
message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2049
memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2050
sort_queue_item->mcast,
2051
sort_queue_item->msg_len);
2052
cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2054
log_printf (instance->totemsrp_log_level_debug,
1949
2055
"Originated %d messages in RECOVERY.\n", messages_originated);
1950
2056
strcat (not_originated, "\n");
1951
2057
strcat (is_originated, "\n");
1952
log_printf (instance->totemsrp_log_level_notice, is_originated);
1953
log_printf (instance->totemsrp_log_level_notice, not_originated);
2058
log_printf (instance->totemsrp_log_level_debug, "%s", is_originated);
2059
log_printf (instance->totemsrp_log_level_debug, "%s", not_originated);
1954
2060
goto originated;
1957
log_printf (instance->totemsrp_log_level_notice,
2063
log_printf (instance->totemsrp_log_level_debug,
1958
2064
"Did not need to originate any messages in recovery.\n");
2040
2143
message_item.mcast->guarantee = guarantee;
2041
2144
srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2146
addr = (char *)message_item.mcast;
2147
addr_idx = sizeof (struct mcast);
2043
2148
for (i = 0; i < iov_len; i++) {
2045
message_item.iovec[i].iov_base = malloc (iovec[i].iov_len);
2047
if (message_item.iovec[i].iov_base == 0) {
2051
memcpy (message_item.iovec[i].iov_base, iovec[i].iov_base,
2054
message_item.iovec[i].iov_len = iovec[i].iov_len;
2149
memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2150
addr_idx += iovec[i].iov_len;
2057
message_item.iov_len = iov_len;
2153
message_item.msg_len = addr_idx;
2059
2155
log_printf (instance->totemsrp_log_level_debug, "mcasted message added to pending queue\n");
2060
queue_item_add (&instance->new_message_queue, &message_item);
2156
cs_queue_item_add (&instance->new_message_queue, &message_item);
2062
2158
hdb_handle_put (&totemsrp_instance_database, handle);
2066
for (j = 0; j < i; j++) {
2067
free (message_item.iovec[j].iov_base);
2070
free(message_item.mcast);
2073
2162
hdb_handle_put (&totemsrp_instance_database, handle);
2290
2378
* Build IO vector
2292
2380
memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2293
sort_queue_item.iovec[0].iov_base = message_item->mcast;
2294
sort_queue_item.iovec[0].iov_len = sizeof (struct mcast);
2296
mcast = sort_queue_item.iovec[0].iov_base;
2298
memcpy (&sort_queue_item.iovec[1], message_item->iovec,
2299
message_item->iov_len * sizeof (struct iovec));
2381
sort_queue_item.mcast = message_item->mcast;
2382
sort_queue_item.msg_len = message_item->msg_len;
2384
mcast = sort_queue_item.mcast;
2301
2386
memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2303
sort_queue_item.iov_len = message_item->iov_len + 1;
2305
assert (sort_queue_item.iov_len < 16);
2308
2389
* Add message to retransmit queue
2310
2391
sort_queue_item_ptr = sq_item_add (sort_queue,
2311
2392
&sort_queue_item, message_item->mcast->seq);
2313
totemrrp_mcast_noflush_send (instance->totemrrp_handle,
2314
sort_queue_item_ptr->iovec,
2315
sort_queue_item_ptr->iov_len);
2394
totemrrp_mcast_noflush_send (
2395
instance->totemrrp_handle,
2396
message_item->mcast,
2397
message_item->msg_len);
2318
2400
* Delete item from pending queue
2320
queue_item_remove (mcast_queue);
2402
cs_queue_item_remove (mcast_queue);
2323
2405
* If messages mcasted, deliver any new messages to totempg
2655
2726
instance->my_received_flg =
2656
2727
(instance->my_aru == instance->my_high_seq_received);
2729
memb_list[commit_token->memb_index].received_flg = instance->my_received_flg;
2658
2731
memb_list[commit_token->memb_index].high_delivered = instance->my_high_delivered;
2659
memb_list[commit_token->memb_index].received_flg = instance->my_received_flg;
2733
* find high aru up to current memb_index for all matching ring ids
2734
* if any ring id matching memb_index has aru less than high aru set
2735
* received flag for that entry to false
2737
high_aru = memb_list[commit_token->memb_index].aru;
2738
for (i = 0; i <= commit_token->memb_index; i++) {
2739
if (memcmp (&memb_list[commit_token->memb_index].ring_id,
2740
&memb_list[i].ring_id,
2741
sizeof (struct memb_ring_id)) == 0) {
2743
if (sq_lt_compare (high_aru, memb_list[i].aru)) {
2744
high_aru = memb_list[i].aru;
2749
for (i = 0; i <= commit_token->memb_index; i++) {
2750
if (memcmp (&memb_list[commit_token->memb_index].ring_id,
2751
&memb_list[i].ring_id,
2752
sizeof (struct memb_ring_id)) == 0) {
2754
if (sq_lt_compare (memb_list[i].aru, high_aru)) {
2755
memb_list[i].received_flg = 0;
2756
if (i == commit_token->memb_index) {
2757
instance->my_received_flg = 0;
2661
2763
commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
2662
2764
commit_token->memb_index += 1;
2686
2788
struct totemsrp_instance *instance,
2687
2789
struct memb_commit_token *commit_token)
2690
2791
struct srp_addr *addr;
2691
2792
struct memb_commit_token_memb_entry *memb_list;
2793
unsigned int commit_token_size;
2693
2795
addr = (struct srp_addr *)commit_token->end_of_commit_token;
2694
2796
memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2696
2798
commit_token->token_seq++;
2697
iovec.iov_base = commit_token;
2698
iovec.iov_len = sizeof (struct memb_commit_token) +
2799
commit_token_size = sizeof (struct memb_commit_token) +
2699
2800
((sizeof (struct srp_addr) +
2700
2801
sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
2702
2803
* Make a copy for retransmission if necessary
2704
memcpy (instance->orf_token_retransmit, commit_token, iovec.iov_len);
2705
instance->orf_token_retransmit_size = iovec.iov_len;
2805
memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
2806
instance->orf_token_retransmit_size = commit_token_size;
2707
2808
totemrrp_token_send (instance->totemrrp_handle,
2712
2813
* Request retransmission of the commit token in case it is lost
2797
2898
static void memb_join_message_send (struct totemsrp_instance *instance)
2799
struct memb_join memb_join;
2800
struct iovec iovec[3];
2900
char memb_join_data[10000];
2901
struct memb_join *memb_join = (struct memb_join *)memb_join_data;
2903
unsigned int addr_idx;
2803
memb_join.header.type = MESSAGE_TYPE_MEMB_JOIN;
2804
memb_join.header.endian_detector = ENDIAN_LOCAL;
2805
memb_join.header.encapsulated = 0;
2806
memb_join.header.nodeid = instance->my_id.addr[0].nodeid;
2807
assert (memb_join.header.nodeid);
2905
memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
2906
memb_join->header.endian_detector = ENDIAN_LOCAL;
2907
memb_join->header.encapsulated = 0;
2908
memb_join->header.nodeid = instance->my_id.addr[0].nodeid;
2909
assert (memb_join->header.nodeid);
2809
2911
assert (srp_addr_equal (&instance->my_proc_list[0], &instance->my_proc_list[1]) == 0);
2810
memb_join.ring_seq = instance->my_ring_id.seq;
2811
memb_join.proc_list_entries = instance->my_proc_list_entries;
2812
memb_join.failed_list_entries = instance->my_failed_list_entries;
2813
srp_addr_copy (&memb_join.system_from, &instance->my_id);
2815
iovec[0].iov_base = &memb_join;
2816
iovec[0].iov_len = sizeof (struct memb_join);
2817
iovec[1].iov_base = &instance->my_proc_list;
2818
iovec[1].iov_len = instance->my_proc_list_entries *
2819
sizeof (struct srp_addr);
2820
if (instance->my_failed_list_entries == 0) {
2824
iovec[2].iov_base = instance->my_failed_list;
2825
iovec[2].iov_len = instance->my_failed_list_entries *
2826
sizeof (struct srp_addr);
2829
if (instance->totem_config->send_join_timeout) {
2830
usleep (random() % (instance->totem_config->send_join_timeout * 1000));
2833
totemrrp_mcast_flush_send (
2834
instance->totemrrp_handle,
2839
static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
2912
memb_join->ring_seq = instance->my_ring_id.seq;
2913
memb_join->proc_list_entries = instance->my_proc_list_entries;
2914
memb_join->failed_list_entries = instance->my_failed_list_entries;
2915
srp_addr_copy (&memb_join->system_from, &instance->my_id);
2918
* This mess adds the joined and failed processor lists into the join
2921
addr = (char *)memb_join;
2922
addr_idx = sizeof (struct memb_join);
2923
memcpy (&addr[addr_idx],
2924
instance->my_proc_list,
2925
instance->my_proc_list_entries *
2926
sizeof (struct srp_addr));
2928
instance->my_proc_list_entries *
2929
sizeof (struct srp_addr);
2930
memcpy (&addr[addr_idx],
2931
instance->my_failed_list,
2932
instance->my_failed_list_entries *
2933
sizeof (struct srp_addr));
2935
instance->my_failed_list_entries *
2936
sizeof (struct srp_addr);
2939
if (instance->totem_config->send_join_timeout) {
2940
usleep (random() % (instance->totem_config->send_join_timeout * 1000));
2943
totemrrp_mcast_flush_send (
2944
instance->totemrrp_handle,
2949
static void memb_leave_message_send (struct totemsrp_instance *instance)
2951
char memb_join_data[10000];
2952
struct memb_join *memb_join = (struct memb_join *)memb_join_data;
2954
unsigned int addr_idx;
2955
int active_memb_entries;
2956
struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
2958
log_printf (instance->totemsrp_log_level_debug,
2959
"sending join/leave message\n");
2962
* add us to the failed list, and remove us from
2966
&instance->my_id, 1,
2967
instance->my_failed_list, &instance->my_failed_list_entries);
2969
memb_set_subtract (active_memb, &active_memb_entries,
2970
instance->my_proc_list, instance->my_proc_list_entries,
2971
&instance->my_id, 1);
2974
memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
2975
memb_join->header.endian_detector = ENDIAN_LOCAL;
2976
memb_join->header.encapsulated = 0;
2977
memb_join->header.nodeid = LEAVE_DUMMY_NODEID;
2979
memb_join->ring_seq = instance->my_ring_id.seq;
2980
memb_join->proc_list_entries = active_memb_entries;
2981
memb_join->failed_list_entries = instance->my_failed_list_entries;
2982
srp_addr_copy (&memb_join->system_from, &instance->my_id);
2983
memb_join->system_from.addr[0].nodeid = LEAVE_DUMMY_NODEID;
2985
// TODO: CC Maybe use the actual join send routine.
2987
* This mess adds the joined and failed processor lists into the join
2990
addr = (char *)memb_join;
2991
addr_idx = sizeof (struct memb_join);
2992
memcpy (&addr[addr_idx],
2994
active_memb_entries *
2995
sizeof (struct srp_addr));
2997
active_memb_entries *
2998
sizeof (struct srp_addr);
2999
memcpy (&addr[addr_idx],
3000
instance->my_failed_list,
3001
instance->my_failed_list_entries *
3002
sizeof (struct srp_addr));
3004
instance->my_failed_list_entries *
3005
sizeof (struct srp_addr);
3008
if (instance->totem_config->send_join_timeout) {
3009
usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3012
totemrrp_mcast_flush_send (
3013
instance->totemrrp_handle,
3018
static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
2841
3020
struct memb_merge_detect memb_merge_detect;
2842
struct iovec iovec[2];
2844
3022
memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
2845
3023
memb_merge_detect.header.endian_detector = ENDIAN_LOCAL;
2846
3024
memb_merge_detect.header.encapsulated = 0;
2847
3025
memb_merge_detect.header.nodeid = instance->my_id.addr[0].nodeid;
2848
3026
srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
3027
memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3028
sizeof (struct memb_ring_id));
2849
3029
assert (memb_merge_detect.header.nodeid);
2851
iovec[0].iov_base = &memb_merge_detect;
2852
iovec[0].iov_len = sizeof (struct memb_merge_detect) -
2853
sizeof (struct memb_ring_id);
2854
iovec[1].iov_base = &instance->my_ring_id;
2855
iovec[1].iov_len = sizeof (struct memb_ring_id);
2857
totemrrp_mcast_flush_send (instance->totemrrp_handle, iovec, 2);
3031
totemrrp_mcast_flush_send (instance->totemrrp_handle,
3033
sizeof (struct memb_merge_detect));
2860
3036
static void memb_ring_id_create_or_load (
3520
3690
* Message is locally originated multicast
3522
if (sort_queue_item_p->iov_len > 1 &&
3523
sort_queue_item_p->iovec[0].iov_len == sizeof (struct mcast)) {
3524
instance->totemsrp_deliver_fn (
3525
mcast_header.header.nodeid,
3526
&sort_queue_item_p->iovec[1],
3527
sort_queue_item_p->iov_len - 1,
3528
endian_conversion_required);
3530
sort_queue_item_p->iovec[0].iov_len -= sizeof (struct mcast);
3531
sort_queue_item_p->iovec[0].iov_base += sizeof (struct mcast);
3533
instance->totemsrp_deliver_fn (
3534
mcast_header.header.nodeid,
3535
sort_queue_item_p->iovec,
3536
sort_queue_item_p->iov_len,
3537
endian_conversion_required);
3539
sort_queue_item_p->iovec[0].iov_len += sizeof (struct mcast);
3540
sort_queue_item_p->iovec[0].iov_base -= sizeof (struct mcast);
3542
//TODO instance->stats_delv += 1;
3692
instance->totemsrp_deliver_fn (
3693
mcast_header.header.nodeid,
3694
((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
3695
sort_queue_item_p->msg_len - sizeof (struct mcast),
3696
endian_conversion_required);
3634
3788
* otherwise free io vectors
3636
3790
if (msg_len > 0 && msg_len < FRAME_SIZE_MAX &&
3637
sq_in_range (sort_queue, mcast_header.seq) &&
3791
sq_in_range (sort_queue, mcast_header.seq) &&
3638
3792
sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
3641
3795
* Allocate new multicast memory block
3644
sort_queue_item.iovec[0].iov_base = malloc (msg_len);
3645
if (sort_queue_item.iovec[0].iov_base == 0) {
3798
sort_queue_item.mcast = malloc (msg_len);
3799
if (sort_queue_item.mcast == NULL) {
3646
3800
return (-1); /* error here is corrected by the algorithm */
3648
memcpy (sort_queue_item.iovec[0].iov_base, msg, msg_len);
3649
sort_queue_item.iovec[0].iov_len = msg_len;
3650
assert (sort_queue_item.iovec[0].iov_len > 0);
3651
assert (sort_queue_item.iovec[0].iov_len < FRAME_SIZE_MAX);
3652
sort_queue_item.iov_len = 1;
3802
memcpy (sort_queue_item.mcast, msg, msg_len);
3803
sort_queue_item.msg_len = msg_len;
3654
3805
if (sq_lt_compare (instance->my_high_seq_received,
3655
3806
mcast_header.seq)) {
3656
3807
instance->my_high_seq_received = mcast_header.seq;