1
/* $Id: elan4.c,v 1.15.2.2 2007-07-02 05:19:42 d3p687 Exp $ */
8
#include <elan/devent.h>
11
#define DEBUG_NOTIFY 0
13
what are we doing here
16
#define DBL_BUF_SIZE 50000
18
static double elan4_serv_bufs[MAX_BUFS][DBL_BUF_SIZE];
20
static ELAN_PGCTRL *_pgctrl;
21
static void *_pgsstate;
23
static void *zero=(void*)0;
24
extern void *pgs_init (ELAN_STATE *state, void *qMem);
25
extern void * pgs_ds_init (ELAN_STATE *state, void *qMem, void *dsqMem, int max);
27
static int _ELAN_SLOTSIZE=700;
28
static int server_can_poll=0;
30
/* TBD: why was VCALLS disabled if NB_NONCONT was defined */
37
#define MIN_OUTSTANDING 6
38
static int max_pending=16; /* throttle number of outstanding nb calls */
42
static int armci_server_terminating=0;
43
static ELAN_MAIN_QUEUE *mq;
44
static int armci_request_from=-1;
45
static int armci_request_to=-1;
51
int** recv_verify_arr;
52
int** recv_verify_smp_arr;
54
armci_verify_wait_t __armci_verify_wait_struct;
55
armci_verify_wait_t *verify_wait = &__armci_verify_wait_struct;
57
static ops_t** armci_elan_fence_arr;
58
static ops_t *ops_pending_ar;
59
ops_t *rdma_ops_pending_ar;
60
static ops_t *ops_done_ar;
62
static ELAN_EVENT* prevnotifydscr = (ELAN_EVENT *)0;
63
static ELAN_EVENT* prevnotifywaitdscr = (ELAN_EVENT *)0;
64
int **mynotify_epochs;
65
static int **notify_epoch_arr;
66
static int *mycurrent_epoch;
67
static int **current_epoch_seq;
68
static int numepochs = 2;
70
/* Slot size minus the size of request_header_t. Compared against
 * dscrlen and dscrlen+datalen in this file as a size threshold.
 * _ELAN_SLOTSIZE is a variable, so the value is computed at each use. */
#define MSG_DATA_LEN (_ELAN_SLOTSIZE - sizeof(request_header_t))
72
#define MY_PUT2(src,dst,bytes,p) \
73
elan_wait(elan_doput(_pgctrl,(src),(dst),(ELAN_ADDR)elanev[0],(bytes),p,0), elan_base->waitType)
75
#define MY_PUT(src,dst,bytes,p) \
76
elan_wait(elan_put(elan_base->state,(src),(dst),(bytes),p), elan_base->waitType)
77
#define MY_GET(src,dst,len,p)\
78
elan_wait(elan_get(elan_base->state,src,dst,len,p),elan_base->waitType)
85
static ELAN_QUEUE_TX *qtx;
86
static ELAN_QUEUE_RX *qrx;
88
static ELAN_MAIN_QUEUE *mq;
91
/* Number of entries in evdelan/elanev: numepochs events per process.
 * Expands to a run-time expression (armci_nproc * numepochs). */
