5
/* $Header: /tmp/hpctools/ga/tcgmsg/ipcv4.0/snd.c,v 1.21 2004-04-01 02:04:57 manoj Exp $ */
16
#include <sys/select.h>
19
#include <sys/types.h>
22
#if defined(SHMEM) || defined(SYSV)
23
# if (defined(SGI_N32) || defined(SGITFP))
30
#if (defined(SUN) && !defined(SOLARIS))
31
extern char *sprintf();
38
#include "tcgsockets.h"
44
#if defined(SHMEM) || defined(SYSV)
45
#if !defined(SEQUENT) && !defined(CONVEX)
50
#if defined(USE_SRMOVER)
51
extern void SRmover();
53
#define SRmover(a,b,n) memcpy(a,b,n)
61
extern void ListenOnSock(int sock);
62
extern int AcceptConnection(int sock);
66
Print out the SR_proc_info structure array for this process
71
(void) fprintf(stderr,"Process info for node %ld: \n",NODEID_());
73
for (i=0; i<NNODES_(); i++)
74
(void) fprintf(stderr,"[%ld] = {\n\
75
clusid = %-8ld slaveid = %-8ld local = %-8ld\n\
76
sock = %-8d shmem = %-8p shmem_size = %-8ld\n\
77
shmem_id = %-8ld buffer = %-8p buflen = %-8ld\n\
78
header = %-8p semid = %-8ld sem_read = %-8ld\n\
79
sem_written = %-8ld n_rcv = %-8ld nb_rcv = %-8ld\n\
80
t_rcv = %-8ld n_snd = %-8ld nb_snd = %-8ld\n\
81
t_snd = %-8ld, peeked = %-8ld}\n",
83
SR_proc_info[i].clusid,
84
SR_proc_info[i].slaveid,
85
SR_proc_info[i].local,
87
SR_proc_info[i].shmem,
88
SR_proc_info[i].shmem_size,
89
SR_proc_info[i].shmem_id,
90
SR_proc_info[i].buffer,
91
SR_proc_info[i].buflen,
92
SR_proc_info[i].header,
93
SR_proc_info[i].semid,
94
SR_proc_info[i].sem_read,
95
SR_proc_info[i].sem_written,
96
SR_proc_info[i].n_rcv,
97
(long) SR_proc_info[i].nb_rcv,
98
(long) SR_proc_info[i].t_rcv,
99
SR_proc_info[i].n_snd,
100
(long) SR_proc_info[i].nb_snd,
101
(long) SR_proc_info[i].t_snd,
102
SR_proc_info[i].peeked);
104
(void) fflush(stderr);
107
static void PrintMessageHeader(info, header)
109
MessageHeader *header;
111
Print out the contents of a message header along with info message
114
(void) printf("%2ld:%s: type=%ld, from=%ld, to=%ld, len=%ld, tag=%ld\n",
115
NODEID_(),info, header->type, header->nodefrom,
116
header->nodeto, header->length, header->tag);
117
(void) fflush(stdout);
121
#if defined(SHMEM) || defined(SYSV)
123
/*
  Trivial busy-work routine: adds up the integers 0..9 and returns
  the total (45).  It takes no arguments and touches no global state.
  The callers visible in this file invoke it with the result discarded,
  from inside the polling loop on flag(p).
*/
static int DummyRoutine()
124
{int i, sum=0; for(i=0; i<10; i++) sum += i; return sum;}
129
#if defined(CONVEX) && defined(HPUX)
136
static void Await(p, value)
140
Wait until the value pointed to by p equals value.
141
Since *p is volatile but cannot usually be declared as such,
142
include another level of procedure call to protect
143
against compiler optimization.
148
printf("%2ld: Await p=%p, value=%ld\n", NODEID_(), p, value);
152
for (; flag(p) != value; nspin++) {
153
#if defined(NOSPIN) && !defined(PARTIALSPIN)
155
(void) DummyRoutine();
157
USleep((long) 10000);
159
if (nspin < 10000000)
160
(void) DummyRoutine();
162
/* printf("%2ld: Await sleeping\n", NODEID_()); fflush(stdout); */
163
USleep((long) 100000);
169
static void rcv_local(type, buf, lenbuf, lenmes, nodeselect, nodefrom)
178
long node = *nodeselect;
179
MessageHeader *head = SR_proc_info[node].header;
180
long buflen = SR_proc_info[node].buflen;
181
char *buffer = SR_proc_info[node].buffer;
184
long semid = SR_proc_info[node].semid;
185
long sem_read = SR_proc_info[node].sem_read;
186
long sem_written = SR_proc_info[node].sem_written;
187
long semid_to = SR_proc_info[me].semid;
188
long sem_pend = SR_proc_info[me].sem_pend;
190
#if !defined(NOSPIN) || defined(PARTIALSPIN)
191
long *buffer_full = SR_proc_info[node].buffer_full;
196
if ( (buffer == (char *) NULL) || (head == (MessageHeader *) NULL) )
197
Error("rcv_local: invalid shared memory", (long) node);
200
if ( (semid < 0) || (sem_read < 0) || (sem_written < 0) ||
201
(semid_to < 0) || (sem_pend < 0) )
202
Error("rcv_local: invalid semaphore set", (long) node);
206
SemWait(semid_to, sem_pend);
209
Await(&head->nodeto, me); /* Still have this possible spin */
211
#if defined(NOSPIN) && !defined(PARTIALSPIN)
212
SemWait(semid, sem_written);
214
Await(buffer_full, (long) TRUE);
217
/* Now have a message for me ... check the header info and
218
copy the first block of the message. */
221
PrintMessageHeader("rcv_local ",head);
223
nodeto = head->nodeto; /* Always me ... history here */
226
*nodefrom = head->nodefrom;
228
if (head->type != *type) {
229
PrintMessageHeader("rcv_local ",head);
230
/* printf("rcv_local: type mismatch ... strong typing enforced\n"); */
232
Error("rcv_local: type mismatch ... strong typing enforced", (long) *type);
235
*lenmes = len = head->length;
237
if ( *lenmes > *lenbuf )
238
Error("rcv_local: message too long for buffer", (long) *lenmes);
240
Error("rcv_local: message meant for someone else?", (long) nodeto);
243
(void) SRmover(buf, buffer, (len > buflen) ? buflen : len);
245
#if defined(NOSPIN) && !defined(PARTIALSPIN)
246
SemPost(semid, sem_read);
248
*buffer_full = FALSE;
249
# if defined(CONVEX) && defined(HPUX)
258
/* Copy the remainder of the message */
261
#if defined(NOSPIN) && !defined(PARTIALSPIN)
262
SemWait(semid, sem_written);
264
Await(buffer_full, (long) TRUE);
266
(void) SRmover(buf, buffer, (len > buflen) ? buflen : len);
267
#if defined(NOSPIN) && !defined(PARTIALSPIN)
268
SemPost(semid, sem_read);
270
*buffer_full = FALSE;
277
static void snd_local(type, buf, lenbuf, node)
284
MessageHeader *head = SR_proc_info[me].header;
285
long buflen = SR_proc_info[me].buflen;
287
char *buffer = SR_proc_info[me].buffer;
288
long tag = SR_proc_info[*node].n_snd;
290
long semid = SR_proc_info[me].semid;
291
long sem_read = SR_proc_info[me].sem_read;
292
long sem_written = SR_proc_info[me].sem_written;
293
long semid_to = SR_proc_info[*node].semid;
294
long sem_pend = SR_proc_info[*node].sem_pend;
296
#if !defined(NOSPIN) || defined(PARTIALSPIN)
297
long *buffer_full = SR_proc_info[me].buffer_full;
302
if ( (buffer == (char *) NULL) || (head == (MessageHeader *) NULL) )
303
Error("snd_local: invalid shared memory", (long) *node);
306
if ( (semid < 0) || (semid_to < 0) || (sem_read < 0) || (sem_written < 0) )
307
Error("snd_local: invalid semaphore set", (long) *node);
310
/* Check that final segment of last message has been consumed */
312
#if defined(NOSPIN) && !defined(PARTIALSPIN)
313
SemWait(semid, sem_read);
315
Await(buffer_full, (long) FALSE);
318
/* Fill in message header */
320
head->nodefrom = (char) me;
322
head->length = *lenbuf;
324
head->nodeto = (char) *node;
325
#if defined(CONVEX) && defined(HPUX)
330
PrintMessageHeader("snd_local ",head);
331
(void) fflush(stdout);
334
/* Copy the first piece of the message so that send along with
335
header to minimize use of semaphores. Also need to send header
336
even for messages of zero length */
339
(void) SRmover(buffer, buf, (len > buflen) ? buflen : len);
341
#if defined(NOSPIN) && !defined(PARTIALSPIN)
342
SemPost(semid, sem_written);
345
# if defined(CONVEX) && defined(HPUX)
350
SemPost(semid_to, sem_pend);
357
#if defined(NOSPIN) && !defined(PARTIALSPIN)
358
SemWait(semid, sem_read);
360
Await(buffer_full, (long) FALSE);
362
(void) SRmover(buffer, buf, (len > buflen) ? buflen : len);
363
#if defined(NOSPIN) && !defined(PARTIALSPIN)
364
SemPost(semid, sem_written);
367
# if defined(CONVEX) && defined(HPUX)
377
static void snd_remote(type, buf, lenbuf, node)
383
synchronous send to remote process
385
long *type = user defined integer message type (input)
386
char *buf = data buffer (input)
387
long *lenbuf = length of buffer in bytes (input)
388
long *node = node to send to (input)
390
for zero length messages only the header is sent
393
#define SHORT_MSG_BUF_SIZE (2048 + 40)
394
static char fudge[SHORT_MSG_BUF_SIZE];
395
MessageHeader header;
397
int sock=SR_proc_info[*node].sock;
399
#ifdef SOCK_FULL_SYNC
404
Error("snd_remote: sending to process without socket", (long) *node);
406
header.nodefrom = me;
407
header.nodeto = *node;
409
header.length = *lenbuf;
410
header.tag = SR_proc_info[*node].n_snd;
412
/* header.length is the no. of items if XDR is used or just the
416
if ( *type & MSGDBL )
417
header.length = *lenbuf / sizeof(double);
418
else if ( *type & MSGINT )
419
header.length = *lenbuf / sizeof(long);
420
else if ( *type & MSGCHR )
421
header.length = *lenbuf / sizeof(char);
423
header.length = *lenbuf;
425
header.length = *lenbuf;
429
PrintMessageHeader("snd_remote",&header);
432
/* Combine header and messages less than a certain size to avoid
433
* performance problem on (older?) linuxes */
434
if ((*lenbuf + sizeof(header)) <= sizeof(fudge)) {
435
memcpy(fudge,(char *) &header, sizeof(header));
436
memcpy(fudge+sizeof(header), buf, *lenbuf);
437
if ( (len = WriteToSocket(sock, fudge, sizeof(header)+*lenbuf)) !=
438
((long)sizeof(header)+*lenbuf))
439
Error("snd_remote: writing message to socket",
440
(long) (len+100000*(sock + 1000* *node)));
446
(void) WriteXdrLong(sock, (long *) &header,
447
(long) (sizeof(header)/sizeof(long)));
449
if ( (len = WriteToSocket(sock, (char *) &header, (long) sizeof(header)))
451
Error("snd_remote: writing header to socket", len);
456
if ( *type & MSGDBL )
457
(void) WriteXdrDouble(sock, (double *) buf, header.length);
458
else if ( *type & MSGINT )
459
(void) WriteXdrLong(sock, (long *) buf, header.length);
460
else if ( *type & MSGCHR )
461
(void) WriteXdrChar(sock, (char *) buf, header.length);
462
else if ( (len = WriteToSocket(sock, buf, header.length)) !=
464
Error("snd_remote: writing message to socket",
465
(long) (len+100000*(sock + 1000* *node)));
467
if ( (len = WriteToSocket(sock, buf, header.length)) !=
469
Error("snd_remote: writing message to socket",
470
(long) (len+100000*(sock + 1000* *node)));
474
#ifdef SOCK_FULL_SYNC
475
/* this read (and write in rcv_remote) of an acknowledgment
476
forces synchronous */
478
if ( ReadFromSocket(sock, &sync, (long) 1) != 1)
479
Error("snd_remote: reading acknowledgement",
480
(long) (len+100000*(sock + 1000* *node)));
485
void SND_(type, buf, lenbuf, node, sync)
492
mostly synchronous send
494
long *type = user defined integer message type (input)
495
void *buf = data buffer (input)
496
long *lenbuf = length of buffer in bytes (input)
497
long *node = node to send to (input)
498
long *sync = flag for sync/async ... IGNORED
500
for zero length messages only the header is sent
504
long nproc=NNODES_();
512
Error("SND_: cannot send message to self", (long) me);
514
if ( (*node < 0) || (*node > nproc) )
515
Error("SND_: out of range node requested", (long) *node);
517
if ( (*lenbuf < 0) || (*lenbuf > (long)BIG_MESSAGE) )
518
Error("SND_: message length out of range", (long) *lenbuf);
521
evlog(EVKEY_BEGIN, EVENT_SND,
522
EVKEY_MSG_LEN, (int) *lenbuf,
523
EVKEY_MSG_FROM, (int) me,
524
EVKEY_MSG_TO, (int) *node,
525
EVKEY_MSG_TYPE, (int) *type,
526
EVKEY_MSG_SYNC, (int) *sync,
530
/* Send via shared memory or sockets */
536
#if defined(SHMEM) || defined(SYSV)
537
if (SR_proc_info[*node].local){
539
KSR_snd_local(type, buf, lenbuf, node);
541
snd_local(type, buf, lenbuf, node);
545
snd_remote(type, buf, lenbuf, node);
546
#if defined(SHMEM) || defined(SYSV)
550
/* Collect statistics */
552
SR_proc_info[*node].n_snd += 1;
553
SR_proc_info[*node].nb_snd += *lenbuf;
556
SR_proc_info[*node].t_snd += TCGTIME_() - start;
560
evlog(EVKEY_END, EVENT_SND, EVKEY_LAST_ARG);
564
static long MatchMessage(header, me, type)
565
MessageHeader *header;
568
Wrapper round check on if header is to me and of required
569
type so that compiler does not optimize out fetching
570
header info from shared memory.
573
return (long) ((header->nodeto == me) && (header->type == type));
576
static long NextReadyNode(type)
579
Select a node from which input is pending ... also match the
582
next_node is maintained as the last node that NextReadyNode chose
583
plus one modulo NNODES_(). This aids in ensuring fairness.
585
First use select to get info about the sockets and then loop
586
through processes looking either at the bit in the fd_set for
587
the socket (remote process) or the message header in the shared
588
memory buffer (local process).
590
This may be an expensive operation but fairness seems important.
592
If only sockets are in use, just block in select until data is
596
static long next_node = 0;
598
long nproc = NNODES_();
602
if (!SR_using_shmem) {
603
int list[MAX_PROCESS];
605
nready = WaitForSockets(SR_nsock,SR_socks,list);
607
Error("NextReadyNode: nready = 0\n", 0);
609
/* Insert here type checking logic ... not yet done */
611
return SR_socks_proc[list[0]];
614
/* With both local and remote processes end up with a busy wait
615
as no way to wait for both a semaphore and a socket.
616
Moderate this slightly by having short timeout in select */
620
for(i=0; i<nproc; i++, next_node = (next_node + 1) % nproc) {
622
if (next_node == me) {
623
; /* can't receive from self */
625
else if (SR_proc_info[next_node].local) {
626
/* Look for local message */
629
if (KSR_MatchMessage(next_node, me, type))
631
if (MatchMessage(SR_proc_info[next_node].header, me, type))
635
else if (SR_proc_info[next_node].sock >= 0) {
636
/* Look for message over socket */
638
int sock = SR_proc_info[next_node].sock;
640
/* Have we already peeked at this socket? */
642
if (SR_proc_info[next_node].peeked) {
643
if (SR_proc_info[next_node].head_peek.type == type)
646
else if (PollSocket(sock)) {
647
/* Data is available ... let's peek at it */
649
(void) ReadXdrLong(sock,
650
(long *) &SR_proc_info[next_node].head_peek,
651
(long) (sizeof(MessageHeader)/sizeof(long)));
653
if (ReadFromSocket(sock,
654
(char *) &SR_proc_info[next_node].head_peek,
655
(long) sizeof(MessageHeader))
656
!= sizeof(MessageHeader) )
657
Error("NextReadyNode: reading header from socket", next_node);
659
SR_proc_info[next_node].peeked = TRUE;
661
PrintMessageHeader("peeked_at ",
662
&SR_proc_info[next_node].head_peek);
664
if (SR_proc_info[next_node].head_peek.type == type)
669
if (i < nproc) /* If found a node skip out of the while loop */
672
nspin++; /* Compromise between low latency and low cpu use */
675
else if (nspin < 100)
677
else if (nspin < 600)
678
USleep((long) 10000);
680
USleep((long) 100000);
684
next_node = (next_node + 1) % nproc;
689
long PROBE_(type, node)
692
Return 1/0 (TRUE/FALSE) if a message of the given type is available
693
from the given node. If the node is specified as -1, then all nodes
694
will be examined. Some attempt is made at ensuring fairness.
696
First use select to get info about the sockets and then loop
697
through processes looking either at the bit in the fd_set for
698
the socket (remote process) or the message header in the shared
699
memory buffer (local process).
701
This may be an expensive operation but fairness seems important.
704
long nproc = NNODES_();
706
int i, proclo, prochi;
709
Error("PROBE_ : cannot recv message from self, msgtype=", *type);
711
if (*node == -1) { /* match anyone */
716
proclo = prochi = *node;
718
for(i=proclo; i<=prochi; i++) {
721
; /* can't receive from self */
723
else if (SR_proc_info[i].local) {
724
/* Look for local message */
727
if (KSR_MatchMessage(i, me, type))
729
if (MatchMessage(SR_proc_info[i].header, me, *type))
733
else if (SR_proc_info[i].sock >= 0) {
734
/* Look for message over socket */
736
int sock = SR_proc_info[i].sock;
738
/* Have we already peeked at this socket? */
740
if (SR_proc_info[i].peeked) {
741
if (SR_proc_info[i].head_peek.type == *type)
744
else if (PollSocket(sock)) {
745
/* Data is available ... let's peek at it */
747
(void) ReadXdrLong(sock,
748
(long *) &SR_proc_info[i].head_peek,
749
(long) (sizeof(MessageHeader)/sizeof(long)));
751
if (ReadFromSocket(sock,
752
(char *) &SR_proc_info[i].head_peek,
753
(long) sizeof(MessageHeader))
754
!= sizeof(MessageHeader) )
755
Error("NextReadyNode: reading header from socket", (long) i);
757
SR_proc_info[i].peeked = TRUE;
759
PrintMessageHeader("peeked_at ",
760
&SR_proc_info[i].head_peek);
762
if (SR_proc_info[i].head_peek.type == *type)
775
static void rcv_remote(type, buf, lenbuf, lenmes, nodeselect, nodefrom)
783
synchronous receive of data
785
long *type = user defined type of received message (input)
786
char *buf = data buffer (output)
787
long *lenbuf = length of buffer in bytes (input)
788
long *lenmes = length of received message in bytes (output)
789
(exceeding receive buffer is hard error)
790
long *nodeselect = node to receive from (input)
791
-1 implies that any pending message may be received
793
long *nodefrom = node message is received from (output)
797
long node = *nodeselect;
798
int sock = SR_proc_info[node].sock;
800
MessageHeader header;
801
#ifdef SOCK_FULL_SYNC
806
Error("rcv_remote: receiving from process without socket", (long) node);
808
/* read the message header and check contents */
810
if (SR_proc_info[node].peeked) {
811
/* Have peeked at this socket ... get message header from buffer */
814
printf("%2ld: rcv_remote message has been peeked at\n", me);
816
(void) memcpy((char *) &header, (char *) &SR_proc_info[node].head_peek,
817
sizeof(MessageHeader));
818
SR_proc_info[node].peeked = FALSE;
822
(void) ReadXdrLong(sock, (long *) &header,
823
(long) (sizeof(header)/sizeof(long)));
825
if ( (len = ReadFromSocket(sock, (char *) &header, (long) sizeof(header)))
827
Error("rcv_remote: reading header from socket", len);
832
PrintMessageHeader("rcv_remote",&header);
834
if (header.nodeto != me) {
835
PrintMessageHeader("rcv_remote",&header);
836
Error("rcv_remote: got message meant for someone else",
837
(long) header.nodeto);
840
*nodefrom = header.nodefrom;
841
if (*nodefrom != node)
842
Error("rcv_remote: got message from someone on incorrect socket",
845
if (header.type != *type) {
846
PrintMessageHeader("rcv_remote",&header);
847
printf("rcv_remote: type mismatch ... strong typing enforced\n");
849
Error("rcv_remote: type mismatch ... strong typing enforced", (long) *type);
853
if ( *type & MSGDBL )
854
*lenmes = header.length * sizeof(double);
855
else if ( *type & MSGINT )
856
*lenmes = header.length * sizeof(long);
857
else if ( *type & MSGCHR )
858
*lenmes = header.length * sizeof(char);
860
*lenmes = header.length;
862
*lenmes = header.length;
865
if ( (*lenmes < 0) || (*lenmes > (long)BIG_MESSAGE) || (*lenmes > *lenbuf) ) {
866
PrintMessageHeader("rcv_remote",&header);
867
(void) fprintf(stderr, "rcv_remote err: lenbuf=%ld\n",*lenbuf);
868
Error("rcv_remote: message length out of range",(long) *lenmes);
873
if ( *type & MSGDBL )
874
(void) ReadXdrDouble(sock, (double *) buf, header.length);
875
else if ( *type & MSGINT )
876
(void) ReadXdrLong(sock, (long *) buf, header.length);
877
else if ( *type & MSGCHR )
878
(void) ReadXdrChar(sock, (char *) buf, header.length);
879
else if ( (len = ReadFromSocket(sock, buf, *lenmes)) != *lenmes)
880
Error("rcv_remote: reading message from socket",
881
(long) (len+100000*(sock+ 1000* *nodefrom)));
883
if ( (len = ReadFromSocket(sock, buf, *lenmes)) != *lenmes)
884
Error("rcv_remote: reading message from socket",
885
(long) (len+100000*(sock+ 1000* *nodefrom)));
889
/* this write (and read in snd_remote) makes the link synchronous */
891
#ifdef SOCK_FULL_SYNC
892
if ( WriteToSocket(sock, &sync, (long) 1) != 1)
893
Error("rcv_remote: writing sync to socket", (long) node);
899
void RCV_(type, buf, lenbuf, lenmes, nodeselect, nodefrom, sync)
908
long *type = user defined type of received message (input)
909
void *buf = data buffer (output)
910
long *lenbuf = length of buffer in bytes (input)
911
long *lenmes = length of received message in bytes (output)
912
(exceeding receive buffer is hard error)
913
long *nodeselect = node to receive from (input)
914
-1 implies that any pending message may be received
916
long *nodefrom = node message is received from (output)
917
long *sync = 0 for asynchronous, 1 for synchronous (NOT USED)
921
long nproc = NNODES_();
928
evlog(EVKEY_BEGIN, EVENT_RCV,
929
EVKEY_MSG_FROM, (int) *nodeselect,
930
EVKEY_MSG_TO, (int) me,
931
EVKEY_MSG_TYPE, (int) *type,
932
EVKEY_MSG_SYNC, (int) *sync,
936
/* Assign the desired node or the next ready node */
942
if (*nodeselect == -1)
943
node = NextReadyNode(*type);
947
/* Check for some errors ... need more checking here ...
948
note that the overall master process has id nproc */
951
Error("RCV_: cannot receive message from self", (long) me);
953
if ( (node < 0) || (node > nproc) )
954
Error("RCV_: out of range node requested", (long) node);
956
/* Receive the message ... use shared memory, switch or socket */
958
#if defined(SHMEM) || defined(SYSV)
959
if (SR_proc_info[node].local){
961
KSR_rcv_local(type, buf, lenbuf, lenmes, &node, nodefrom);
963
rcv_local(type, buf, lenbuf, lenmes, &node, nodefrom);
967
rcv_remote(type, buf, lenbuf, lenmes, &node, nodefrom);
968
#if defined(SHMEM) || defined(SYSV)
972
/* Collect statistics */
974
SR_proc_info[node].n_rcv += 1;
975
SR_proc_info[node].nb_rcv += *lenmes;
978
SR_proc_info[node].t_rcv += TCGTIME_() - start;
982
evlog(EVKEY_END, EVENT_RCV,
983
EVKEY_MSG_FROM, (int) node,
984
EVKEY_MSG_LEN, (int) *lenmes,
989
void RemoteConnect(a, b, c)
992
Make a socket connection between processes a and b via the
993
process c to which both are already connected.
997
long nproc = NNODES_();
998
long type = TYPE_CONNECT; /* Overriden below */
1000
long tmp, lenmes, nodefrom, clusid, lenbuf, sync=1;
1004
if ((a == b) || (a == c) || (b == c) )
1005
return; /* Gracefully ignore redundant connections */
1007
if ( (me != a) && (me != b) && (me != c) )
1008
return; /* I'm not involved in this connection */
1012
tmp = a; a = b; b = tmp;
1015
type = (a + nproc*b) | MSGINT; /* Create a unique type */
1018
(void) printf("RC a=%ld, b=%ld, c=%ld, me=%ld\n",a,b,c,me);
1019
(void) fflush(stdout);
1023
CreateSocketAndBind(&sock, &port); /* Create port */
1025
(void) printf("RC node=%ld, sock=%d, port=%d\n",me, sock, port);
1026
(void) fflush(stdout);
1029
lenbuf = sizeof lport;
1031
SND_(&type, (char *) &lport, &lenbuf, &c, &sync); /* Port to intermediate */
1032
SR_proc_info[b].sock = AcceptConnection(sock); /* Accept connection
1033
and save socket info */
1036
clusid = SR_proc_info[a].clusid;
1037
lenbuf = sizeof lport;
1038
RCV_(&type, (char *) &lport, &lenbuf, &lenmes, &c, &nodefrom, &sync);
1040
(void) sprintf(cport,"%d",port);
1041
lenbuf = strlen(cport) + 1;
1042
if (lenbuf > (long)sizeof(cport))
1043
Error("RemoteConnect: cport too small", (long) lenbuf);
1044
SR_proc_info[a].sock =
1045
CreateSocketAndConnect(SR_clus_info[clusid].hostname, cport);
1048
lenbuf = sizeof lport;
1049
RCV_(&type, (char *) &lport, &lenbuf, &lenmes, &a, &nodefrom, &sync);
1050
SND_(&type, (char *) &lport, &lenbuf, &b, &sync);