6
* MPI_SPAWN: ARMCI on top of MPI.
30
#define ARMCI_ROOT 0 /* root process */
32
/* Inter-communicators for communicating between clients and data servers */
33
MPI_Comm MPI_COMM_CLIENT2SERVER=MPI_COMM_NULL;
35
/* Number of spawned data servers (one per node); set in armci_mpi2_spawn(). */
static int armci_nserver=-1;
36
/* Per-server flow-control tags; allocated in armci_create_server_MPIprocess(). */
static int *_armci_mpi_tag=NULL;
38
extern char ***_armci_argv;
39
/* NOTE(review): this prototype continues on lines not visible in this chunk;
 * stray bare numbers throughout this file are extraction artifacts, kept verbatim. */
extern int armci_get_shmem_info(char *addrp, int* shmid, long *shmoffset,
43
/* printf-style debug trace emitted on behalf of `rank`.
 * NOTE(review): most of the body is on lines not visible in this chunk. */
void armci_mpi2_debug(int rank, const char *format, ...)
49
va_start(arg, format);
57
/* No-op replacement when debug output is compiled out. */
#define armci_mpi2_debug(x, ...)
61
/* Abort via armci_die() if an MPI call did not return MPI_SUCCESS. */
static inline int MPI_Check (int status)
63
if(status != MPI_SUCCESS)
65
armci_mpi2_debug(armci_me, "MPI Check failed.\n");
66
armci_die("MPI_Check failed.", 0);
70
/* Non-debug build: MPI_Check compiles away to the bare MPI call. */
# define MPI_Check(x) x
74
/**************************************************************************
75
* Platform specific server code as required by the ARMCI s/w layer. (BEGIN)
78
/* Create connections between clients and servers */
79
/* Client-side connection setup hook; here it only initializes the
 * request-buffer subsystem (the actual spawn happens elsewhere). */
void armci_init_connections()
81
armci_mpi2_debug(0, "armci_init_connections\n");
82
_armci_buf_init(); /* CHECK: Is this correct ? */
85
/* Shutdown hook: the node's ARMCI master tells its data server to quit.
 * NOTE(review): the actual notification call is on lines not visible here. */
void armci_wait_for_server()
87
armci_mpi2_debug(0, "armci_wait_for_server: wait for server to quit\n");
88
if (armci_me == armci_master)
94
/* Client-to-server connection hook; for MPI_SPAWN only a debug trace here. */
void armci_client_connect_to_servers()
96
armci_mpi2_debug(0, "armci_client_connect_to_servers\n");
99
/* NOTE: armci_mpi_strided and armci_mpi_strided2 are the only 2 functions
100
* that are common to client and server part */
101
/*
 * Transfer a strided region one contiguous 1-D block at a time with plain
 * MPI_Send/MPI_Recv on `comm`.  `op` selects SEND or RECV; count[0] is the
 * contiguous block size in bytes, stride_arr[] the per-level byte strides.
 * NOTE(review): several lines (declarations of i/j/status, branch and loop
 * braces) are not visible in this chunk.
 */
void armci_mpi_strided(int op, void *ptr, int stride_levels, int stride_arr[],
102
int count[], int proc, MPI_Comm comm)
105
long idx; /* index offset of current block position to ptr */
106
int n1dim; /* number of 1 dim block */
107
int bvalue[MAX_STRIDE_LEVEL], bunit[MAX_STRIDE_LEVEL];
110
/* number of n-element of the first dimension */
112
for(i=1; i<=stride_levels; i++)
115
/* calculate the destination indices */
116
bvalue[0] = 0; bvalue[1] = 0; bunit[0] = 1; bunit[1] = 1;
117
for(i=2; i<=stride_levels; i++)
120
bunit[i] = bunit[i-1] * count[i-1];
123
/* walk every 1-D block; idx is rebuilt per block from the stride counters */
for(i=0; i<n1dim; i++)
126
for(j=1; j<=stride_levels; j++)
128
idx += bvalue[j] * stride_arr[j-1];
129
if((i+1) % bunit[j] == 0) bvalue[j]++;
130
if(bvalue[j] > (count[j]-1)) bvalue[j] = 0;
136
MPI_Send(((char*)ptr)+idx, count[0], MPI_BYTE, proc,
137
ARMCI_MPI_SPAWN_DATA_TAG, comm)
140
else /* ( op == RECV) */
143
MPI_Recv(((char*)ptr)+idx, count[0], MPI_BYTE, proc,
144
ARMCI_MPI_SPAWN_DATA_TAG, comm, &status)
150
/* This is the only function that is common to client and server part */
151
/*
 * Same transfer as armci_mpi_strided(), but builds one derived MPI datatype
 * (nested hvectors, one level per stride) so the whole strided region moves
 * in a single MPI_Send/MPI_Recv.
 */
void armci_mpi_strided2(int op, void *ptr, int stride_levels, int stride_arr[],
152
int count[], int proc, MPI_Comm comm)
156
MPI_Datatype type[MAX_STRIDE_LEVEL];
158
if(stride_levels == 0)
160
/* contiguous case: fall back to the simple path */
armci_mpi_strided(op, ptr, stride_levels, stride_arr, count, proc,
165
/* convert strided data description to MPI type */
167
for(i=1; i<=stride_levels; i++)
169
stride *= stride_arr[i-1];
170
MPI_Check( MPI_Type_hvector(count[i], count[i-1], stride,
171
type[i-1], &type[i]) );
173
MPI_Check( MPI_Type_commit(&type[stride_levels]) );
177
MPI_Check( MPI_Send(ptr, 1, type[stride_levels], proc,
178
ARMCI_MPI_SPAWN_VDATA_TAG, comm) );
180
else /* ( op == RECV) */
182
MPI_Check( MPI_Recv(ptr, 1, type[stride_levels], proc,
183
ARMCI_MPI_SPAWN_VDATA_TAG, comm, &status) );
187
/*\ client sends request message to server
189
/*
 * Send `bytes` bytes of request (header + payload) from `buf` to the data
 * server owning `proc`, stamping the header with a sequenced flow-control tag.
 * NOTE(review): the return statement is on lines not visible in this chunk.
 */
int armci_send_req_msg (int proc, void *buf, int bytes)
191
int server = armci_clus_id(proc);
193
armci_mpi2_debug(armci_me, "armci_send_req_msg(): proc=%d, server=%d, "
194
"buf=%p, bytes=%d\n", proc, server, buf, bytes);
196
if( !(server >= 0 && server < armci_nserver) )
197
armci_die("armci_send_req_msg: Invalid server.", 0);
201
* Sequentially ordered tags to ensure flow control at the server side.
202
* For example, a put followed by get from a client should be processed in
203
* ORDER at the server side. If we don't have the flow control, the server
204
* might process the get request first instead of put (and thus violating
205
* ARMCI's ordering semantics.
207
((request_header_t*)buf)->tag = _armci_mpi_tag[server];
209
MPI_Send(buf, bytes, MPI_BYTE, server, ARMCI_MPI_SPAWN_TAG,
210
MPI_COMM_CLIENT2SERVER)
213
/* advance the per-server tag, wrapping from TAG_END back to TAG_BEGIN */
_armci_mpi_tag[server]++;
214
if(_armci_mpi_tag[server] > ARMCI_MPI_SPAWN_TAG_END)
215
_armci_mpi_tag[server] = ARMCI_MPI_SPAWN_TAG_BEGIN;
219
MPI_Send(buf, bytes, MPI_BYTE, server, ARMCI_MPI_SPAWN_TAG,
220
MPI_COMM_CLIENT2SERVER)
223
armci_mpi2_debug(armci_me, "armci_send_req_msg(): send msg to server(%d), to"
224
"fwd to client %d\n", server, proc);
229
/*\ client sends strided data + request to server
231
/* Ship the header+descriptor via armci_send_req_msg(), then stream the
 * strided payload with armci_mpi_strided2() (derived datatype) or
 * armci_mpi_strided() depending on MPI_USER_DEF_DATATYPE, under net_lock. */
int armci_send_req_msg_strided(int proc, request_header_t *msginfo,char *ptr,
232
int strides, int stride_arr[], int count[])
234
int server = armci_clus_id(proc);
237
armci_mpi2_debug(armci_me, "armci_send_req_msg_strided: proc=%d server=%d "
238
"bytes=%d (op=%d)\n", proc, server, msginfo->datalen,
241
THREAD_LOCK(armci_user_threads.net_lock);
243
/* we write header + descriptor of strided data */
244
bytes = sizeof(request_header_t) + msginfo->dscrlen;
245
armci_send_req_msg(proc, msginfo, bytes);
247
#ifdef MPI_USER_DEF_DATATYPE
250
armci_mpi_strided2(SEND, ptr, strides, stride_arr, count, server,
251
MPI_COMM_CLIENT2SERVER);
256
/* for larger blocks write directly thus avoiding memcopy */
257
armci_mpi_strided(SEND, ptr, strides, stride_arr, count, server,
258
MPI_COMM_CLIENT2SERVER);
261
THREAD_UNLOCK(armci_user_threads.net_lock);
263
armci_mpi2_debug(armci_me, "armci_send_req_msg_strided(): send msg to "
264
"server(%d), to fwd to client %d\n", server, proc);
269
/*\ client receives data from server
271
/*
 * Receive `len` bytes of reply from `proc`'s data server into the buffer
 * immediately following the request header; returns a pointer to that data.
 */
char *armci_ReadFromDirect (int proc, request_header_t *msginfo, int len)
274
int server = armci_clus_id(proc);
277
armci_mpi2_debug(armci_me, "armci_ReadFromDirect: proc=%d, server=%d, "
278
"msginfo=%p, bytes=%d (op=%d)\n", proc, server, msginfo,
279
len, msginfo->operation);
281
if( !(server >= 0 && server < armci_nserver) )
282
armci_die("armci_ReadFromDirect: Invalid server.", 0);
285
MPI_Recv(msginfo + 1, len, MPI_BYTE, server, ARMCI_MPI_SPAWN_TAG,
286
MPI_COMM_CLIENT2SERVER, &status)
290
armci_mpi2_debug(armci_me, "recv msg from server(%d), fwd by client %d\n",
296
/* verify the reply was exactly `len` bytes; die otherwise */
MPI_Get_count(&status, MPI_BYTE, &count);
299
armci_mpi2_debug(armci_me, "armci_ReadFromDirect: got %d bytes, "
300
"expected %d bytes\n", count, len);
301
armci_die("armci_ReadFromDirect: MPI_Recv failed.", count);
306
return (char *) (msginfo+1);
309
/*\ client receives strided data from server
311
/* Receive a strided reply directly into user memory `ptr`, mirroring the
 * two send paths of armci_send_req_msg_strided() (derived-datatype or
 * block-by-block, per MPI_USER_DEF_DATATYPE). */
void armci_ReadStridedFromDirect(int proc, request_header_t* msginfo,
312
void *ptr, int strides, int stride_arr[],
316
int server=armci_clus_id(proc);
318
armci_mpi2_debug(armci_me, "armci_ReadStridedFromDirect: proc=%d "
319
"stride_levels=%d, server=%d bytes=%d (op=%d)\n",
320
proc, strides, server, msginfo->datalen,
324
if( !(server >= 0 && server < armci_nserver) )
325
armci_die("armci_ReadStridedFromDirect: Invalid server.", 0);
327
#ifdef MPI_USER_DEF_DATATYPE
330
armci_mpi_strided2(RECV, ptr, strides, stride_arr, count, server,
331
MPI_COMM_CLIENT2SERVER);
336
armci_mpi_strided(RECV, ptr, strides, stride_arr, count, server,
337
MPI_COMM_CLIENT2SERVER);
344
* Platform specific server code ENDs here. (END)
345
**************************************************************************/
347
/* Gather, on every rank, the hostnames of all ARMCI master processes
 * (one per node) into hostname_arr — these become the spawn targets.
 * NOTE(review): is_master assignment and some braces are on lines not
 * visible in this chunk. */
static void armci_gather_hostnames(char **hostname_arr)
349
int i, j, k, namelen, is_master;
350
char hostname[MPI_MAX_PROCESSOR_NAME], *hostnames=NULL;
351
int *master_arr=NULL;
353
master_arr = (int*) malloc(armci_nproc * sizeof(int));
354
hostnames = (char*) malloc(armci_nproc * MPI_MAX_PROCESSOR_NAME *
357
if(hostnames==NULL || master_arr==NULL)
359
armci_die("armci_gather_hostnames: malloc failed.", 0);
363
MPI_Get_processor_name(hostname, &namelen);
365
MPI_Allgather(hostname, MPI_MAX_PROCESSOR_NAME, MPI_CHAR,
366
hostnames, MPI_MAX_PROCESSOR_NAME, MPI_CHAR,
370
if(armci_me == armci_master)
379
MPI_Check(MPI_Allgather(&is_master, 1, MPI_INT, master_arr, 1, MPI_INT,
383
/* get only the hostname of armci master processes */
384
for(i=0,j=0,k=0; i<armci_nproc; i++)
386
if(master_arr[i] == 1)
389
armci_die("armci_gather_hostnames: Invalid masters.",0);
390
strncpy(hostname_arr[j++], &hostnames[k], MPI_MAX_PROCESSOR_NAME);
392
k += MPI_MAX_PROCESSOR_NAME;
401
/* Choose the executable to spawn as the data server; currently the client's
 * own argv[0].  NOTE(review): signature continues on a line not visible here. */
void select_server_program(char *server_program,
405
strcpy(server_program, (*_armci_argv)[0]);
411
/* Spawn one data-server process per node via MPI_Comm_spawn_multiple(),
 * targeting the master hostnames from armci_gather_hostnames() and storing
 * the resulting intercommunicator in MPI_COMM_CLIENT2SERVER. */
static void armci_mpi2_spawn()
415
char server_program[100];
416
char **command_arr=NULL, **hostname_arr=NULL, **nid_arr=NULL;
420
/* we need to start 1 data server process on each node. So a total of
421
"armci_nclus" data servers */
422
armci_nserver = armci_nclus;
423
select_server_program(server_program, armci_nserver);
425
armci_mpi2_debug(0, "armci_mpi2_init(): Spawning %d data server processes "
426
"running %s\n", armci_nserver, server_program);
428
/* allocate necessary data structures */
430
command_arr = (char**) malloc(armci_nserver * sizeof(char*));
431
size_arr = (int*) malloc(armci_nserver * sizeof(int));
432
info_arr = (MPI_Info*) malloc(armci_nserver * sizeof(MPI_Info));
433
hostname_arr = (char**) malloc(armci_nserver * sizeof(char*));
435
/* NOTE(review): stray extra ';' below — harmless but should be removed */
nid_arr = (char**) malloc(armci_nserver * sizeof(char*));;
437
for(i=0; i<armci_nserver; i++)
439
hostname_arr[i] = (char*)malloc(MPI_MAX_PROCESSOR_NAME*sizeof(char));
442
if(command_arr==NULL || size_arr==NULL || info_arr==NULL ||
445
armci_die("armci_mpi2_spawn: malloc failed.", 0);
450
* 1. root process collects hostnames (i.e. machine names) of where to
451
* spawn dataservers. ARMCI masters of respective node will return their
454
armci_gather_hostnames(hostname_arr);
457
/** 2. initialize MPI_Comm_spawn_multiple() arguments */
459
for(i=0; i<armci_nserver; i++)
461
command_arr[i] = (*_armci_argv)[0]; /*CHECK: path needs fix */
462
size_arr[i] = 1; /* 1 data server in each node */
463
MPI_Info_create(&info_arr[i]);
465
/* NOTE(review): asprintf is a GNU/BSD extension, and skipping 3 chars of
 * the hostname assumes a platform-specific "nidNNN" naming — verify */
asprintf(&nid_arr[i], "%d", atoi((hostname_arr[i] + 3)));
466
MPI_Info_set(info_arr[i], "host", nid_arr[i]); /*portability? */
468
MPI_Info_set(info_arr[i], "host", hostname_arr[i]); /*portability? */
475
* 3. MPI_Comm_spawn_multiple(): This is a collective call.
476
* Intercommunicator "ds_intercomm" contains only new dataserver processes.
479
MPI_Comm_spawn_multiple(armci_nserver, command_arr, MPI_ARGVS_NULL,
480
size_arr, info_arr, ARMCI_ROOT, ARMCI_COMM_WORLD,
481
&MPI_COMM_CLIENT2SERVER, MPI_ERRCODES_IGNORE)
486
for(i=0; i<armci_nserver; i++) free(hostname_arr[i]);
499
* Create server processes. This is called in armci_start_server.
500
* Must be called after armci_init_clusinfo().
502
/* Spawn the data servers, then have each node's master send its server the
 * bootstrap info (validation msg, cluster table, lock/shmem info), set up
 * the flow-control tags, and barrier. */
void armci_create_server_MPIprocess ()
504
int rank, size, flag, i;
506
MPI_Initialized(&flag);
508
armci_die("ARMCI error: MPI_Init must be called before PARMCI_Init()",0);
510
MPI_Comm_rank(ARMCI_COMM_WORLD, &rank);
511
MPI_Comm_size(ARMCI_COMM_WORLD, &size);
513
/* spawn one data server process (i.e. additional MPI proc) on each node */
517
* Armci masters send the following info to their corresponding server as
518
* the server was not part of the initialization step in PARMCI_Init()
519
* 1. cluster info ( i.e. armci_init_clusinfo() )
520
* 2. lock info ( i.e.armci_allocate_locks() )
523
if(armci_me == armci_master) {
525
long shm_info[3], shmoffset;
532
msg[0] = ARMCI_MPI_SPAWN_INIT_TAG + armci_clus_me; /* for validation */
534
msg[2] = armci_clus_info[armci_clus_me].nslave;
535
MPI_Send(msg, 3, MPI_INT, armci_clus_me, ARMCI_MPI_SPAWN_INIT_TAG,
536
MPI_COMM_CLIENT2SERVER);
538
/* send the entire clus info to its data server */
539
MPI_Send(armci_clus_info, armci_nclus*sizeof(armci_clus_t), MPI_BYTE,
540
armci_clus_me, ARMCI_MPI_SPAWN_INIT_TAG,
541
MPI_COMM_CLIENT2SERVER);
546
/* package the mutex segment's shmem coordinates for the server */
armci_get_shmem_info((char*)_armci_int_mutexes, &shmid, &shmoffset,
548
shm_info[0] = (long) shmid;
549
shm_info[1] = (long) shmoffset;
550
shm_info[2] = (long) shmsize;
552
MPI_Send(shm_info, 3, MPI_LONG, armci_clus_me, ARMCI_MPI_SPAWN_INIT_TAG,
553
MPI_COMM_CLIENT2SERVER);
557
/* initialize tags for flow control */
558
_armci_mpi_tag = (int*) malloc(armci_nserver*sizeof(int));
559
for(i=0; i<armci_nserver; i++)
560
_armci_mpi_tag[i]=ARMCI_MPI_SPAWN_TAG_BEGIN;
562
/* make sure all processes sync here. CHECK: does it ensure global sync ? */
563
MPI_Barrier(ARMCI_COMM_WORLD);
565
armci_mpi2_debug(0, "armci_create_server_MPIprocess: Servers spawned!\n");
571
1. MPI_Info portability issue. e.g. MPI_Info_set
573
2. For sockets with server process option (i.e. not thread, but an actual
574
process as data server), all clients call armci_init_connections() in
575
armci_start_server(), which is called by PARMCI_Init(). Should this init
576
connections be done for MPI_SPAWN as well?
577
NOTE: armci_init_connections call _armci_buf_init(). In our MPI_SPAWN case we
578
never call _armci_buf_init() either in clients or in (spawned) dataservers.
580
3. Implement non-blocking in future for sure. For example:
581
armci_save_strided_dscr() is disabled in armci_rem_strided (request.c) and
582
armci_complete_vector_get is enabled in armci_rev_vector (request.c)