9
extern void exit(int status);
13
static const Integer false = 0;
14
static const Integer true = 1;
16
extern void Busy(int);
17
extern void flush_send_q(void);
19
/* All data movement to/from shared memory is done using the
20
COPY_TO/FROM_SHMEM macros */
21
#define COPY_TO_LOCAL(src, dest, n, destnode) (void) memcpy(dest, src, n)
22
#define COPY_FROM_LOCAL(src, dest, n) (void)memcpy(dest, src, n)
23
#define COPY_FROM_REMOTE(src,dest,n,p) (void)memcpy(dest, src, n)
24
#define COPY_TO_REMOTE(src,dest,n,p) (void)memcpy(dest, src, n)
28
#ifndef FLUSH_CACHE_LINE
29
# define FLUSH_CACHE_LINE(x)
32
/* #define TCG_ABS(a) (((a) >= 0) ? (a) : (-(a))) */
36
* Return the value of a volatile variable in shared memory
37
* that is REMOTE to this processor
39
static Integer remote_flag(Integer *p, Integer node)
43
/* FLUSH_CACHE;*/ /* no need to flush for one word only*/
44
COPY_FROM_REMOTE(p, &tmp, sizeof(tmp), node);
50
* Return the value of a volatile variable in shared memory
51
* that is LOCAL to this processor
53
static Integer local_flag(Integer *p)
61
* Wait for (*p == value)
63
static void local_await(Integer *p, Integer value)
67
Integer spinlim = 100000000;
69
while ((pval = local_flag(p)) != value) {
71
if (pval && (pval != value)) {
72
fprintf(stdout,"%2ld: invalid value=%ld, local_flag=%p %ld\n",
73
TCGMSG_nodeid, (long)value, p, (long)pval);
78
if((nspin&7)==0)flush_send_q();
88
* Entry points to info about a message ... determine which
89
* transport mechanism is appropriate and send as much as
90
* possible without blocking.
92
* Right now just shared memory ... when sockets are working this
93
* routine will become async_shmem_send.
95
* Shared-memory protocol aims for low latency. Each process has
96
* one buffer for every other process. Thus, to send a message U
97
* merely have to determine if the receivers buffer for you is empty
98
* and copy directly into the receivers buffer.
100
* Return 0 if more data is to be sent, 1 if the send is complete.
102
Integer async_send(SendQEntry *entry)
104
Integer node = entry->node;
105
ShmemBuf *sendbuf= TCGMSG_proc_info[node].sendbuf;
106
Integer nleft, ncopy;
111
(void) fprintf(stdout,"%2ld: sending to %ld buf=%lx len=%ld\n",
112
TCGMSG_nodeid, node, entry->buf, entry->lenbuf);
113
(void) fprintf(stdout,"%2ld: sendbuf=%lx\n", TCGMSG_nodeid, sendbuf);
114
(void) fflush(stdout);
117
if ((pval = remote_flag(&sendbuf->info[3], node))) {
122
COPY_FROM_REMOTE(sendbuf->info, info, sizeof(info), node);
123
fprintf(stdout,"%2ld: snd info after full = %ld %ld %ld\n",
124
TCGMSG_nodeid, info[0], info[1], info[2]);
133
info[0] = entry->type; info[1] = entry->lenbuf; info[2] = entry->tag;
135
/* Copy over the first buffer load of the message */
137
nleft = entry->lenbuf - entry->written;
138
ncopy = (Integer) ((nleft <= SHMEM_BUF_SIZE) ? nleft : SHMEM_BUF_SIZE);
142
printf("%2ld: rounding buffer up %ld->%ld\n",
143
TCGMSG_nodeid, ncopy, ncopy + 8 - (ncopy&7));
146
ncopy = ncopy + 8 - (ncopy&7);
150
COPY_TO_REMOTE(entry->buf+entry->written, sendbuf->buf, ncopy, node);
154
/* NOTE that SHMEM_BUF_SIZE is a multiple of 8 by construction so that
155
this ncopy is only rounded up on the last write */
157
ncopy = (Integer) ((nleft <= SHMEM_BUF_SIZE) ? nleft : SHMEM_BUF_SIZE);
158
entry->written += ncopy;
159
entry->buffer_number++;
161
/* Copy over the header information include buffer full flag */
163
info[3] = entry->buffer_number;
164
COPY_TO_REMOTE(info, sendbuf->info, sizeof(info), node);
166
return (Integer) (entry->written == entry->lenbuf);
171
* Receive a message of given type from the specified node, returning
172
* the message and length of the message.
174
* Right now just shared memory ... when sockets are working this
175
* routine will become msg_shmem_rcv
177
* Shared-memory protocol aims for low latency. Each process has
178
* one buffer for every other process. Thus, to send a message U
179
* merely have to determine if the receivers buffer for you is empty
180
* and copy directly into the receivers buffer.
182
* Return 0 if more data is to be sent, 1 if the send is complete.
184
void msg_rcv(Integer type, char *buf, Integer lenbuf, Integer *lenmes, Integer node)
186
Integer me = TCGMSG_nodeid;
187
ShmemBuf *recvbuf; /* Points to receving buffer */
189
Integer msg_type, msg_tag, msg_len;
190
Integer buffer_number = 1;
191
Integer expected_tag = TCGMSG_proc_info[node].tag_rcv++;
193
if (node<0 || node>=TCGMSG_nnodes)
194
Error("msg_rcv: node is out of range", node);
196
recvbuf = TCGMSG_proc_info[node].recvbuf;
198
/* Wait for first part message to be written */
201
(void) fprintf(stdout,"%2ld: receiving from %ld buf=%lx len=%ld\n",
202
me, node, buf, lenbuf);
204
(void) fprintf(stdout,"%2ld: recvbuf=%lx\n", me, recvbuf);
205
(void) fflush(stdout);
208
local_await(&recvbuf->info[3], buffer_number);
210
/* Copy over the header information */
213
msg_type = recvbuf->info[0];
214
msg_len = recvbuf->info[1];
215
msg_tag = recvbuf->info[2];
217
/* Check type and size information */
219
if (msg_tag != expected_tag) {
220
(void) fprintf(stdout,
221
"rcv: me=%ld from=%ld type=%ld, tag=%ld, expectedtag=%ld\n",
222
(long)me, (long)node, (long)type, (long)msg_tag, (long)expected_tag);
224
Error("msg_rcv: tag mismatch ... transport layer failed????", 0L);
227
if (msg_type != type) {
228
(void) fprintf(stdout,
229
"rcv: me=%ld from=%ld type=(%ld != %ld) tag=%ld len=%ld\n",
230
(long)me, (long)node, (long)type, (long)msg_type, (long)msg_tag, (long)msg_len);
232
Error("msg_rcv: type mismatch ... strong typing enforced\n", 0L);
235
if (msg_len > lenbuf) {
236
(void) fprintf(stderr,
237
"rcv: me=%ld from=%ld type=%ld tag=%ld len=(%ld > %ld)\n",
238
(long)me, (long)node, (long)type, (long)msg_tag, (long)msg_len, (long)lenbuf);
239
Error("msg_rcv: message too Integer for buffer\n", 0L);
242
nleft = *lenmes = msg_len;
244
recvbuf->info[3] = false;
248
Integer ncopy = (Integer) ((nleft <= SHMEM_BUF_SIZE) ? nleft : SHMEM_BUF_SIZE);
252
for(line = 0; line < ncopy; line+=32)
253
FLUSH_CACHE_LINE(recvbuf->buf+line);
258
/* if (buffer_number > 1) FLUSH_CACHE;*/
259
COPY_FROM_LOCAL(recvbuf->buf, buf, ncopy);
261
recvbuf->info[3] = false;
268
local_await(&recvbuf->info[3], buffer_number);
274
Integer MatchShmMessage(Integer node, Integer type)
279
recvbuf = TCGMSG_proc_info[node].recvbuf;
281
if(recvbuf->info[3] == false) return (0); /* no message to receive */
283
/* we have a message but let's see if want it */
285
FLUSH_CACHE_LINE(recvbuf->info);
286
COPY_FROM_LOCAL(recvbuf->info, &msg_type, sizeof(Integer));
287
if(type == msg_type) return (1);