6
* mpi2_server.c: MPI_SPAWN Server Code
26
#include "kr_malloc.h"
29
/* Inter-communicators for communicating with clients */
30
MPI_Comm MPI_COMM_SERVER2CLIENT=MPI_COMM_NULL;
32
static int armci_server_me=-1, armci_nserver=-1;
33
static int armci_client_first=-1, armci_nclients=-1;
35
extern Header *armci_get_shmem_ptr(int shmid, long shmoffset, size_t shmsize);
37
/* ====================== Multiple Buffers Code ======================== */
40
/* Number of receive buffers (and matching MPI requests) declared below and
 * posted by armci_call_data_server(). */
#define MPI2_MAX_BUFS 10
42
/* Node of the singly linked wait list of out-of-order requests.
 * NOTE(review): only the `next` member is visible in this excerpt;
 * wlist_get_req() also reads members named `p` and `tag`. */
typedef struct req_waitlist
47
struct req_waitlist *next;
51
/* One receive buffer per posted receive; each is passed to MPI_Irecv as
 * MSG_BUFLEN bytes and later reinterpreted as a request_header_t. */
static double _mpi2_rcv_buf[MPI2_MAX_BUFS][MSG_BUFLEN_DBL];
52
/* _mpi_request[i] is the request handle re-posted for _mpi2_rcv_buf[i]. */
static MPI_Request _mpi_request[MPI2_MAX_BUFS];
53
/* Next expected tag, indexed by client; allocated with armci_nproc entries
 * in armci_call_data_server(). */
static int *_next_tag=NULL;
54
/* Index of the buffer whose request is to be processed next; written by
 * armci_call_data_server() and read by armci_rcv_req(). */
static int _reqid_ready=0;
55
/* Head and tail of the wait list; both NULL when the list is empty. */
static req_waitlist_t *_req_waitlist_head = NULL;
56
static req_waitlist_t *_req_waitlist_tail = NULL;
58
/* increment the tag value to maintain flowcontrol */
61
if(++_next_tag[p] > ARMCI_MPI_SPAWN_TAG_END) \
62
_next_tag[p] = ARMCI_MPI_SPAWN_TAG_BEGIN; \
66
/* Walk the wait list from the head looking for an entry whose tag equals the
 * next expected tag of its client (_next_tag[itr->p]); the matching node is
 * unlinked from the list.
 * NOTE(review): the lines that fill *p, *tag and *reqid, advance `prev`,
 * release the node and produce the return value are not visible in this
 * excerpt; the caller stores the result in a flag named do_waitlist. */
