9
extern void qsort(void *base, size_t nmemb, size_t size, int(*compar)(const void *, const void *));
15
extern long MatchShmMessage();
16
extern void msg_wait();
19
#define INVALID_NODE -3333 /* used to stamp completed msg in the queue */
20
#define MAX_Q_LEN MAX_PROC /* Maximum no. of outstanding messages */
21
static volatile long n_in_msg_q = 0; /* actual no. in the message q */
22
static struct msg_q_struct{
30
* Return 1/0 (TRUE/FALSE) if a message of the given type is available
31
* from the given node. If the node is specified as -1, then all nodes
32
* will be examined. Some attempt is made at ensuring fairness.
34
* If node is specified as -1 then this value is overwritten with the
35
* node that we got the message from.
37
long ProbeNode(long *type, long *node)
39
static long next_node = 0;
41
long nproc = NNODES_();
45
int i, proclo, prochi;
48
Error("PROBE_ : cannot recv message from self, msgtype=", *type);
50
if (*node == -1) { /* match anyone */
57
proclo = prochi = cur_node = *node;
59
for(i = proclo; i<= prochi; i++) {
61
if (cur_node != me){ /* can't receive from self */
62
found = MatchShmMessage(cur_node, *type);
65
cur_node = (cur_node +1)%nproc;
69
if(found) *node = cur_node;
71
/* if wildcard node, determine which node we'll start with next time */
72
if(*type == -1) next_node = (cur_node +1)%nproc;
78
* Return 1/0 (TRUE/FALSE) if a message of the given type is available
79
* from the given node. If the node is specified as -1, then all nodes
80
* will be examined. Some attempt is made at ensuring fairness.
82
long PROBE_(long *type, long *node)
87
result = ProbeNode(type, &nnode);
94
* long *type = user defined type of received message (input)
95
* char *buf = data buffer (output)
96
* long *lenbuf = length of buffer in bytes (input)
97
* long *lenmes = length of received message in bytes (output)
98
* (exceeding receive buffer is hard error)
99
* long *nodeselect = node to receive from (input)
100
* -1 implies that any pending message of the specified
101
* type may be received
102
* long *nodefrom = node message is received from (output)
103
* long *sync = flag for sync(1) or async(0) receipt (input)
105
void RCV_(long *type, void *buf, long *lenbuf, long *lenmes, long *nodeselect, long *nodefrom, long *sync)
117
printf("RCV_: node %ld receiving from %ld, len=%ld, type=%ld, sync=%ld\n",
118
(long)me, (long)*nodeselect, (long)*lenbuf, (long)*type, (long)*sync);
122
/* wait for a matching message */
123
if(node==-1) while(ProbeNode(type, &node) == 0);
124
msg_rcv(ttype, buf, *lenbuf, lenmes, node);
128
(void) printf("RCV: me=%ld, from=%ld, len=%ld\n",
129
(long)me, (long)*nodeselect, (long)*lenbuf);
130
(void) fflush(stdout);
136
* long *type = user defined integer message type (input)
137
* char *buf = data buffer (input)
138
* long *lenbuf = length of buffer in bytes (input)
139
* long *node = node to send to (input)
140
* long *sync = flag for sync(1) or async(0) communication (input)
142
void SND_(long *type, void *buf, long *lenbuf, long *node, long *sync)
145
long msg_async_snd();
147
/*asynchronous communication not supported under LAPI */
155
(void)printf("SND_: node %ld sending to %ld, len=%ld, type=%ld, sync=%ld\n",
156
(long)me, (long)*node, (long)*lenbuf, (long)*type, (long)*sync);
157
(void) fflush(stdout);
161
msg_wait(msg_async_snd(*type, buf, *lenbuf, *node));
165
if (n_in_msg_q >= MAX_Q_LEN)
166
Error("SND: overflowing async Q limit", n_in_msg_q);
168
msg_q[n_in_msg_q].msg_id = msg_async_snd(*type, buf, *lenbuf, *node);
169
msg_q[n_in_msg_q].node = *node;
170
msg_q[n_in_msg_q].type = *type;
175
(void) printf("SND: me=%ld, to=%ld, len=%ld \n",
176
(long)me, (long)*node, (long)*lenbuf);
177
(void) fflush(stdout);
182
int compare_msg_q_entries(const void* entry1, const void* entry2)
184
/* nodes are nondistiguishable unless one of them is INVALID_NODE */
185
if( ((struct msg_q_struct*)entry1)->node ==
186
((struct msg_q_struct*)entry2)->node) return 0;
187
if( ((struct msg_q_struct*)entry1)->node == INVALID_NODE) return 1;
188
if( ((struct msg_q_struct*)entry2)->node == INVALID_NODE) return -1;
194
* Wait for all messages (send/receive) to complete between
195
* this node and node *nodesel or everyone if *nodesel == -1.
197
void WAITCOM_(long *nodesel)
201
for (i=0; i<n_in_msg_q; i++) if(*nodesel==msg_q[i].node || *nodesel ==-1){
204
(void) printf("WAITCOM: %ld waiting for msgid %ld, #%ld\n",
205
(long)NODEID_(), (long)msg_q[i].msg_id, (long)i);
206
(void) fflush(stdout);
209
msg_wait(msg_q[i].msg_id);
211
msg_q[i].node = INVALID_NODE;
214
}else if(msg_q[i].node == INVALID_NODE)Error("WAITCOM: invalid node entry",i);
216
/* tidy up msg_q if there were any messages completed */
219
/* sort msg queue only to move the completed msg entries to the end*/
220
/* comparison tests against the INVALID_NODE key */
221
qsort(msg_q, n_in_msg_q, sizeof(struct msg_q_struct),compare_msg_q_entries);
223
/* update msg queue length, = the number of outstanding msg entries left*/
224
for(i = 0; i< n_in_msg_q; i++)if(msg_q[i].node == INVALID_NODE) break;
225
if(i == n_in_msg_q) Error("WAITCOM: inconsitency in msg_q update", i);