#define NEVENTS (armci_nproc*numepochs)
93
static ELAN_EVENT_ELAN *evdelan;
94
static ELAN_EVENT **elanev;
96
extern int elan_devent_completed(int setval, ELAN_EVENT *e);
97
#define DOPUT(_src, _dst, _bidx, _len, _p) \
98
elan_doput(_pgctrl,_src,_dst,elan_destEvent(elanev[_bidx]),_len,_p,_RAIL)
99
#define DOWAIT(_bidx) elan_wait(elanev[_bidx],elan_base->waitType)
100
#define REPRIME(_bidx) elan_setWaitCount(elanev[_bidx],1)
102
void test_put(void *src, void *dst, int bytes, int p)
104
elan_wait(elan_doput(_pgctrl,(src),(dst),elan_destEvent(elanev[0]),(bytes),p,0), elan_base->waitType);
106
elan_wait(elan_doput(_pgctrl,(src),(dst),elan_destEvent(elanev[0]),(bytes),p,0), elan_base->waitType);
108
elan_wait(elan_doput(_pgctrl,(src),(dst),elan_destEvent(elanev[0]),(bytes),p,0), elan_base->waitType);
110
elan_wait(elan_doput(_pgctrl,(src),(dst),elan_destEvent(elanev[0]),(bytes),p,0), elan_base->waitType);
116
while(!elan_poll(elanev[0],0)){
117
_elan_deventDump ("wait",elanev[0]);
118
printf("%d:completed %d\n",armci_me, elan_devent_completed(SETEVAL, elanev[0]));
121
/*elan_wait(elanev[0],0);*/
122
_elan_deventDump ("wait",elanev[0]);
127
_elan_deventDump ("after wait",elanev[0]);
130
/*extern ELAN_EVENT *elan_getbflag(void *pgs,u_int destvp, long *retval);*/
131
extern ELAN_EVENT *elan_getbflag(void *pgs,u_int destvp, int lo, int hi, int w, long *retval);
132
extern void elan_clearbflag(void *pgs, int which);
133
extern void elan_deventDump (const char *label, ELAN_EVENT *e);
135
int armci_getbflag(int p)
137
static long retval=-1;
138
elan_wait(elan_getbflag(_pgsstate,p,0,MAX_BUFS,1,&retval), elan_base->waitType);
143
void armci_clearbflag(int which)
145
elan_clearbflag(_pgsstate, which);
149
/* NOTE: for thread-safety we need to put this in the user buffer */
150
static ELAN_EVENT *event_getbflag=NULL;
153
/* Size threshold tested in armcill_getbidx: for sizes above it a buffer
 * index is requested via elan_getbflag. Same expression as MSG_DATA_LEN
 * (slot size minus sizeof(request_header_t)). */
#define BFLAG_PATH_SIZE (_ELAN_SLOTSIZE-sizeof(request_header_t))
154
#define BFLAG_PATH_SIZE_ 4000
156
int armcill_getbidx(int size, int proc, SERV_BUF_IDX_T *bufidx)
159
if(size > BFLAG_PATH_SIZE){
160
int cluster = armci_clus_id(proc);
161
int proc_serv = armci_clus_info[cluster].master;
162
event_getbflag = elan_getbflag(_pgsstate,proc_serv,0,MAX_BUFS,0,bufidx);
170
#ifdef ARMCI_ENABLE_GPC_CALLS
171
extern gpc_buf_t *gpc_req;
173
void armci_init_connections()
176
int nslots=armci_nproc+256, slotsize;
181
/*_ELAN_SLOTSIZE = elan_queueMaxSlotSize(elan_base->state);*/
182
slotsize=_ELAN_SLOTSIZE;
183
#ifdef ARMCI_ENABLE_GPC_CALLS
184
gpc_req = (gpc_buf_t *)malloc(MAX_GPC_REQ*sizeof(gpc_buf_t)+SIXTYFOUR);
186
if ((q = elan_gallocQueue(elan_base, elan_base->allGroup)) == NULL)
187
armci_die( "elan_gallocElan",0 );
190
if(!(qrx = elan_queueRxInit(elan_base->state, q, nslots, slotsize, R, 0)))
191
armci_die("Failed to initialise elan receive Q",0);
192
if(!(qtx = elan_queueTxInit(elan_base->state, q, R, 0)))
193
/* if(!(qtx = elan_queueTxInit(elan_base->state, q, R, LIBELAN_QUEUEREUSEBUF)))*/
195
if(!(mq = elan_mainQueueInit( elan_base->state, q, nslots, slotsize, 0)))
197
armci_die("Failed to initialise elan Q",0);
200
_qd = elan_gallocElan(elan_base, elan_base->allGroup, ELAN_QUEUE_ALIGN,
201
elan_pgvGlobalMemSize(elan_base->state));
203
if(!_qd) armci_die("failed elan_gallocElan 1",0);
204
elan_gsync(elan_base->allGroup);
205
_pgctrl = elan_putgetInit(elan_base->state, _qd, 16, 4096, 4096, 32, ELAN_PGVINIT);
206
if(!_pgctrl) armci_die("failed elan_gallocElan 2",0);
207
elan_gsync(elan_base->allGroup);
210
evdelan = elan_gallocElan(elan_base,elan_base->allGroup,32,NEVENTS*sizeof(ELAN_EVENT_ELAN));
211
if(!evdelan) armci_die("failed elan_gallocElan for dest events",0);
212
elanev = (ELAN_EVENT **)malloc(sizeof(ELAN_EVENT *)*NEVENTS);
213
for(i=0; i<NEVENTS; i++){
214
elanev[i]= elan_initEvent(elan_base->state,elan_base->state->rail[_RAIL],evdelan+i,0);
215
if(!elanev[i]) armci_die("elan_initEvent failed",i);
216
/*print_event_info(elanev[i]); */
218
elan_gsync(elan_base->allGroup);
219
/*_elan_deventDump ("init",elanev[0]); */
224
qs = elan_gallocElan(elan_base, elan_base->allGroup, ELAN_QUEUE_ALIGN,
225
elan_pgsGlobalMemSize(elan_base->state));
226
/*_pgsstate = pgs_init(elan_base->state, q); */
227
_pgsstate = pgs_ds_init(elan_base->state, qs, q, MAX_BUFS);
229
if(armci_me == armci_master) {
230
if(!(ops_done_ar=(ops_t*)calloc(armci_nproc,sizeof(ops_t))))
231
armci_die("malloc failed for ARMCI ops_done_ar",0);
234
armci_elan_fence_arr = (ops_t**)malloc(armci_nproc*sizeof(ops_t*));
235
if(!armci_elan_fence_arr)armci_die("malloc failed for ARMCI fence array",0);
236
if(PARMCI_Malloc((void**)armci_elan_fence_arr, armci_nclus*sizeof(ops_t)))
237
armci_die("failed to allocate ARMCI fence array",0);
238
bzero(armci_elan_fence_arr[armci_me],armci_nclus*sizeof(ops_t));
240
if(!(rdma_ops_pending_ar=(ops_t*)calloc(armci_nproc,sizeof(ops_t))))
241
armci_die("malloc failed for ARMCI rdma_ops_pending_ar",0);
246
/* initialize control descriptor for put/get */
247
armci_pgctrl = elan_putgetInit(elan_base->state, 32, 8);
248
if(!armci_pgctrl) armci_die("armci_init_con: elan_putgetInit failed",0);
251
/* check if we can poll in the server thread */
252
enval = getenv("ARMCI_SERVER_CAN_POLL");
254
if((enval[0] != 'N') && (enval[0]!='n')) server_can_poll=1;
256
if(armci_clus_info[armci_clus_me].nslave < armci_getnumcpus()) server_can_poll=1;
259
if(MessageSndBuffer){
260
((request_header_t*)MessageSndBuffer)->tag.data_ptr = (void*)0;
261
}else armci_die("armci_init_connections: buf not set",0);
264
void armci_elan_notify_init()
267
if(!(ops_pending_ar=(ops_t*)calloc(armci_nclus,sizeof(ops_t))))
268
armci_die("malloc failed for ARMCI ops_pending_ar",0);
270
notify_epoch_arr = (int **)malloc(sizeof(int *)*armci_nproc);
271
if(!notify_epoch_arr)armci_die("malloc failed for notify_epoch_arr",0);
272
if(PARMCI_Malloc((void**)notify_epoch_arr,armci_nproc*numepochs*sizeof(int)))
273
armci_die("failed to allocate ARMCI fence array",0);
274
bzero(notify_epoch_arr[armci_me],armci_nproc*numepochs*sizeof(int));
275
mynotify_epochs = (int **)calloc(armci_nproc,sizeof(int *));
276
current_epoch_seq = (int **)calloc(armci_nproc,sizeof(int*));
278
for(i=0;i<armci_nproc;i++){
279
mynotify_epochs[i] = notify_epoch_arr[armci_me]+i*numepochs;
280
current_epoch_seq[i] = (int *)calloc(armci_nproc,sizeof(int));
284
if(!(verify_wait->verify_seq_ar=(int*)calloc(armci_nproc,sizeof(int))))
285
armci_die("malloc failed for ARMCI verify_seq_ar",0);
286
/*allocate an array for wait sequence array*/
287
if(!(verify_wait->wait_seq_ar=(int*)calloc(armci_nproc,sizeof(int))))
288
armci_die("malloc failed for ARMCI wait_seq_ar",0);
289
for(i=0;i<armci_nproc;i++){
290
verify_wait->verify_seq_ar[i]=1;
291
verify_wait->wait_seq_ar[i]=1;
294
verify_wait->recv_verify_smp_arr = (int**)malloc(armci_nproc*sizeof(int*));
295
if(!verify_wait->recv_verify_smp_arr)armci_die("malloc-recv_verify_smp",0);
296
bzero(verify_wait->recv_verify_smp_arr ,armci_nproc*sizeof(ops_t*));
298
verify_wait->recv_verify_arr = (int**)malloc(armci_nproc*sizeof(int*));
299
if(!verify_wait->recv_verify_arr)armci_die("malloc fail-recv_verify_arr",0);
300
bzero(verify_wait->recv_verify_arr ,armci_nproc*sizeof(int*));
302
if(PARMCI_Malloc((void**)verify_wait->recv_verify_arr,
303
armci_nproc*3*sizeof(int)))
304
armci_die("failed to allocate recv_verify_arr",0);
306
if(PARMCI_Malloc((void**)verify_wait->recv_verify_smp_arr,
307
armci_nproc*sizeof(int)*2))
308
armci_die("failed to allocate ARMCI fence array",0);
313
/*\ server sends ACK to client when request is processed
315
static void armci_send_ack()
318
ops_t *buf = armci_elan_fence_arr[armci_request_from] + armci_clus_me;
322
printf("%d:server sends ack p=%d fence=%p slot %p got=%d\n", armci_me,
323
armci_request_from, armci_elan_fence_arr[armci_request_from],buf,
324
ops_done_ar[armci_request_from]+1); fflush(stdout);
327
val = ++ops_done_ar[armci_request_from];
328
verify_wait->recv_verify_smp_arr[armci_request_to][armci_request_from]=val;
330
MY_PUT(&val,buf,sizeof(ops_t),armci_request_from);
334
ops_t armci_check_int_val(ops_t *v)
340
void armci_elan_fence(int p)
343
int cluster = armci_clus_id(p);
344
ops_t *buf = armci_elan_fence_arr[armci_me] + cluster;
345
long res = ops_pending_ar[cluster] - armci_check_int_val(buf);
348
if(ops_pending_ar[cluster])
349
printf("%d: client fencing proc=%d fence=%p slot %p pending=%d got=%d\n",
350
armci_me, p, armci_elan_fence_arr[armci_me], buf,
351
ops_pending_ar[cluster], armci_check_int_val(buf)); fflush(stdout);
355
if(++loop == 1000) { loop=0; usleep(1); }
356
armci_util_spin(loop, buf);
357
res = ops_pending_ar[cluster] - armci_check_int_val(buf);
362
void armci_send_q(int p, void *buf, int len)
364
extern ELAN_EVENT * armci_sendq(void *pgs,u_int destvp,void *buf, int len);
365
armci_sendq(_pgsstate, p, buf, len);
370
/*\ server sends data to client buffer
372
void armci_WriteToDirect(int dst, request_header_t *msginfo, void *buffer)
374
armci_die("armci_WriteToDirect: should not be called in this case",0);
377
char *armci_ReadFromDirect(int proc, request_header_t * msginfo, int len)
379
char *buf = (char*) msginfo;
385
void armci_call_data_server()
389
usec_to_poll = server_can_poll? ELAN_POLL_EVENT: 0;
392
printf("%d(server): waiting for request\n",armci_me); fflush(stdout);
394
#ifdef ARMCI_ENABLE_GPC_CALLS
395
unblock_thread_signal(GPC_COMPLETION_SIGNAL);
398
#ifdef ARMCI_ENABLE_GPC_CALLS
399
block_thread_signal(GPC_COMPLETION_SIGNAL);
403
buf = elan_queueRxWait(qrx, NULL, usec_to_poll);
405
char buf[_ELAN_SLOTSIZE];
406
elan_queueWait(mq, buf, usec_to_poll );
408
armci_data_server((char*)buf);
411
/* free the buffer if used */
412
if(bidx>=0) { armci_clearbflag(bidx); bidx =-1; }
413
#ifdef ARMCI_ENABLE_GPC_CALLS
414
unblock_thread_signal(GPC_COMPLETION_SIGNAL);
418
if(DEBUG_) {printf("%d(server): done! closing\n",armci_me); fflush(stdout);}
421
/*\ server receives request
423
void armci_rcv_req(void *mesg, void *phdr, void *pdescr, void *pdata, int *buflen)
426
char *MessageBuffer = MessageRcvBuffer;
427
request_header_t *msginfo = (request_header_t *)mesg;
429
*(void **)phdr = msginfo;
430
armci_request_from = msginfo->from;
431
armci_request_to = msginfo->to;
434
printf("%d(server): got %d req (dscrlen=%d datalen=%d) from %d %p\n",
435
armci_me, msginfo->operation, msginfo->dscrlen,
436
msginfo->datalen, msginfo->from,msginfo->tag.data_ptr); fflush(stdout);
439
*buflen = MSG_BUFLEN - sizeof(request_header_t);
440
*(void **)pdescr = msginfo+1;
441
*(void **)pdata = msginfo->dscrlen + (char*)(msginfo+1);
445
if(msginfo->operation != GET){
446
int payload = msginfo->datalen;
448
char *rembuf = msginfo->tag.data_ptr;
451
bidx = (long)msginfo->tag.ack;
453
printf("%ds bidx=%ld\n",armci_me,bidx); fflush(stdout);
455
if(bidx>MAX_BUFS || bidx<0)
456
armci_die2("got wrong buffer index",(int)bidx,MAX_BUFS);
457
MessageBuffer= (char*) &elan4_serv_bufs[bidx][0];
458
}else MessageBuffer = MessageRcvBuffer;
460
if(msginfo->dscrlen > MSG_DATA_LEN){
461
payload += msginfo->dscrlen;
462
*(void **)pdescr = MessageBuffer;
463
off = msginfo->dscrlen;
464
}else rembuf += msginfo->dscrlen;
466
if((msginfo->dscrlen+msginfo->datalen)> MSG_DATA_LEN){
469
void *flag_to_clear = ((void**)msginfo->tag.ack_ptr);
470
MY_GET(rembuf,MessageBuffer,payload, msginfo->from);
472
/* mark sender buffer as free -- flag is before descriptor */
473
MY_PUT(&zero,flag_to_clear,sizeof(void*),msginfo->from);
475
printf("%d:serv &tag=%p tag=%p dscrlen=%d %d data to %p pdscr=%p pdata=%p\n",
476
armci_me, flag_to_clear, msginfo->tag.ack, msginfo->dscrlen, payload,
477
MessageBuffer,*(void **)pdescr, *(void **)pdata); fflush(stdout);
481
*(void **)pdata = MessageBuffer + off;
485
*(void**)pdescr = NULL;
491
ELAN_EVENT *qtxevent=(ELAN_EVENT*)0;
494
/*\ send request to server thread
496
int armci_send_req_msg(int proc, void *vbuf, int len)
499
char *buf = (char*)vbuf;
500
request_header_t *msginfo = (request_header_t *)buf;
501
int cluster = armci_clus_id(proc);
502
int size=_ELAN_SLOTSIZE;
503
int proc_serv = armci_clus_info[cluster].master;
504
int off =sizeof(request_header_t);
506
if(msginfo->operation==PUT || ARMCI_ACC(msginfo->operation))
507
ops_pending_ar[cluster]++;
508
msginfo->tag.ack_ptr = &msginfo->tag.ack;
510
if(event_getbflag)elan_wait(event_getbflag,elan_base->waitType);
512
printf("%ds: slotsize=%d flag=%d size=%d\n",armci_me,_ELAN_SLOTSIZE,
513
event_getbflag,msginfo->dscrlen+msginfo->datalen);
514
armci_die("protocol inconsitency",(int)(long)msginfo->tag.ack);
519
if(msginfo->operation != GET){
520
if((msginfo->dscrlen+msginfo->datalen)> MSG_DATA_LEN){
521
/* choose remote buffer */
523
extern ELAN_EVENT *armci_sendq(void *,u_int,void*,int,void*,void*, int);
525
payload = msginfo->datalen;
526
if(msginfo->dscrlen > MSG_DATA_LEN){
527
payload += msginfo->dscrlen;
529
else off+= msginfo->dscrlen;
533
Bidx = (long)msginfo->tag.ack;
535
msginfo->inbuf = 0; /* no buf -> take the other path */
538
MessageBuffer= (char*) &elan4_serv_bufs[Bidx][0];
540
printf("%d:SQ %p len=%d tag=%ld\n",armci_me,vbuf,len-payload,Bidx);
543
qtxevent =armci_sendq(_pgsstate, proc_serv, vbuf, len-payload, buf+off,
544
MessageBuffer,payload);
545
buf -= sizeof(ELAN_EVENT*); *(ELAN_EVENT**)buf = qtxevent;
547
return 0; /*********** DONE **********/
551
/* set message tag -> has pointer to client buffer with descriptor+data */
552
msginfo->tag.data_ptr = (void *)(buf + sizeof(request_header_t));
554
printf("%d: SENDing for %d %p to %p %d bytes bidx=%d\n",armci_me,proc_serv,
555
buf+off,MessageBuffer,payload,Bidx); fflush(stdout);
556
MY_PUT(buf+off,MessageBuffer,payload, proc_serv);
558
if(DEBUG_){ printf("%d:in SEND &tag=%p %p tag=%p\n",armci_me,&msginfo->tag.ack,
559
msginfo->tag.ack_ptr,msginfo->tag.data_ptr); fflush(stdout); }
563
msginfo->tag.data_ptr=NULL; /* null tag means sender buffer is free */
564
msginfo->tag.ack=0L; /* tag=0 means sender buffer is free */
569
# ifdef BUF_EXTRA_FIELD_T
571
qtxevent = elan_queueTx(qtx, proc_serv, vbuf, len-payload, 0);
572
buf -= sizeof(ELAN_EVENT*);
573
*(ELAN_EVENT**)buf = qtxevent;
575
if(qtxevent)elan_wait(qtxevent,ELAN_POLL_EVENT);
576
qtxevent = elan_queueTx(qtx, proc_serv, vbuf, len-payload, 0);
580
elan_queueReq(mq, proc_serv, vbuf, len-payload); /* vbuf is sent/copied out */
585
printf("%d sent request %d to (%d,%d)\n",armci_me,ops_pending_ar[proc],
586
proc,proc_serv); fflush(stdout);
594
void armcill_clearbuf(ELAN_EVENT** handle)
596
request_header_t *msginfo = (request_header_t *)(handle+1);
598
elan_wait(*handle, elan_base->waitType);
600
while(msginfo->tag.ack){
601
armci_util_spin(100,msginfo);
602
msginfo->tag.data_ptr=NULL;
607
int armcill_testbuf(ELAN_EVENT** handle)
610
request_header_t *msginfo = (request_header_t *)(handle+1);
613
ret = !elan_poll(handle,1L);
615
ret = (msginfo->tag.ack)? 0: 1;
620
void armci_wait_for_server()
622
armci_server_terminating=1;
625
void armci_transport_cleanup() {
626
_elan_deventDump ("terminate",elanev[0]);
628
/* Intentionally empty: this implementation performs no work here. */
void armci_client_connect_to_servers()
{
}
629
/* Intentionally empty: this implementation performs no work here. */
void armci_server_initial_connection()
{
}
631
void armci_elan_put_with_tracknotify(char *src,char *dst,int n,int proc,
632
ELAN_EVENT **phandle)
636
rdma_ops_pending_ar[proc]++;
637
es = current_epoch_seq[armci_me][proc]+1;
639
*phandle = DOPUT(src,dst,numepochs*armci_me+es,n,proc);
640
if(DEBUG_){printf("\n%d:done put rdma=%d\n",armci_me,rdma_ops_pending_ar[proc]);fflush(stdout);}
643
int armci_inotify_proc(int proc)
645
int *remptr = verify_wait->recv_verify_arr[proc]+3*armci_me;
646
int *myptr = verify_wait->recv_verify_smp_arr[armci_me]+armci_nproc;
649
if(SAMECLUSNODE(proc)){
650
remptr = verify_wait->recv_verify_smp_arr[proc]+armci_me;
651
*(remptr)=verify_wait->verify_seq_ar[proc]++;
657
if(prevnotifydscr)elan_wait(prevnotifydscr,elan_base->waitType);
659
ALIGN_PTR_LONG(int,myptr);
660
myptr[0] = verify_wait->verify_seq_ar[proc]++;
661
myptr[1] = ops_pending_ar[armci_clus_id(proc)];
662
myptr[2] = rdma_ops_pending_ar[proc];
663
rdma_ops_pending_ar[proc]=0;
667
* we wait before ensure that last epoch is complete, we do this because
668
* we want to overlap the time it takes to recv message from server
671
mycurrent_epoch =mynotify_epochs[proc]+current_epoch_seq[armci_me][proc];
674
fprintf(stderr,"%d:waiting for %p from %d at ind %d to be 0",
675
armci_me,mycurrent_epoch,proc,current_epoch_seq[armci_me][proc]);
678
while(armci_check_int_val(mycurrent_epoch)){
679
if(++loop == 1000) { loop=0;usleep(1); }
680
armci_util_spin(loop, mycurrent_epoch);
684
fprintf(stderr,"%d:done waiting for %p from %d at ind %d to be 0",
685
armci_me,mycurrent_epoch,proc,current_epoch_seq[armci_me][proc]);
688
current_epoch_seq[armci_me][proc]++;
689
current_epoch_seq[armci_me][proc]%=numepochs;
690
mycurrent_epoch =mynotify_epochs[proc]+current_epoch_seq[armci_me][proc];
693
/*prevnotifydscr = DOPUT(myptr,remptr,proc,sizeof(int)*3,proc);*/
694
prevnotifydscr = elan_put(elan_base->state,myptr,remptr,sizeof(int)*3,
698
printf("\n%d: sending %d %d %d to %d at %p\n",armci_me,*(myptr),
699
*(myptr+1),*(myptr+2),proc,remptr);
708
int armci_inotify_wait(int proc, int *pval)
710
int *buf_notify,serv_count,rdma_count,*myserv_count;
711
int wait_val,wait_fence=0,zer=0;
712
int res,eventcount,es;
713
int *myptr=verify_wait->recv_verify_smp_arr[armci_me]+armci_nproc;
716
wait_val = verify_wait->wait_seq_ar[proc]++;
718
buf_notify = verify_wait->recv_verify_arr[armci_me]+3*proc;
720
if(SAMECLUSNODE(proc)){
721
buf_notify = verify_wait->recv_verify_smp_arr[armci_me]+proc;
726
printf("\n%d:expecting %d at %p from %d\n",armci_me,wait_val,buf_notify,
730
/*first we wait for sequence to match*/
731
if((wait_val - armci_check_int_val(buf_notify)) > 0) {
733
printf("\n%d:verifyseq expecting%d have %d",armci_me,
734
wait_val,armci_check_int_val(buf_notify));fflush(stdout);
737
res = wait_val - armci_check_int_val(buf_notify);
739
if(++loop == 1000) { loop=0;usleep(1); }
740
armci_util_spin(loop, buf_notify);
742
res = wait_val - armci_check_int_val(buf_notify);
745
printf("\n%d:arrived verifyseq expected %d have %d",armci_me,
746
wait_val,armci_check_int_val(buf_notify));fflush(stdout);
750
if(SAMECLUSNODE(proc))
753
serv_count = verify_wait->recv_verify_arr[armci_me][3*proc+1];
754
rdma_count = verify_wait->recv_verify_arr[armci_me][3*proc+2];
755
myserv_count = verify_wait->recv_verify_smp_arr[armci_me]+proc;
756
wait_fence = serv_count;
760
res = wait_fence - armci_check_int_val(myserv_count);
763
printf("\n%d:fence expecting%d have %d",
764
armci_me,wait_fence,armci_check_int_val(myserv_count));
768
if(!SAMECLUSNODE(proc)){
770
if(++loop == 1000) { loop=0;usleep(1); }
771
armci_util_spin(loop, myserv_count);
773
wait_fence=serv_count =verify_wait->recv_verify_arr[armci_me][3*proc+1];
774
res = wait_fence - armci_check_int_val(myserv_count);
778
rdma_count = verify_wait->recv_verify_arr[armci_me][3*proc+2];
779
current_epoch_seq[proc][armci_me]++;
780
current_epoch_seq[proc][armci_me]%=numepochs;
781
es = proc*numepochs+current_epoch_seq[proc][armci_me];
783
armci_elan_wait_event(elanev[es],rdma_count);
785
*pval = armci_check_int_val(buf_notify);
789
if(prevnotifywaitdscr)elan_wait(prevnotifywaitdscr,elan_base->waitType);
793
fprintf(stderr,"\n%d:clearing %d on %d at %p\n",armci_me,
794
current_epoch_seq[proc][armci_me],proc,
795
notify_epoch_arr[proc]+armci_me*numepochs+current_epoch_seq[proc][armci_me]);
797
prevnotifywaitdscr = elan_put(elan_base->state,myptr,
798
notify_epoch_arr[proc]+armci_me*numepochs+current_epoch_seq[proc][armci_me],
807
/************************************************************************/
808
#if defined(_ELAN_LOCK_H)
811
static ELAN_LOCK *my_locks, *all_locks;
812
static int num_locks=0;
814
/* NOTE that if ELAN is defined the scope of locks is limited to SMP
815
and we do not call the interfaces below */
818
/*\ allocate and initialize num locks on each processor (collective call)
820
void armcill_allocate_locks(int num)
826
if(MAX_LOCKS<num)armci_die2("too many locks",MAX_LOCKS,num);
829
/* allocate memory to hold lock info for all the processors */
830
buf = malloc(armci_nproc*num *sizeof(ELAN_LOCK) + ELAN_LOCK_ALIGN);
831
if(!buf) armci_die("armcill_init_locks: malloc failed",0);
833
mod = ((long)buf) %ELAN_LOCK_ALIGN;
834
all_locks = (ELAN_LOCK*)(buf +ELAN_LOCK_ALIGN-mod);
835
if(((long)all_locks) %ELAN_LOCK_ALIGN)
836
armci_die2("lock alligment failed",mod,ELAN_LOCK_ALIGN);
837
bzero(all_locks,armci_nproc*num *sizeof(ELAN_LOCK));
839
/* initialize local locks */
840
my_locks = all_locks + armci_me * num;
842
elan_lockInit(elan_base->state, my_locks+i, ELAN_LOCK_NORMAL);
844
/* now we use all-reduce to exchange locks info among everybody */
845
elems = (num*armci_nproc*sizeof(ELAN_LOCK))/sizeof(long);
846
if((num*sizeof(ELAN_LOCK))%sizeof(long))
847
armci_die("armcill_init_locks: size mismatch",sizeof(ELAN_LOCK));
848
armci_msg_lgop((long*)all_locks,elems,"+");
851
for(i=0; i<num*armci_nproc; i++) printf("%d:(%d) master=%d type=%d\n",i,elems,(all_locks+i)->lp_master, (all_locks+i)->lp_type);
858
void armcill_lock(int m, int proc)
860
ELAN_LOCK *rem_locks = (ELAN_LOCK*)(all_locks + proc*num_locks);
862
if(m<0 || m>= num_locks) armci_die2("armcill_lock: bad lock id",m,num_locks);
863
if(proc<0 || proc>= armci_nproc) armci_die("armcill_lock: bad proc id",proc);
865
elan_lockLock(elan_base->state, rem_locks + m, ELAN_LOCK_BUSY);
868
void armcill_unlock(int m, int proc)
870
ELAN_LOCK *rem_locks = (ELAN_LOCK*)(all_locks + proc*num_locks);
872
if(m<0 || m>= num_locks) armci_die2("armcill_unlock:bad lockid",m,num_locks);
873
if(proc<0 || proc>=armci_nproc)armci_die("armcill_unlock: bad proc id",proc);
875
elan_lockUnLock(elan_base->state, rem_locks + m);
881
extern ELAN_EVENT *elan_putss (void *pgs, void *src, void *dst, int *src_stride_arr, int *dst_stride_arr, u_int *count, u_int strides, u_int destvp);
884
void armcill_putS(int proc, void* src_ptr, int src_stride_arr[], void* dst_ptr,
885
int dst_stride_arr[], int count[], int stride_levels)
887
elan_wait(elan_putss(_pgsstate,src_ptr,dst_ptr, src_stride_arr,
888
dst_stride_arr, count, stride_levels, proc),elan_base->waitType);
891
ELAN_EVENT * armcill_nbputS(int proc, void* src_ptr, int src_stride_arr[],
892
void* dst_ptr, int dst_stride_arr[], int count[], int stride_levels)
894
return elan_putss(_pgsstate,src_ptr,dst_ptr, src_stride_arr,
895
dst_stride_arr, count, stride_levels, proc);
900
extern ELAN_EVENT *elan_getss (void *pgs, void *src, void *dst, int *src_stride_arr, int *dst_stride_arr, u_int *count, u_int strides, u_int destvp);
901
void armcill_getS(int proc, void* src_ptr, int src_stride_arr[], void* dst_ptr,
902
int dst_stride_arr[], int count[], int stride_levels)
904
elan_wait(elan_getss(_pgsstate,src_ptr,dst_ptr, src_stride_arr,
905
dst_stride_arr, count, stride_levels, proc),elan_base->waitType);
908
ELAN_EVENT* armcill_nbgetS(int proc, void* src_ptr, int src_stride_arr[],
909
void* dst_ptr, int dst_stride_arr[], int count[], int stride_levels)
911
return elan_getss(_pgsstate,src_ptr,dst_ptr, src_stride_arr,
912
dst_stride_arr, count, stride_levels, proc);
916
/************************************************************************/
919
#define MAX_VECS 1024
920
static void* _src[MAX_VECS], *_dst[MAX_VECS];
923
void armci_network_strided(int op, void* scale, int proc,void *src_ptr,
924
int src_stride_arr[], void* dst_ptr, int dst_stride_arr[],
925
int count[], int stride_levels, armci_ihdl_t nb_handle)
929
char *src = (char*)src_ptr, *dst=(char*)dst_ptr;
931
int dsize=3*sizeof(void*);
933
extern ELAN_EVENT *elan_getss (void *pgs, void *src, void *dst, int *src_stride_arr, int *dst_stride_arr, u_int *count, u_int strides, u_int destvp);
934
extern ELAN_EVENT *elan_putss (void *pgs, void *src, void *dst, int *src_stride_arr, int *dst_stride_arr, u_int *count, u_int strides, u_int destvp);
935
if(stride_levels==0){
937
o_cmpl = elan_get(elan_base->state,src,dst,count[0],proc);
940
armci_elan_put_with_tracknotify(src,dst,count[0],proc,&o_cmpl);
942
o_cmpl = elan_put(elan_base->state,src,dst,count[0],proc);
945
else if(stride_levels==1){
947
o_cmpl = elan_getss(_pgsstate,src_ptr,dst_ptr, src_stride_arr,
948
dst_stride_arr, count, stride_levels, proc);
950
o_cmpl = elan_putss(_pgsstate,src_ptr,dst_ptr, src_stride_arr,
951
dst_stride_arr, count, stride_levels, proc);
953
armci_die("network strided called for accumulate",proc);
955
else if(stride_levels==2){
957
o_cmpl = elan_getss(_pgsstate,src_ptr,dst_ptr, src_stride_arr,
958
dst_stride_arr, count, stride_levels, proc);
960
o_cmpl = elan_putss(_pgsstate,src_ptr,dst_ptr, src_stride_arr,
961
dst_stride_arr, count, stride_levels, proc);
963
armci_die("network strided called for accumulate",proc);
966
armci_die("network strided called for stride_levels>=3",proc);
969
elan_wait(o_cmpl,elan_base->waitType);
971
nb_handle->cmpl_info = o_cmpl;
974
void armcill_getv(int proc, int bytes, int count, void* src[], void* dst[])
979
printf("%d: getv %d\n", armci_me, count); fflush(stdout);
981
for (_j = 0; _j < count; _j++ ){
982
_src[issued] = src[_j];
983
_dst[issued] = dst[_j];
985
if(issued == MAX_VECS){
986
elan_wait(elan_getv(_pgctrl,_src,_dst,bytes,issued,proc),elan_base->waitType);
990
if(issued)elan_wait(elan_getv(_pgctrl,_src,_dst,bytes,issued,proc),
991
elan_base->waitType);
995
void armcill_putv(int proc, int bytes, int count, void* src[], void* dst[])
1000
printf("%d: putv %d\n", armci_me, count); fflush(stdout);
1003
for (_j = 0; _j < count; _j++ ){
1004
_src[issued] = src[_j];
1005
_dst[issued] = dst[_j];
1007
if(issued == MAX_VECS){
1008
elan_wait(elan_putv(_pgctrl,_src,_dst,bytes,issued,proc),
1009
elan_base->waitType);
1013
if(issued)elan_wait(elan_putv(_pgctrl,_src,_dst,bytes,issued,proc),
1014
elan_base->waitType);
1024
void armcill_put2D(int proc, int bytes, int count, void* src_ptr,int src_stride,
1025
void* dst_ptr,int dst_stride)
1030
elan_wait(elan_putss(_pgsstate,src_ptr,dst_ptr, &src_stride, &dst_stride, acount,1,proc),elan_base->waitType);
1035
void armcill_put2D(int proc, int bytes, int count, void* src_ptr,int src_stride,
1036
void* dst_ptr,int dst_stride)
1039
char *ps=src_ptr, *pd=dst_ptr;
1042
printf("%d: putv %d\n", armci_me, count); fflush(stdout);
1045
for (_j = 0; _j < count; _j++ ){
1051
if(issued == MAX_VECS){
1052
elan_wait(elan_putv(_pgctrl,_src,_dst,bytes,issued,proc),elan_base->waitType);
1056
if(issued)elan_wait(elan_putv(_pgctrl,_src,_dst,bytes,issued,proc),elan_base->waitType);
1063
void armcill_get2D(int proc, int bytes, int count, void* src_ptr,int src_stride,
1064
void* dst_ptr,int dst_stride)
1069
elan_wait(elan_getss(_pgsstate,src_ptr,dst_ptr, &src_stride, &dst_stride, acount,1,proc),elan_base->waitType);
1073
void armcill_get2D(int proc, int bytes, int count, void* src_ptr,int src_stride,
1074
void* dst_ptr,int dst_stride)
1077
char *ps=src_ptr, *pd=dst_ptr;
1080
printf("%d: getv %d\n", armci_me, count); fflush(stdout);
1082
for (_j = 0; _j < count; _j++ ){
1088
if(issued == MAX_VECS){
1089
elan_wait(elan_getv(_pgctrl,_src,_dst,bytes,issued,proc),elan_base->waitType);
1093
if(issued)elan_wait(elan_getv(_pgctrl,_src,_dst,bytes,issued,proc),elan_base->waitType);
1095
printf("%d: getv count=%d issued=%d\n", armci_me, count,issued); fflush(stdout);
1101
/* Empty body: calling this does nothing and returns immediately. */
void armcill_wait_get()
{
}
1102
/* Empty body: calling this does nothing and returns immediately. */
void armcill_wait_put()
{
}
1106
#ifdef _ELAN_PUTGET_H
1108
/* might have to use MAX_SLOTS<MAX_PENDING due to a throttling problem in Elan */
1109
#define MAX_PENDING 6
1110
#define ZR (ELAN_EVENT*)0
1112
static ELAN_EVENT* put_dscr[MAX_SLOTS]= {
1113
ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,
1114
ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR};
1116
static ELAN_EVENT* get_dscr[MAX_SLOTS] = {
1117
ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,
1118
ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR,ZR};
1120
static int cur_get=0;
1121
static int cur_put=0;
1122
static int pending_get=0;
1123
static int pending_put=0;
1126
/*\ strided put, nonblocking
1128
void armcill_put2D(int proc, int bytes, int count, void* src_ptr,int src_stride,
1129
void* dst_ptr,int dst_stride)
1131
int _j, i, batch, issued=0;
1132
char *ps=src_ptr, *pd=dst_ptr;
1135
for (_j = 0; _j < count; ){
1136
/* how big a batch of requests can we issue */
1137
batch = (count - _j )<max_pending ? count - _j : max_pending;
1139
for(i=0; i< batch; i++){
1140
if(put_dscr[cur_put])elan_wait(put_dscr[cur_put],100);
1143
put_dscr[cur_put]= elan_put(elan_base->state,ps, pd,(size_t)bytes,proc);
1145
elan_wait(elan_put(elan_base->state, ps, pd, (size_t)bytes, proc),1000);
1151
if(cur_put>=max_pending)cur_put=0;
1156
armci_die2("armci-elan put:mismatch %d %d \n", count,issued);
1158
for (_j = 0; _j < count; _j++){
1159
elan_wait(elan_put(elan_base->state, ps, pd, (size_t)bytes, proc),1000);
1167
/*\ blocking vector put
1169
void armcill_putv(int proc, int bytes, int count, void* src[], void* dst[])
1171
int _j, i, batch, issued=0;
1175
printf("%d: putv %d\n", armci_me, count); fflush(stdout);
1178
for (_j = 0; _j < count; ){
1179
/* how big a batch of requests can we issue */
1180
batch = (count - _j )<max_pending ? count - _j : max_pending;
1182
for(i=0; i< batch; i++){
1183
if(put_dscr[cur_put])elan_wait(put_dscr[cur_put],100);
1187
put_dscr[cur_put]= elan_put(elan_base->state,ps, pd,(size_t)bytes,proc);
1190
if(cur_put>=max_pending)cur_put=0;
1194
armci_die2("armci-elan putv:mismatch\n", count,issued);
1196
for(i=0; i<max_pending; i++) if(put_dscr[i]){
1197
elan_wait(put_dscr[i],100);
1198
put_dscr[i]=(ELAN_EVENT*)0;
1204
/*\ strided get, nonblocking
1206
void armcill_get2D(int proc, int bytes, int count, void* src_ptr,int src_stride,
1207
void* dst_ptr,int dst_stride)
1209
int _j, i, batch, issued=0;
1210
char *ps=src_ptr, *pd=dst_ptr;
1213
for (_j = 0; _j < count; ){
1214
/* how big a batch of requests can we issue */
1215
batch = (count - _j )<max_pending ? count - _j : max_pending;
1217
for(i=0; i< batch; i++){
1219
if(get_dscr[cur_get])elan_wait(get_dscr[cur_get],100);
1221
get_dscr[cur_get]=elan_get(elan_base->state,ps,pd, (size_t)bytes, proc);
1223
elan_wait(elan_get(elan_base->state, ps, pd, (size_t)bytes, proc),elan_base->waitType);
1229
if(cur_get>=max_pending)cur_get=0;
1234
armci_die2("armci-elan get:mismatch %d %d \n", count,issued);
1236
for (_j = 0; _j < count; _j++){
1237
elan_wait(elan_get(elan_base->state, ps, pd, (size_t)bytes, proc),elan_base->waitType);
1245
/*\ blocking vector get
1247
void armcill_getv(int proc, int bytes, int count, void* src[], void* dst[])
1249
int _j, i, batch, issued=0;
1253
printf("%d: getv %d\n", armci_me, count); fflush(stdout);
1256
for (_j = 0; _j < count; ){
1257
/* how big a batch of requests can we issue */
1258
batch = (count - _j )<max_pending ? count - _j : max_pending;
1260
for(i=0; i< batch; i++){
1261
if(get_dscr[cur_get])elan_wait(get_dscr[cur_get],100);
1265
get_dscr[cur_get]= elan_get(elan_base->state,ps, pd,(size_t)bytes,proc);
1268
if(cur_get>=max_pending)cur_get=0;
1272
armci_die2("armci-elan getv:mismatch %d %d \n", count,issued);
1274
for(i=0; i<max_pending; i++) if(get_dscr[i]){
1275
elan_wait(get_dscr[i],100);
1276
get_dscr[i]=(ELAN_EVENT*)0;
1281
void armcill_wait_get()
1285
if(!pending_get)return;
1287
for(i=0; i<max_pending; i++) if(get_dscr[i]){
1288
elan_wait(get_dscr[i],100);
1289
get_dscr[i]=(ELAN_EVENT*)0;
1294
void armcill_wait_put()
1297
if(!pending_put)return;
1299
for(i=0; i<max_pending; i++) if(put_dscr[i]){
1300
elan_wait(put_dscr[i],100);
1301
put_dscr[i]=(ELAN_EVENT*)0;