static int wlist_get_req(int *p, int *tag, int *reqid) {
68
req_waitlist_t *itr, *prev=NULL;
70
for(itr=_req_waitlist_head; itr != NULL; itr=itr->next)
72
/* this entry is the next in-order request for client itr->p */
if(itr->tag == _next_tag[itr->p])
74
/* mark the request id as ready to be processed, and update the
75
* tag order and waitlist */
81
/* remove this request from the waiting list */
/* head moves forward when the first node is removed */
if(itr==_req_waitlist_head) _req_waitlist_head = itr->next;
/* tail falls back to the predecessor (NULL if the list becomes empty) */
if(itr==_req_waitlist_tail) _req_waitlist_tail = prev;
/* bypass the removed node in the chain */
if(prev != NULL) prev->next = itr->next;
95
/* Allocate a wait-list node for an out-of-order request and put it at the
 * end of the list.
 * NOTE(review): the lines that initialise the node's members (and any check
 * of the malloc result) are not visible in this excerpt - confirm that a
 * NULL return is handled before the node is linked. */
static void wlist_add_req(int reqid, int p, int tag)
97
req_waitlist_t *node = (req_waitlist_t *) malloc(sizeof(req_waitlist_t));
104
/* empty list: the new node becomes both head and tail */
if(_req_waitlist_head == NULL)
106
_req_waitlist_head = node;
107
_req_waitlist_tail = node;
111
/* append the new request at the end of the list */
/* (presumably the else branch of the empty-list test above) */
_req_waitlist_tail->next = node;
113
_req_waitlist_tail = node;
117
/* ==================== END: Multiple Buffers Code ====================== */
120
/* printf-style debug output for the server. The message is printed only
 * when `rank` equals this server's rank (armci_server_me) and is prefixed
 * with "**** Server <armci_server_me>: ".
 * NOTE(review): the declaration of `arg` and the matching va_end are on
 * lines not visible in this excerpt. */
void armci_mpi2_server_debug(int rank, const char *format, ...)
124
if(rank == armci_server_me) {
125
va_start(arg, format);
126
printf("**** Server %d: ", armci_server_me);
127
/* forward the caller's format string and variadic arguments */
vprintf(format, arg);
133
# define armci_mpi2_server_debug(x, ...)
137
/* Check the return code of an MPI call: anything other than MPI_SUCCESS is
 * reported and the process is terminated through armci_die().
 * NOTE(review): the debug call passes armci_me, while
 * armci_mpi2_server_debug() filters on armci_server_me, so the debug line is
 * printed only if the two values happen to coincide - confirm intent.
 * The return statement is not visible in this excerpt. */
static inline int MPI_Check (int status)
139
if(status != MPI_SUCCESS)
141
armci_mpi2_server_debug(armci_me, "MPI Check failed.\n");
142
armci_die("MPI_Check failed.", 0);
146
# define MPI_Check(x) x
150
/**************************************************************************
151
* Platform specific server code as required by the ARMCI s/w layer. (BEGIN)
154
/* establish connections with client (i.e compute) processes */
155
/* The only visible action is a debug trace emitted on server rank 0; any
 * further body lines are not shown in this excerpt. */
void armci_server_initial_connection()
157
armci_mpi2_server_debug(0, "armci_server_initial_connection\n");
161
/* close all open connections, called before terminating/aborting */
162
/* Shut down the transport. When the server-to-client inter-communicator has
 * been set (it starts as MPI_COMM_NULL and is filled in by
 * armci_mpi2_server_init()), the process logs the shutdown and exits.
 * NOTE(review): the MPI_Finalize call announced by the debug message is on
 * a line not visible in this excerpt. */
void armci_transport_cleanup()
164
/* armci_transport_cleanup is called by all procs (clients and servers).
165
Therefore, only in server case we need to finalize MPI before exit. */
166
if(MPI_COMM_SERVER2CLIENT != MPI_COMM_NULL)
168
armci_mpi2_server_debug(0, "Calling MPI_Finalize\n");
170
exit(EXIT_SUCCESS); /* server termination */
174
/* Receive strided data from client `from` straight into the destination
 * described by the strided descriptor that follows the request header.
 *   msginfo - request header; the descriptor starts at msginfo + 1
 *   vdscr   - out: receives the address of the descriptor
 *   from    - rank passed to the strided receive routine
 * NOTE(review): declarations of bytes, dscr and ptr are on lines not visible
 * in this excerpt. */
static void armci_mpi_rcv_strided_data(request_header_t *msginfo,
175
void *vdscr, int from)
180
int stride_levels, *stride_arr, *count;
182
bytes = msginfo->dscrlen;
183
/* descriptor sits immediately after the header */
dscr = (char*)(msginfo + 1);
184
/* hand the descriptor address back to the caller */
*(void**)vdscr = (void *)dscr;
186
/* descriptor layout as parsed here: pointer, int stride_levels,
 * stride_levels ints of strides, stride_levels+1 ints of counts */
ptr = *(void**)dscr; dscr += sizeof(void*);
187
stride_levels = *(int*)dscr; dscr += sizeof(int);
188
stride_arr = (int*)dscr; dscr += stride_levels*sizeof(int);
189
count = (int*)dscr; dscr += (stride_levels+1)*sizeof(int);
191
/* with MPI_USER_DEF_DATATYPE the armci_mpi_strided2 variant is used;
 * the other call is presumably the #else branch (directive not visible) */
#ifdef MPI_USER_DEF_DATATYPE
194
armci_mpi_strided2(RECV, ptr, stride_levels, stride_arr, count, from,
195
MPI_COMM_SERVER2CLIENT);
200
armci_mpi_strided(RECV, ptr, stride_levels, stride_arr, count, from,
201
MPI_COMM_SERVER2CLIENT);
205
/* Vector-format counterpart of armci_mpi_rcv_strided_data(). Not
 * implemented: the visible body only aborts through armci_die(). */
static void armci_mpi_rcv_vector_data(request_header_t *msginfo,
206
void *vdscr, int proc)
208
armci_die("armci_mpi_rcv_vector_data(): Not yet implemented!", 0);
211
/* server receives request */
212
/* Obtain one client request and hand back pointers into the receive buffer.
 *   mesg   - only used in the debug trace within the visible lines
 *   phdr   - out: address of the request header
 *   pdescr - out: address of the descriptor (msginfo + 1) or NULL
 *   pdata  - out: address of the payload
 *   buflen - out: buffer space left after header, descriptor and data
 * NOTE(review): many lines (declarations of p, status, bytes, count; the
 * else/endif directives; several conditions and braces) are not visible in
 * this excerpt, so the branch structure described below is partly inferred. */
void armci_rcv_req (void *mesg, void *phdr, void *pdescr,
213
void *pdata, int *buflen)
215
request_header_t *msginfo = NULL;
216
int hdrlen = sizeof(request_header_t);
220
/* single-buffer build: receive the request into MessageRcvBuffer now */
#if !defined(MULTIPLE_BUFS)
222
msginfo = (request_header_t*) MessageRcvBuffer;
226
MPI_Recv(MessageRcvBuffer, MSG_BUFLEN, MPI_BYTE, p, ARMCI_MPI_SPAWN_TAG,
227
MPI_COMM_SERVER2CLIENT, &status)
230
/* presumably the MULTIPLE_BUFS branch: the request was already received
 * into the buffer selected by armci_call_data_server() */
int reqid = _reqid_ready;;/*get request id that is ready to be processed */
232
msginfo = (request_header_t*) _mpi2_rcv_buf[reqid];
234
/* the sender recorded in the header must match the expected client */
if(p != msginfo->from)
235
armci_die("armci_rcv_req: invalid client", p);
238
* (void **) phdr = msginfo;
240
/* reject ranks outside [0, armci_nproc) */
if( !(p >= 0 && p < armci_nproc) )
241
armci_die("armci_rcv_req: request from invalid client", p);
243
/* NOTE(review): the format string has six conversions but seven arguments
 * are passed (MSG_BUFLEN is never printed) - confirm which is intended. */
armci_mpi2_server_debug(armci_server_me,
244
"armci_rcv_req: op=%d mesg=%p, phdr=%p "
245
"pdata=%p, buflen=%p, p=%d\n", msginfo->operation,
246
mesg, phdr, pdata, buflen, p, MSG_BUFLEN);
248
/* zero-copy build: a PUT whose header reports datalen==0 has its payload
 * received separately by the format-specific routine */
#ifdef MPI_SPAWN_ZEROCOPY
249
if(msginfo->operation==PUT && msginfo->datalen==0)
251
if(msginfo->format==STRIDED)
253
armci_mpi_rcv_strided_data(msginfo, pdescr, p);
255
if(msginfo->format==VECTOR)
257
armci_mpi_rcv_vector_data(msginfo, pdescr, p);
263
/* space available after the header */
*buflen = MSG_BUFLEN - hdrlen;
264
/* for GET only the descriptor is counted as incoming bytes */
if (msginfo->operation == GET)
266
bytes = msginfo->dscrlen;
270
bytes = msginfo->bytes;
272
/* NOTE(review): the condition guarding this abort is not visible here */
armci_die2("armci_rcv_req: message overflowing rcv buf",
273
msginfo->bytes, *buflen);
276
/* the trailing "&& 0" keeps this received-byte-count check compiled out */
#if MPI_SPAWN_DEBUG && !defined(MPI_SPAWN_ZEROCOPY) && 0
279
MPI_Get_count(&status, MPI_BYTE, &count);
280
if (count != (bytes + hdrlen))
282
armci_mpi2_server_debug(armci_server_me, "armci_rcv_req: "
283
"got %d bytes, expected %d bytes\n",
284
count, bytes + hdrlen);
285
printf("%d: armci_rcv_req: got %d bytes, expected %d bytes (%d)\n",
286
armci_me, count, bytes + hdrlen, msginfo->datalen);
287
armci_die("armci_rcv_req: count check failed.\n", 0);
294
/* layout with a descriptor: header | descriptor (dscrlen bytes) | data */
* (void **) pdescr = msginfo + 1;
295
* (void **) pdata = msginfo->dscrlen + (char *) (msginfo+1);
296
*buflen -= msginfo->dscrlen;
298
/* non-GET requests carrying data also consume datalen bytes */
if (msginfo->operation != GET && msginfo->datalen)
300
*buflen -= msginfo->datalen;
305
/* layout without a descriptor: data follows the header directly
 * (the condition selecting this layout is not visible here) */
* (void**) pdata = msginfo + 1;
306
* (void**) pdescr = NULL;
309
if (msginfo->datalen > 0 && msginfo->operation != GET)
311
/* payload must fit in what is left after header and descriptor */
if (msginfo->datalen > (MSG_BUFLEN - hdrlen - msginfo->dscrlen)) {
312
armci_die2("armci_rcv_req:data overflowing buffer",
313
msginfo->dscrlen, msginfo->datalen);
315
*buflen -= msginfo->datalen;
319
/* server sends data back to client */
320
/* Send msginfo->datalen bytes starting at `data` to client `to` over the
 * server-to-client inter-communicator, using tag ARMCI_MPI_SPAWN_TAG.
 * Aborts through armci_die() when `to` is outside [0, armci_nproc).
 * NOTE(review): the MPI_Send line has no terminator in this excerpt; it is
 * presumably wrapped by a call whose opening and closing lines are not
 * visible. */
void armci_WriteToDirect (int to, request_header_t *msginfo, void *data)
322
armci_mpi2_server_debug(armci_server_me, "armci_WriteToDirect: "
323
"to=%d, msginfo=%p, data=%p, bytes=%d\n",
324
to, msginfo, data, msginfo->datalen);
326
/* validate the destination rank before sending */
if( !(to >= 0 && to < armci_nproc) )
327
armci_die("armci_WriteToDirect: send request to invalid client", to);
330
MPI_Send(data, msginfo->datalen, MPI_BYTE, to,
331
ARMCI_MPI_SPAWN_TAG, MPI_COMM_SERVER2CLIENT)
335
/*\ server sends strided data back to client
337
void armci_WriteStridedToDirect(int to, request_header_t* msginfo,
338
void *ptr, int strides, int stride_arr[],
341
armci_mpi2_server_debug(armci_server_me, "armci_WriteStridedToDirect: "
342
"to=%d, stride_levels=%d, bytes=%d\n", to, strides,
345
#ifdef MPI_USER_DEF_DATATYPE
348
armci_mpi_strided2(SEND, ptr, strides, stride_arr, count, to,
349
MPI_COMM_SERVER2CLIENT);
354
armci_mpi_strided(SEND, ptr, strides, stride_arr, count, to,
355
MPI_COMM_SERVER2CLIENT);
361
void armci_call_data_server()
366
armci_mpi2_server_debug(0, "armci_call_data_server(): Server main loop\n");
368
#if !defined(MULTIPLE_BUFS)
369
/* server main loop; wait for and service requests until QUIT requested.
 * Single-buffer build: probe for a message from any client, take the sender
 * rank from the status and let armci_data_server() service it.
 * NOTE(review): loop constructs, declarations of p and status, and the
 * else/endif directives are on lines not visible in this excerpt. */
373
MPI_Probe(MPI_ANY_SOURCE, ARMCI_MPI_SPAWN_TAG,MPI_COMM_SERVER2CLIENT,
377
p = status.MPI_SOURCE;
378
armci_mpi2_server_debug(armci_server_me,
379
"Processing message from client %d\n", p);
381
armci_data_server(&p);
385
/* presumably the MULTIPLE_BUFS branch starts here */
int i, tag, reqid, do_waitlist=0;
387
/* server multiple bufs setup */
388
_req_waitlist_head = NULL;
389
_req_waitlist_tail = NULL;
390
/* Initialize "next tag" array, which manages flow control */
/* one entry per process (armci_nproc), each starting at
 * ARMCI_MPI_SPAWN_TAG_BEGIN */
if( (_next_tag = (int*) malloc(armci_nproc*sizeof(int)) ) == NULL)
392
armci_die("mpi2_server: _next_tag malloc failed", 0);
393
for(i=0; i<armci_nproc; i++) _next_tag[i] = ARMCI_MPI_SPAWN_TAG_BEGIN;
396
/* server posts multiple receive buffers in advance */
397
for(i=0; i<MPI2_MAX_BUFS; i++)
400
MPI_Irecv(_mpi2_rcv_buf[i], MSG_BUFLEN, MPI_BYTE, MPI_ANY_SOURCE,
401
ARMCI_MPI_SPAWN_TAG, MPI_COMM_SERVER2CLIENT,
408
/* process wait-listed requests, if any */
410
if(_req_waitlist_head != NULL)
412
do_waitlist = wlist_get_req(&p, &tag, &reqid);
417
/* process the first completed incoming request */
419
MPI_Waitany(MPI2_MAX_BUFS, _mpi_request, &reqid, &status)
421
p = status.MPI_SOURCE;
422
/* tag = status.MPI_TAG; */
/* the ordering tag is taken from the request header in the buffer, not
 * from the MPI status (every receive is posted with ARMCI_MPI_SPAWN_TAG) */
tag = ((request_header_t*) _mpi2_rcv_buf[reqid])->tag;
425
/* check if it is in or out of order request */
426
if(tag == _next_tag[p]) { INCR_TAG(p); }
429
/* out of order req - enforce ordering by waitlisting this req */
430
wlist_add_req(reqid, p, tag);
435
/* mark the request id that is ready to be processed */
/* armci_rcv_req() reads _reqid_ready to locate the buffer */
_reqid_ready = reqid;
438
/* server process the incoming (or waitlisted) request */
439
armci_data_server(&p);
441
/* After completing the request (which also frees a buffer), server
442
* posts a receive using this buffer */
444
MPI_Irecv(_mpi2_rcv_buf[reqid], MSG_BUFLEN, MPI_BYTE, MPI_ANY_SOURCE,
445
ARMCI_MPI_SPAWN_TAG, MPI_COMM_SERVER2CLIENT,
446
&_mpi_request[reqid])
453
* Platform specific server code ENDs here.
454
**************************************************************************/
456
/* Set the cluster-related ARMCI globals on the server from values already
 * received in armci_mpi2_server_init() (armci_client_first, armci_nclients,
 * armci_clus_info), then verify that the received cluster table agrees with
 * the client range; a mismatch aborts through armci_die().
 * NOTE(review): the declaration of psize and any use of it are on lines not
 * visible in this excerpt. */
static void emulate_armci_init_clusinfo()
460
/* size of the remote (client) group of the inter-communicator */
MPI_Comm_remote_size(MPI_COMM_SERVER2CLIENT, &psize);
462
/* server id (i.e. server's armci_me) is derived from node master's id.
463
Similar to armci_create_server_process() to set SERVER_CONTEXT */
464
armci_me = SOFFSET - armci_client_first;
466
armci_usr_tid = THREAD_ID_SELF(); /*remember the main user thread id */
467
/* the first client of this server is recorded as the master */
armci_master = armci_client_first;
469
/* ***** emulate armci_init_clusinfo() ***** */
/* one cluster entry per server: index and count come from the server
 * communicator rank and size */
armci_clus_me = armci_server_me;
471
armci_nclus = armci_nserver;
472
armci_clus_first = armci_clus_info[armci_clus_me].master;
473
armci_clus_last = (armci_clus_first +
474
armci_clus_info[armci_clus_me].nslave - 1);
476
/* consistency check: table entry must match the client range announced
 * in the initial handshake */
if(armci_clus_first != armci_client_first ||
477
armci_nclients != armci_clus_info[armci_clus_me].nslave)
479
armci_mpi2_server_debug(armci_server_me,
480
"armci_clus_first=%d, armci_clus_last=%d\n",
481
armci_clus_first, armci_clus_last);
482
armci_die("mpi2_server: armci_clus_info is incorrect.", 0);
486
/* Point _armci_int_mutexes at a shared-memory region described by shm_info.
 *   shm_info[0] - shared-memory id (narrowed to int)
 *   shm_info[1] - offset
 *   shm_info[2] - size (converted to size_t)
 * The three values are passed to armci_get_shmem_ptr().
 * NOTE(review): the end of the call is on a line not visible here. */
static void emulate_armci_allocate_locks(long *shm_info)
488
int shmid = (int) shm_info[0];
489
long shmoffset = shm_info[1];
490
size_t shmsize = (size_t) shm_info[2];
492
_armci_int_mutexes = (PAD_LOCK_T*) armci_get_shmem_ptr(shmid, shmoffset,
497
/* Initialise a spawned data-server process: determine its rank among the
 * servers, obtain the inter-communicator to the parent (client) job, receive
 * the range of clients it serves plus the cluster table and lock
 * shared-memory info, emulate the relevant parts of PARMCI_Init(), and
 * finally synchronise on ARMCI_COMM_WORLD.
 * NOTE(review): declarations of msg, status and shm_info, and the MPI
 * initialisation call, are on lines not visible in this excerpt. */
void armci_mpi2_server_init()
500
int namelen, version, subversion;
501
char processor_name[MPI_MAX_PROCESSOR_NAME];
505
/* rank and size within ARMCI_COMM_WORLD give the server id and count */
MPI_Comm_rank(ARMCI_COMM_WORLD, &armci_server_me);
506
MPI_Comm_size(ARMCI_COMM_WORLD, &armci_nserver);
507
MPI_Get_processor_name(processor_name, &namelen);
508
MPI_Get_version(&version, &subversion);
510
armci_mpi2_server_debug(armci_server_me,
511
"I'm %d of %d SERVERS running on %s (MPI %d.%d)\n",
512
armci_server_me, armci_nserver, processor_name,
513
version, subversion);
515
/* get parent's groupinfo */
516
MPI_Comm_get_parent(&MPI_COMM_SERVER2CLIENT);
/* a null parent communicator is treated as a fatal error */
if (MPI_COMM_SERVER2CLIENT == MPI_COMM_NULL)
519
armci_die("mpi2_server: Invalid spawn. No parent process found.\n",0);
522
/* receive my clients info */
/* three ints: msg[0] - ARMCI_MPI_SPAWN_INIT_TAG must equal this server's
 * rank, msg[1] is the first client, msg[2] is the number of clients */
525
MPI_Recv(msg, 3, MPI_INT, MPI_ANY_SOURCE, ARMCI_MPI_SPAWN_INIT_TAG,
526
MPI_COMM_SERVER2CLIENT, &status);
527
if(msg[0]-ARMCI_MPI_SPAWN_INIT_TAG != armci_server_me)
529
armci_die("mpi2_server: Recv failed", msg[0]);
532
armci_client_first = msg[1];
533
armci_nclients = msg[2];
534
armci_mpi2_server_debug(armci_server_me,
535
"My clients are [%d-%d]\n", armci_client_first,
536
armci_client_first+armci_nclients-1);
539
/**********************************************************************
540
* Emulate PARMCI_Init().
541
* Spawned Data server processes emulate PARMCI_Init() to complete the
542
* ARMCI Initialization process similar to clients
545
armci_clus_info =(armci_clus_t*)malloc(armci_nserver*sizeof(armci_clus_t));
546
if(armci_clus_info == NULL)
548
armci_die("mpi2_server: armci_clus_info malloc failed", 0);
551
/* receive and emulate clus info, lock info */
552
MPI_Recv(armci_clus_info, armci_nserver*sizeof(armci_clus_t), MPI_BYTE,
553
armci_client_first, ARMCI_MPI_SPAWN_INIT_TAG,
554
MPI_COMM_SERVER2CLIENT, &status);
556
/* three longs consumed by emulate_armci_allocate_locks() */
MPI_Recv(shm_info, 3, MPI_LONG, armci_client_first,
557
ARMCI_MPI_SPAWN_INIT_TAG, MPI_COMM_SERVER2CLIENT, &status);
560
/* server setup clusinfo&locks, exactly as this node's armci master */
561
emulate_armci_init_clusinfo(); /* armci_init_clusinfo() in PARMCI_Init */
562
emulate_armci_allocate_locks(shm_info); /* armci_allocate_locks() */
564
/* Fence data structures should be initialized by server too. see
565
* armci_generic_rmw (called by armci_server_rmw) */
569
* End of PARMCI_Init() emulation.
570
* *******************************************************************/
572
/* all servers synchronise before leaving initialisation */
MPI_Barrier(ARMCI_COMM_WORLD);
575
/* Entry point of a spawned data server: run the initialisation, then call
 * armci_server_code(). Control is not expected to come back from that call;
 * if it does, the process aborts through armci_die(). */
void armci_mpi2_server()
578
armci_mpi2_server_init();
580
armci_server_code(NULL);
582
/* Should never come here */
583
armci_die("mpi2_server: server died in an unexpected manner", 0);