4
static const long false = 0;
5
static const long true = 1;
7
extern void USleep(long);
10
extern long async_send(SendQEntry *);
12
static long NextMsgID(long node)
14
Given a nodeid return a unique integer constructed by
15
combining it with the value of a counter
19
static long mask = (1<<20)-1;
24
return (node << 20) + id;
27
static long NodeFromMsgID(long msgid)
29
Given an id from NextMsgID extract the node
32
long node = msgid >> 20;
34
if (node < 0 || node > NNODES_())
35
Error("NodeFromMsgID: invalid msgid", msgid);
40
static void flush_send_q_node(long node)
42
Flush as many messages as possible without blocking from
43
the send q to the specified node.
46
while (TCGMSG_proc_info[node].sendq) {
48
if (!async_send(TCGMSG_proc_info[node].sendq)) {
49
/* Send is incomplete ... stop processing this q*/
53
SendQEntry *tmp = TCGMSG_proc_info[node].sendq;
55
TCGMSG_proc_info[node].sendq = (SendQEntry *) TCGMSG_proc_info[node].sendq->next;
56
if (tmp->free_buf_on_completion)
57
(void) free(tmp->buf);
58
tmp->active = false; /* Matches NewSendQEntry() */
65
Flush as many messages as possible without blocking
66
from all of the send q's.
70
long nproc = NNODES_();
72
for (node=0; node<nproc; node++)
73
if (TCGMSG_proc_info[node].sendq)
74
flush_send_q_node(node);
77
long msg_status(msgid)
80
Return 0 if the message operation is incomplete.
81
Return 1 if the message operation is complete.
84
long node = NodeFromMsgID(msgid);
90
/* Attempt to find the msgid in the message q. If it is not
91
there then the send is complete */
93
for (entry=TCGMSG_proc_info[node].sendq; entry; entry=(SendQEntry *) entry->next) {
94
if (entry->msgid == msgid) {
103
void msg_wait(long msgid)
105
Wait for the operation referred to by msgid to complete.
112
long waittim = 10000;
115
long spinlim = 1000000;
117
long waittim = 100000;
121
while (!msg_status(msgid)) {
134
static SendQEntry *NewSendQEntry(void)
136
SendQEntry *new = TCGMSG_sendq_ring;
139
Error("NewSendQEntry: too many outstanding sends\n", 0L);
141
TCGMSG_sendq_ring = (SendQEntry *) TCGMSG_sendq_ring->next_in_ring;
148
long msg_async_snd(type, buf, lenbuf, node)
157
if (node<0 || node>=TCGMSG_nnodes)
158
Error("msg_async_send: node is out of range", node);
160
if (node == TCGMSG_nodeid)
161
Error("msg_async_send: cannot send to self", node);
163
msgid = NextMsgID(node);
164
entry = NewSendQEntry();
166
/* Insert a new entry into the q */
168
entry->tag = TCGMSG_proc_info[node].n_snd++; /* Increment tag */
169
entry->msgid = msgid;
172
/* alignment is critical on T3D (shmem library) */
173
if (((unsigned long) buf) & 7) {
174
printf("%2ld: mallocing unalinged buffer len=%ld\n",
175
TCGMSG_nodeid, lenbuf);
177
if (!(entry->buf = malloc((size_t) lenbuf)))
178
Error("msg_sync_send: malloc failed", lenbuf);
179
(void) memcpy(entry->buf, buf, lenbuf);
180
entry->free_buf_on_completion = 1;
186
entry->free_buf_on_completion = 0;
188
entry->lenbuf= lenbuf;
190
entry->next = (SendQEntry *) 0;
192
entry->buffer_number = 0;
194
/* Attach to the send q */
196
if (!TCGMSG_proc_info[node].sendq)
197
TCGMSG_proc_info[node].sendq = entry;
199
SendQEntry *cur = TCGMSG_proc_info[node].sendq;
206
/* Attempt to flush the send q */
213
void msg_snd(long type, char *buf, long lenbuf, long node)
215
synchronous send of message to a process
217
long type = user defined integer message type (input)
218
char *buf = data buffer (input)
219
long lenbuf = length of buffer in bytes (input)
220
long node = node to send to (input)
222
for zero length messages only the header is sent
225
msg_wait(msg_async_snd(type, buf, lenbuf, node));