1
/* $Id: openib.c,v 1.4.2.9 2007-10-18 06:08:03 d3h325 Exp $
3
* File organized as follows
16
#include "armci-vapi.h"
19
#define DEBUG_FINALIZE 0
20
#define DEBUG_SERVER 0
23
# define VAPIDEV_NAME "InfiniHost0"
24
# define INVAL_HNDL 0xFFFFFFFF
27
/*Debug macros used to tune what is being tested -- mostly openib calls*/
32
u_int32_t armci_max_num_sg_ent;
33
u_int32_t armci_max_qp_ous_swr;
34
u_int32_t armci_max_qp_ous_rwr;
38
uint32_t sqpnum; /*we need to exchng qp nums,arr for that*/
40
uint32_t *rqpnum; /*we need rqp nums,arr for that*/
44
armci_connect_t *CLN_con, *SRV_con;
45
static uint32_t *SRV_rqpnums, *CLN_rqpnums; /*relevant rqp num arrs, to connect to svr and client*/
46
static uint32_t *CLN_rqpnumtmpbuf=NULL; /*temporary buf used during connection setup*/
48
* data structure for InfiniHost NIC
51
uint16_t *lid_arr; /*we need to exchange lids, arr for that*/
52
struct ibv_context *handle; /*device context/handle*/
54
struct ibv_device_attr attr; /*device properties*/
55
struct ibv_port_attr hca_port; /*mostly for getting lid*/
57
struct ibv_pd *ptag; /*protection tag*/
59
struct ibv_cq *scq; /*send completion queue*/
60
struct ibv_cq *rcq; /*recv completion queue*/
61
struct ibv_comp_channel *sch; /*send completion channel*/
62
struct ibv_comp_channel *rch; /*recv completion channel*/
63
void *scq_cntx; /*send context for completion queue*/
64
void *rcq_cntx; /*recv context for completion queue*/
65
int scv; /*send completion vector*/
66
int rcv; /*recv completion vector*/
70
armci_vapi_memhndl_t *prem_handle; /*address server to store memory handle*/
71
armci_vapi_memhndl_t handle;
74
armci_vapi_memhndl_t *CLN_handle;
75
armci_vapi_memhndl_t serv_memhandle, client_memhandle;
76
armci_vapi_memhndl_t *handle_array;
77
armci_vapi_memhndl_t *pinned_handle;
79
static vapi_nic_t nic_arr[3];
80
static vapi_nic_t *SRV_nic= nic_arr;
81
static vapi_nic_t *CLN_nic= nic_arr+1;
82
static int armci_server_terminating;
85
static int armci_ack_proc=NONE;
87
static int armci_vapi_server_ready;
88
static int armci_vapi_server_stage1=0;
89
static int armci_vapi_client_stage1=0;
90
static int armci_vapi_server_stage2=0;
91
static int armci_vapi_client_ready;
93
static int server_can_poll=0;
94
static int armci_vapi_max_inline_size=-1;
95
#define CLIENT_STAMP 101
98
static char * client_tail;
99
static char * serv_tail;
100
static ack_t *SRV_ack;
102
#if defined(PEND_BUFS)
103
typedef immbuf_t vapibuf_t;
104
typedef pendbuf_t vapibuf_pend_t;
107
struct ibv_recv_wr dscr;
108
struct ibv_sge sg_entry;
114
struct ibv_send_wr snd_dscr;
115
struct ibv_sge ssg_entry;
116
struct ibv_recv_wr rcv_dscr;
117
struct ibv_sge rsg_entry;
122
struct ibv_send_wr rmw_dscr;
123
struct ibv_sge rmw_entry;
127
static vapibuf_t **serv_buf_arr;
128
#if !defined(PEND_BUFS)
129
/*These are typically used as spare buffers for communication. Since
130
we do not wait on completion anymore, we need to ensure things work
131
fine when these have in-flight messages. Disabled for now.*/
132
static vapibuf_t *spare_serv_buf, *spare_serv_bufptr;
133
static vapibuf_ext_t *serv_buf;
136
static vapirmw_t rmw[64];
138
static int *flag_arr; /* flag indicates its receiving scatter data */
146
struct ibv_recv_wr *descr;
149
static int* _gtmparr;
150
static void* test_ptr;
151
static int test_stride_arr[1];
152
static int test_count[2];
153
static int test_stride_levels;
154
char *MessageRcvBuffer;
156
extern void armci_util_wait_int(volatile int *,int,int);
157
void armci_send_data_to_client(int proc, void *buf,int bytes,void *dbuf);
158
void armci_server_register_region(void *,long,ARMCI_MEMHDL_T *);
159
static descr_pool_t serv_descr_pool = {MAX_DESCR,NULL,NULL};
160
static descr_pool_t client_descr_pool = {MAX_DESCR,NULL,NULL};
162
/**Buffer (long[1]) used to set msginfo->tag.ack_ptr on the
163
client-side. See usage in SERVER_SEND_ACK macro*/
164
static long *ack_buf;
166
/* Address sizeof(request_header_t) bytes past buf, as a char*.
 * The argument is parenthesized so that pointer expressions such as
 * GET_DATA_PTR(p + 1) are cast as a whole rather than only their first operand. */
#define GET_DATA_PTR(buf) (sizeof(request_header_t) + (char*)(buf))
168
#define BUF_TO_SDESCR(buf) ((struct ibv_send_wr *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->sdscr))
170
#define BUF_TO_RDESCR(buf) ((struct ibv_recv_wr *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->rdscr))
172
#define BUF_TO_SSGLST(buf) ((struct ibv_sge *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->ssg_entry))
174
#define BUF_TO_RSGLST(buf) ((struct ibv_sge *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->rsg_entry))
176
/* Steps back from buf over one send WR, one recv WR and two SGEs and views the
 * result as a vapibuf_ext_t*. Argument and whole expansion are parenthesized so
 * the macro can be used with expression arguments and followed by -> or []. */
#define BUF_TO_EVBUF(buf) ((vapibuf_ext_t*)(((char*)(buf)) - (sizeof(struct ibv_send_wr)+sizeof(struct ibv_recv_wr)+2*sizeof(struct ibv_sge))))
178
#define SERVER_SEND_ACK(p) do { \
179
assert(*ack_buf == ARMCI_STAMP); \
181
armci_send_data_to_client((p),ack_buf, \
182
sizeof(long),msginfo->tag.ack_ptr); \
184
/* #define SERVER_SEND_ACK(p) {assert(serv_buf!=NULL);assert(msginfo->from==(p));*((long *)serv_buf->buf)=ARMCI_STAMP;armci_send_data_to_client((p),serv_buf->buf,sizeof(long),msginfo->tag.ack_ptr);} */
186
#define SERVER_SEND_DATA(_SS_proc,_SS_src,_SS_dst,_SS_size) {armci_send_data_to_client(_SS_proc,_SS_src,_SS_size,_SS_dst);}
187
#define SERVER_GET_DATA(_SG_proc,_SG_src,_SG_dst,_SG_size) {armci_get_data_from_client(_SG_proc,_SG_src,_SG_size,_SG_dst);}
190
/*\ descriptors will have unique ID's for the wait on descriptor routine to
191
* complete a descriptor and know where it came from
194
#define NUMOFBUFFERS (MAX_BUFS+MAX_SMALL_BUFS)
195
#define DSCRID_FROMBUFS 1
196
#define DSCRID_FROMBUFS_END (DSCRID_FROMBUFS+NUMOFBUFFERS)
198
#define DSCRID_NBDSCR 10000
199
#define DSCRID_NBDSCR_END (10000+MAX_PENDING)
201
#define DSCRID_SCATGAT 20000
202
/* Exclusive upper bound of the non-blocking scatter/gather descriptor ids
 * (DSCRID_SCATGAT + i, i < MAX_PENDING). Parenthesized so the expansion
 * groups as a single value inside larger expressions. */
#define DSCRID_SCATGAT_END (20000+MAX_PENDING)
204
#define DSCRID_RMW 30000
205
/* End of the RMW descriptor id range; parenthesized so the expansion groups
 * as a single value inside larger expressions. */
#define DSCRID_RMW_END (30000+9999)
207
#if defined(PEND_BUFS)
208
#define DSCRID_PENDBUF (40000)
209
#define DSCRID_PENDBUF_END (DSCRID_PENDBUF + 2*PENDING_BUF_NUM+1)
211
#define DSCRID_IMMBUF_RECV (200000)
212
#define DSCRID_IMMBUF_RECV_END (600000)
214
#define DSCRID_IMMBUF_RESP (600000)
215
#define DSCRID_IMMBUF_RESP_END (1000000)
218
extern double MPI_Wtime();
219
static double inittime0=0,inittime1=0,inittime2=0,inittime3=0,inittime4=0;
221
static int mark_buf_send_complete[NUMOFBUFFERS+1];
222
static sr_descr_t armci_vapi_client_nbsdscr_array[MAX_PENDING];
223
static sr_descr_t armci_vapi_client_nbrdscr_array[MAX_PENDING];
224
static sr_descr_t armci_vapi_serv_nbsdscr_array[MAX_PENDING];
225
static sr_descr_t armci_vapi_serv_nbrdscr_array[MAX_PENDING];
227
void armci_server_transport_cleanup();
228
/********************FUNCTIONS TO CHECK OPENIB RETURN STATUS*******************/
229
/* Checks an OpenIB return code: hands dassertp the condition rc==0 together
 * with a message built from armci_me, msg and rc.
 *   debug - forwarded as dassertp's first argument
 *   rc    - return code being checked; 0 is the expected value
 *   msg   - text included in the assertion message */
void armci_check_status(int debug, int rc,char *msg)
231
dassertp(debug,rc==0,("%d: %s, rc=%d\n",armci_me,msg,rc));
232
/* if(debug)printf("%d:%s, rc = %d\n", armci_me,msg, rc); */
233
/* if(rc!=0)armci_die(msg,rc); */
236
void armci_vapi_check_return(int debug, int ret, const char *ss)
240
printf("\n%d:from %s ret=%d str=%s str_sym=%s\n",armci_me,ss,ret,
241
VAPI_strerror(ret),VAPI_strerror_sym(ret));
245
printf("\n%d:from %s ret=%d str=%s str_sym=%s\n",armci_me,ss,ret,
246
VAPI_strerror(ret),VAPI_strerror_sym(ret));
251
void armci_vapi_print_dscr_info(struct ibv_send_wr *sr, struct ibv_recv_wr *rr)
255
printf("\n%d:print_dscr rr id=%ld sg_lst_len=%d",
256
armci_me, rr->wr_id, rr->num_sge);
257
for (i = 0; i < rr->num_sge; i++) {
258
printf("\n\t:sg_entry=%d addr=%p len=%d",
259
i, rr->sg_list[i].addr, rr->sg_list[i].length);
264
printf("\n%d:print_dscr sr id=%d opcode=%d sg_lst_len=%d",
265
armci_me, sr->wr_id, sr->opcode, sr->num_sge);
266
for (i = 0; i < sr->num_sge; i++) {
267
printf("\n\t:sg_entry=%d addr=%p len=%d",
268
i, sr->sg_list[i].addr, sr->sg_list[i].length);
274
/*****************END FUNCTIONS TO CHECK VAPI RETURN STATUS********************/
276
void armci_recv_complete(struct ibv_recv_wr *rcv_dscr, char *from, int numofrecvs) /*needs work*/
279
struct ibv_wc pdscr1;
280
struct ibv_wc *pdscr = &pdscr1;
281
sr_descr_t *rdscr_arr;
286
rdscr_arr = armci_vapi_serv_nbrdscr_array;
288
debug = DEBUG_SERVER;
291
rdscr_arr = armci_vapi_client_nbrdscr_array;
296
printf("\n%d%s:recv_complete called from %s id=%ld\n",armci_me,
297
((SERVER_CONTEXT)?"(s)":" "),from,rcv_dscr->wr_id);fflush(stdout);
299
for(i=0;i<numofrecvs;i++){
302
rc = ibv_poll_cq(nic->rcq, 1, pdscr);
304
dassertp(DBG_POLL|DBG_ALL,rc>=0,
305
("%d: rc=%d id=%d status=%d (%d/%d)\n",
306
armci_me,rc,pdscr->wr_id,pdscr->status,i,numofrecvs));
307
dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
309
if(pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END)
310
printf("\n%d:recv from %s complete id=%d num=%d",armci_me,
311
from,pdscr->wr_id,rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs);
313
if(pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
314
rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs--;
315
if(rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs==0)
316
rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
318
else if(pdscr->wr_id == (DSCRID_SCATGAT + MAX_PENDING)){
319
/*this was from a blocking call, do nothing*/
323
armci_die("\nclient should be posting only one kind of recv",armci_me);
326
}while(pdscr->wr_id!=rcv_dscr->wr_id);
332
/* Clears the send-complete mark of buffer `id` by storing 0 in
 * mark_buf_send_complete[id]; armci_send_complete stores 1 there when a
 * completion with a matching wr_id is polled.
 * id is used directly as an index into an array of NUMOFBUFFERS+1 entries and
 * is not range-checked on the visible lines. */
void armci_vapi_set_mark_buf_send_complete(int id)
334
mark_buf_send_complete[id]=0;
337
void armci_send_complete(struct ibv_send_wr *snd_dscr, char *from,int numoftimes)
340
struct ibv_wc pdscr1;
341
struct ibv_wc *pdscr = &pdscr1;
342
sr_descr_t *sdscr_arr;
346
pdscr1.status = IBV_WC_SUCCESS;
347
/* bzero(&pdscr1, sizeof(pdscr1)); */
348
/* printf("%d: Waiting for send with wr_id=%d to complete\n", armci_me, snd_dscr->wr_id); */
349
/* fflush(stdout); */
352
sdscr_arr = armci_vapi_serv_nbsdscr_array;
354
debug = DEBUG_SERVER;
357
sdscr_arr = armci_vapi_client_nbsdscr_array;
363
printf("\n%d%s:send_complete called from %s id=%ld nt=%d\n",armci_me,
364
((SERVER_CONTEXT)?"(s)":" "),from,snd_dscr->wr_id,numoftimes);
367
for(i=0;i<numoftimes;i++){
370
#if defined(PEND_BUFS)
372
rc = ibv_poll_cq(nic->rcq,1,pdscr);
375
rc = ibv_poll_cq(nic->scq,1, pdscr);
377
dassertp(DBG_POLL|DBG_ALL,rc>=0,
378
("%d:rc=%d status=%d id=%d (%d/%d)",armci_me,
379
rc,pdscr->status,(int)pdscr->wr_id,i,numoftimes));
380
dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
381
/* printf("%d: Obtained completion of wr_id=%d\n", armci_me, pdscr->wr_id); */
382
/* fflush(stdout); */
384
if(debug)printf("%d:completed id %d i=%d\n",armci_me,pdscr->wr_id,i);
385
if(pdscr->wr_id >=DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
386
sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
387
if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
388
sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
390
else if(pdscr->wr_id >=armci_nproc && pdscr->wr_id < 2*armci_nproc){
391
/*it's coming from send_data_to_client, just return*/
393
#if defined(PEND_BUFS)
394
/* accept ids inside the [DSCRID_IMMBUF_RESP, DSCRID_IMMBUF_RESP_END) window;
 * the upper bound was previously tested with '>' so in-range ids fell through
 * to the armci_die branch */
else if(pdscr->wr_id >= DSCRID_IMMBUF_RESP && pdscr->wr_id < DSCRID_IMMBUF_RESP_END) {
395
/*send from server to client completed*/
398
else armci_die("server send complete got weird id",pdscr->wr_id);
401
if(debug)printf("%d:completed id %d i=%d\n",armci_me,pdscr->wr_id,i);
402
if(pdscr->wr_id >=DSCRID_FROMBUFS && pdscr->wr_id < DSCRID_FROMBUFS_END) {
403
/* printf("%d: marking send buffer %d as complete\n", armci_me, pdscr->wr_id);*/
404
mark_buf_send_complete[pdscr->wr_id]=1;
406
else if(pdscr->wr_id >=DSCRID_NBDSCR && pdscr->wr_id < DSCRID_NBDSCR_END){
407
sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].numofsends--;
408
if(sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].numofsends==0)
409
sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].tag=0;
411
else if(pdscr->wr_id >=DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
412
sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
413
if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
414
sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
416
else if(pdscr->wr_id == (DSCRID_SCATGAT + MAX_PENDING)){
417
/* printf("%d: completed a blocking scatgat descriptor\n", armci_me); */
418
/*this was from a blocking call, do nothing*/
421
else armci_die("client send complete got weird id",pdscr->wr_id);
424
}while(pdscr->wr_id!=snd_dscr->wr_id);
430
void armci_dscrlist_recv_complete(int tag, char* from,sr_descr_t *dscr)
433
sr_descr_t *retdscr,*rdscr_arr;
436
rdscr_arr = armci_vapi_serv_nbrdscr_array;
438
rdscr_arr = armci_vapi_client_nbrdscr_array;
440
for(i=0;i<MAX_PENDING;i++){
441
if(rdscr_arr[i].tag==tag)
445
if(i==MAX_PENDING)return;
446
retdscr = &rdscr_arr[i];
451
nr = retdscr->numofrecvs;
452
armci_recv_complete(&(retdscr->rdescr),"(s)list_send_complete",nr);
456
void armci_dscrlist_send_complete(int tag,char *from, sr_descr_t *dscr)
459
sr_descr_t *retdscr,*sdscr_arr;
462
sdscr_arr = armci_vapi_serv_nbsdscr_array;
464
sdscr_arr = armci_vapi_client_nbsdscr_array;
466
for(i=0;i<MAX_PENDING;i++){
467
if(sdscr_arr[i].tag==tag)
470
if(i==MAX_PENDING)return;
471
retdscr=&sdscr_arr[i];
476
ns = retdscr->numofsends;
478
armci_send_complete(&(retdscr->sdescr),"dscrlist_send_complete",ns);
482
void armci_client_nbcall_complete(sr_descr_t *dscr, int tag, int op)
484
if(tag != dscr->tag)return;
486
THREAD_LOCK(armci_user_threads.net_lock);
490
if(dscr->numofrecvs>0)
491
armci_dscrlist_recv_complete(tag,"armci_client_nbcall_complete recv",
495
if(dscr->numofsends>0)
496
armci_dscrlist_send_complete(tag,"armci_client_nbcall_complete send",
501
if(dscr->numofsends>0)
502
armci_dscrlist_send_complete(tag,"armci_client_nbcall_complete send",
506
THREAD_UNLOCK(armci_user_threads.net_lock);
510
static int cur_serv_pend_descr;
511
static int cur_client_pend_descr;
513
sr_descr_t *armci_vapi_get_next_rdescr(int nbtag,int sg)
515
static int serverthreadavail=-1; /*client thread can't touch this*/
516
static int clientthreadavail=-1; /*server thread can't touch this*/
518
sr_descr_t *retdscr,*rdscr_arr;
521
rdscr_arr = armci_vapi_serv_nbrdscr_array;
522
avail = serverthreadavail;
523
/*printf("\n%d:serv thread avail=%d",armci_me,serverthreadavail);*/
526
rdscr_arr = armci_vapi_client_nbrdscr_array;
527
avail = clientthreadavail;
531
for(i=0;i<MAX_PENDING;i++){
533
bzero(&rdscr_arr[i].rdescr,sizeof(struct ibv_recv_wr));
535
rdscr_arr[i].rdescr.wr_id = DSCRID_SCATGAT + i;
537
rdscr_arr[i].rdescr.wr_id = DSCRID_NBDSCR + i;
542
if(rdscr_arr[avail].tag!=0){
543
armci_dscrlist_recv_complete(rdscr_arr[avail].tag,
544
"armci_vapi_get_next_rdescr",&rdscr_arr[avail]);
547
rdscr_arr[avail].tag=nbtag;
548
rdscr_arr[avail].issg=sg;
549
retdscr= (rdscr_arr+avail);
551
memset(&retdscr->rdescr,0,sizeof(struct ibv_recv_wr));
554
retdscr->rdescr.wr_id = DSCRID_SCATGAT + avail;
556
retdscr->rdescr.wr_id = DSCRID_NBDSCR + avail;
557
retdscr->numofrecvs=1;
560
newavail = (avail+1)%MAX_PENDING;
563
cur_serv_pend_descr = avail;
564
serverthreadavail=newavail;
567
cur_client_pend_descr = avail;
568
clientthreadavail=newavail;
575
sr_descr_t *armci_vapi_get_next_sdescr(int nbtag,int sg)
577
static int serverthreadavail=-1; /*client thread can't touch this*/
578
static int clientthreadavail=-1; /*server thread can't touch this*/
580
sr_descr_t *retdscr,*sdscr_arr;
583
sdscr_arr = armci_vapi_serv_nbsdscr_array;
584
avail = serverthreadavail;
587
sdscr_arr = armci_vapi_client_nbsdscr_array;
588
avail = clientthreadavail;
591
if(avail==-1){ /*first call*/
593
for(i=0;i<MAX_PENDING;i++){
595
bzero(&sdscr_arr[i].sdescr,sizeof(struct ibv_send_wr));
597
sdscr_arr[i].sdescr.wr_id = DSCRID_SCATGAT+i;
599
sdscr_arr[i].sdescr.wr_id = DSCRID_NBDSCR + i;
604
if(sdscr_arr[avail].tag!=0){
605
armci_dscrlist_send_complete(sdscr_arr[avail].tag,
606
"armci_vapi_get_next_sdescr",&sdscr_arr[avail]);
609
sdscr_arr[avail].tag=nbtag;
610
sdscr_arr[avail].issg=sg;
611
retdscr= (sdscr_arr+avail);
613
/* zero the whole send descriptor; sizing by the object itself avoids the
 * earlier mismatch with sizeof(struct ibv_recv_wr) */
memset(&retdscr->sdescr,0,sizeof(retdscr->sdescr));
616
retdscr->sdescr.wr_id = DSCRID_SCATGAT + avail;
618
retdscr->sdescr.wr_id = DSCRID_NBDSCR + avail;
619
retdscr->numofsends=1;
622
newavail = (avail+1)%MAX_PENDING;
625
cur_serv_pend_descr = avail;
626
serverthreadavail=newavail;
629
cur_client_pend_descr = avail;
630
clientthreadavail=newavail;
633
printf("\n%d:avail=%d newavail=%d cln=%d serv=%d",armci_me,avail,
634
newavail,clientthreadavail,serverthreadavail);
639
/* Marks the data server as terminating by setting armci_server_terminating to 1.
 * NOTE(review): any waiting implied by the name is not visible on these lines. */
void armci_wait_for_server()
641
armci_server_terminating = 1;
645
/* ibv_create_qp does not use separate structure to return properties,
646
seems it is all inside ibv_qp */
647
/* Creates a reliable-connection (IBV_QPT_RC) queue pair in nic's protection
 * domain (nic->ptag) and returns it through *qp.
 *   nic - NIC whose protection domain and completion queues are used
 *   qp  - receives the new queue pair; asserted non-NULL after creation */
static void armci_create_qp(vapi_nic_t *nic, struct ibv_qp **qp)
649
struct ibv_qp_init_attr initattr;
651
bzero(&initattr, sizeof(struct ibv_qp_init_attr));
655
/* queue depths and scatter/gather limits come from the file-level armci_max_* settings */
initattr.cap.max_send_wr = armci_max_qp_ous_swr;
656
initattr.cap.max_recv_wr = armci_max_qp_ous_rwr;
657
initattr.cap.max_recv_sge = armci_max_num_sg_ent;
658
initattr.cap.max_send_sge = armci_max_num_sg_ent;
659
#if defined(PEND_BUFS)
661
/* with PEND_BUFS both send and receive completions are reported on nic->rcq */
initattr.send_cq = nic->rcq;
662
initattr.recv_cq = nic->rcq;
667
/* NOTE(review): presumably the non-PEND_BUFS branch (its directive is not
 * visible here): separate send and receive completion queues */
initattr.send_cq = nic->scq;
668
initattr.recv_cq = nic->rcq;
670
initattr.qp_type = IBV_QPT_RC;
672
*qp = ibv_create_qp(nic->ptag, &initattr);
673
dassert(1,*qp!=NULL);
675
/* while armci_vapi_max_inline_size is still 0, record the inline-data limit
 * found in initattr.cap after the ibv_create_qp call */
if(!armci_vapi_max_inline_size){
676
armci_vapi_max_inline_size = initattr.cap.max_inline_data;
682
/* Reads OpenIB runtime settings from the environment. When ARMCI_OPENIB_SL is
 * set, its value is converted with atoi() and stored in armci_openib_sl. */
void armci_openib_env_init()
686
/* atoi() performs no error reporting; the value is not validated on the visible lines */
if ((value = getenv("ARMCI_OPENIB_SL")) != NULL){
687
armci_openib_sl = atoi(value);
690
/* AV: default based on Chinook */
694
/* Similarly, other constants can be changed to runtime settings */
697
static void armci_init_nic(vapi_nic_t *nic, int scq_entries, int
701
struct ibv_device **devs=NULL;
702
struct ibv_context *cxt;
704
if (nic == SRV_nic) {
705
/* Initialize OpenIB runtime variables only once*/
706
armci_openib_env_init();
709
bzero(nic,sizeof(vapi_nic_t));
710
nic->lid_arr = (uint16_t *)calloc(armci_nproc,sizeof(uint16_t));
711
dassert(1,nic->lid_arr!=NULL);
713
devs = ibv_get_device_list(&ndevs);
715
nic->handle = ibv_open_device(*devs);
717
nic->maxtransfersize = MAX_RDMA_SIZE;
719
nic->vendor = ibv_get_device_name(*devs);
721
rc = ibv_query_device(nic->handle, &nic->attr);
723
int down_port_count_check = 0;
724
for (i = 1; i <= 2; i++) {
725
rc = ibv_query_port(nic->handle, (uint8_t)i, &nic->hca_port);
726
if (IBV_PORT_ACTIVE == nic->hca_port.state) {
727
nic->active_port = i;
731
down_port_count_check++;
735
/* Assert that the two ports queried above were not both counted as down,
 * i.e. at least one port on the adapter is active */
737
assert(down_port_count_check != 2);
739
/*save the lid for doing a global exchange later */
740
nic->lid_arr[armci_me] = nic->hca_port.lid;
742
/*allocate tag (protection domain) */
743
nic->ptag = ibv_alloc_pd(nic->handle);
745
/* properties of scq and rcq required for the cq number, this also needs
746
* to be globally exchanged
750
nic->scq = nic->rcq = NULL;
753
nic->sch = ibv_create_comp_channel(nic->handle);
754
nic->scq = ibv_create_cq(nic->handle, 16000,
755
nic->scq_cntx,nic->sch, 0);
759
nic->rch = ibv_create_comp_channel(nic->handle);
760
nic->rcq = ibv_create_cq(nic->handle, 32768,
761
nic->rcq_cntx,nic->rch, 0);
764
ibv_free_device_list(devs);
766
armci_max_num_sg_ent = 29;
767
armci_max_qp_ous_swr = 100;
768
armci_max_qp_ous_rwr = 50;
770
if(armci_max_qp_ous_rwr + armci_max_qp_ous_swr>nic->attr.max_qp_wr){
771
armci_max_qp_ous_swr = nic->attr.max_qp_wr/16;
772
armci_max_qp_ous_rwr = nic->attr.max_qp_wr - armci_max_qp_ous_swr;
774
if(armci_max_num_sg_ent >= nic->attr.max_sge){
775
armci_max_num_sg_ent = nic->attr.max_sge - 1;
780
/****************MEMORY ALLOCATION REGISTRATION DEREGISTRATION****************/
781
static char * serv_malloc_buf_base;
782
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
783
/* extern gpc_buf_t *gpc_req; */
785
void armci_server_alloc_bufs()
788
int mod, bytes, total, extra =sizeof(struct ibv_recv_wr)*MAX_DESCR+SIXTYFOUR;
789
int mhsize = armci_nproc*sizeof(armci_vapi_memhndl_t); /* ack */
792
#if defined(PEND_BUFS)
793
int clients = (IMM_BUF_NUM+1)*armci_nproc;
795
int clients = armci_nproc;
798
/* allocate memory for the recv buffers - must be aligned on a 64-byte boundary */
799
/* note: we add one extra buffer to repost for the client whose request we received */
800
bytes = (clients+1)*sizeof(vapibuf_t)+sizeof(vapibuf_ext_t) + extra+ mhsize
801
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
802
/* + MAX_GPC_REQ * sizeof(gpc_buf_t) */
804
#if defined(PEND_BUFS)
805
+ (clients+1)*IMM_BUF_LEN
806
+ PENDING_BUF_NUM*(sizeof(vapibuf_pend_t)+PENDING_BUF_LEN)
810
total = bytes + SIXTYFOUR;
812
total = total - (total%4096) + 4096;
813
tmp0=tmp = malloc(total);
814
serv_malloc_buf_base = tmp0;
816
dassert1(1,tmp!=NULL,(int)total);
817
/* stamp the last byte */
818
serv_tail= tmp + bytes+SIXTYFOUR-1;
819
*serv_tail=SERV_STAMP;
820
/* allocate memory for client memory handle to support put response
821
* in dynamic memory registration protocols */
822
CLN_handle = (armci_vapi_memhndl_t*)tmp;
823
memset(CLN_handle,0,mhsize); /* set it to zero */
826
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
827
/* /\* gpc_req memory*\/ */
828
/* tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR); */
829
/* gpc_req = (gpc_buf_t *)tmp; */
830
/* tmp += MAX_GPC_REQ * sizeof(gpc_buf_t); */
833
/* setup descriptor memory */
834
tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
835
serv_descr_pool.descr= (struct ibv_recv_wr *)(tmp);
838
/* setup ack buffer*/
839
tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
840
ack_buf = (long *)(tmp);
841
*ack_buf=ARMCI_STAMP;
844
/* setup buffer pointers */
845
tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
846
serv_buf_arr = (vapibuf_t **)malloc(sizeof(vapibuf_t*)*clients);
847
for(i=0;i<clients;i++){
848
serv_buf_arr[i] = (vapibuf_t*)(tmp) + i;
850
tmp = (char *)(serv_buf_arr[0]+clients);
852
#if defined(PEND_BUFS)
853
/*setup buffers in immediate buffers*/
854
tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
855
for(i=0; i<clients; i++) {
856
serv_buf_arr[i]->buf = tmp + i*IMM_BUF_LEN;
858
tmp += clients*IMM_BUF_LEN;
860
/*setup pending buffers*/
861
tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
862
serv_pendbuf_arr = (vapibuf_pend_t *)(tmp);
863
tmp=(char *)(serv_pendbuf_arr+PENDING_BUF_NUM);
864
tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
865
for(i=0; i<PENDING_BUF_NUM; i++) {
866
serv_pendbuf_arr[i].buf = tmp+i*PENDING_BUF_LEN;
867
assert(serv_pendbuf_arr[i].buf != NULL);
869
tmp += PENDING_BUF_NUM*PENDING_BUF_LEN;
870
MessageRcvBuffer = NULL;
872
tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
873
spare_serv_buf = (vapibuf_t *)tmp; /* spare buffer is at the end */
874
spare_serv_bufptr = spare_serv_buf; /* save the pointer for later */
875
serv_buf =(vapibuf_ext_t*)(spare_serv_buf+1);
876
tmp = (char *)(serv_buf+1);
878
MessageRcvBuffer = serv_buf->buf;
881
flag_arr = (int *)malloc(sizeof(int)*armci_nproc);
882
for (i =0; i<armci_nproc; i++) flag_arr[i] = 9999;
885
printf("\n%d(s):registering mem %p %dbytes ptag=%ld handle=%d\n",
886
armci_me, tmp0,total,CLN_nic->ptag,CLN_nic->handle);fflush(stdout);
889
serv_memhandle.memhndl = ibv_reg_mr(CLN_nic->ptag, tmp0, total,
890
IBV_ACCESS_LOCAL_WRITE |
891
IBV_ACCESS_REMOTE_WRITE |
892
IBV_ACCESS_REMOTE_READ);
893
dassert1(1,serv_memhandle.memhndl!=NULL,total);
894
serv_memhandle.lkey=serv_memhandle.memhndl->lkey;
895
serv_memhandle.rkey=serv_memhandle.memhndl->rkey;
897
/* exchange address of ack/memhandle flag on servers */
899
printf("%d(s):registered mem %p %dbytes mhandle=%d mharr starts%p\n",
900
armci_me, tmp0, total, serv_memhandle.memhndl,CLN_handle);
905
static char * client_malloc_buf_base;
906
char * armci_vapi_client_mem_alloc(int size)
910
int extra = MAX_DESCR*sizeof(struct ibv_recv_wr)+SIXTYFOUR;
913
/*we use the size passed by the armci_init_bufs routine instead of bytes*/
915
total = size + extra + 2*SIXTYFOUR;
918
total = total - (total%4096) + 4096;
919
tmp0 = tmp = malloc(total);
920
dassert1(1,tmp!=NULL,total);
921
client_malloc_buf_base = tmp;
923
/*SK: could this lead to a problem at ibv_reg_mr() because of unfixed 'total'?*/
924
if(ALIGN64ADD(tmp0))tmp0+=ALIGN64ADD(tmp0);
926
/* stamp the last byte */
927
client_tail= tmp + extra+ size +2*SIXTYFOUR-1;
928
*client_tail=CLIENT_STAMP;
930
/* we also have a place to store memhandle for zero-copy get */
931
pinned_handle =(armci_vapi_memhndl_t *) (tmp + extra+ size +SIXTYFOUR-16);
933
mod = ((ssize_t)tmp)%SIXTYFOUR;
934
client_descr_pool.descr= (struct ibv_recv_wr*)(tmp+SIXTYFOUR-mod);
937
client_memhandle.memhndl = ibv_reg_mr(SRV_nic->ptag, tmp0, total,
938
IBV_ACCESS_LOCAL_WRITE |
939
IBV_ACCESS_REMOTE_WRITE |
940
IBV_ACCESS_REMOTE_READ);
941
dassert(1,client_memhandle.memhndl!=NULL);
943
client_memhandle.lkey = client_memhandle.memhndl->lkey;
944
client_memhandle.rkey = client_memhandle.memhndl->rkey;
945
handle_array[armci_me].lkey = client_memhandle.lkey;
946
handle_array[armci_me].rkey = client_memhandle.rkey;
948
handle_array[armci_me].memhndl = client_memhandle.memhndl;
951
printf("%d: registered client memory %p %dsize tmp=%p \n",
952
armci_me,tmp0, total, tmp);
955
/*now that we have the handle array, we get everybody else's RDMA handle*/
956
total = (sizeof(armci_vapi_memhndl_t)*armci_nproc)/sizeof(int);
957
armci_msg_gop_scope(SCOPE_ALL,handle_array,total,"+",ARMCI_INT);
963
/* Registers the region starting at ptr, bytes long, with CLN_nic's protection
 * domain for local write and remote read/write access.
 *   ptr    - start of the region to register
 *   bytes  - length of the region
 *   memhdl - zeroed first, then filled with the memory region returned by
 *            ibv_reg_mr and that region's lkey and rkey
 * Registration failure (NULL from ibv_reg_mr) is caught by dassert. */
void armci_server_register_region(void *ptr,long bytes, ARMCI_MEMHDL_T *memhdl)
965
bzero(memhdl,sizeof(ARMCI_MEMHDL_T));
967
memhdl->memhndl = ibv_reg_mr(CLN_nic->ptag, ptr, bytes,
968
IBV_ACCESS_LOCAL_WRITE |
969
IBV_ACCESS_REMOTE_WRITE |
970
IBV_ACCESS_REMOTE_READ);
971
dassert(1,memhdl->memhndl!=NULL);
973
/* cache the keys so they can be read straight from the handle */
memhdl->lkey=memhdl->memhndl->lkey;
974
memhdl->rkey=memhdl->memhndl->rkey;
977
printf("\n%d(s):registered lkey=%d rkey=%d ptr=%p end=%p %p\n",armci_me,
978
memhdl->lkey,memhdl->rkey,ptr,(char *)ptr+bytes,memhdl);
983
/* Registers (pins) the contiguous region starting at ptr, bytes long, with
 * SRV_nic's protection domain for local write and remote read/write access,
 * and stores the resulting memory region plus its lkey/rkey in *memhdl.
 * Registration failure (NULL from ibv_reg_mr) is caught by dassert.
 * NOTE(review): the return statement is not visible on these lines, so the
 * meaning of the int result cannot be confirmed here. */
int armci_pin_contig_hndl(void *ptr, size_t bytes, ARMCI_MEMHDL_T *memhdl)
985
memhdl->memhndl = ibv_reg_mr(SRV_nic->ptag, ptr, bytes,
986
IBV_ACCESS_LOCAL_WRITE |
987
IBV_ACCESS_REMOTE_WRITE |
988
IBV_ACCESS_REMOTE_READ);
989
dassert(1,memhdl->memhndl!=NULL);
990
memhdl->lkey=memhdl->memhndl->lkey;
991
memhdl->rkey=memhdl->memhndl->rkey;
993
printf("\n%d:registered lkey=%d rkey=%d ptr=%p end=%p\n",armci_me,
994
memhdl->lkey,memhdl->rkey,ptr,(char *)ptr+bytes);fflush(stdout);
1000
/* Deregisters the memory region held in mh->memhndl via ibv_dereg_mr.
 * A non-zero return code trips dassert1; the code is then also passed to
 * armci_vapi_check_return with DEBUG_FINALIZE. */
void armci_network_client_deregister_memory(ARMCI_MEMHDL_T *mh)
1003
rc = ibv_dereg_mr(mh->memhndl);
1004
dassert1(1,rc==0,rc);
1005
armci_vapi_check_return(DEBUG_FINALIZE,rc,
1006
"armci_network_client_deregister_memory:deregister_mr");
1008
/* Server-side counterpart of the client deregistration routine: the visible
 * statements print the handle pointer, call ibv_dereg_mr on mh->memhndl and
 * check the return code.
 * NOTE(review): as the lines appear here, an unconditional return precedes
 * those statements, so the deregistration would never run; confirm whether a
 * preprocessor guard (not visible) selects between the two. */
void armci_network_server_deregister_memory(ARMCI_MEMHDL_T *mh)
1011
return; /* ??? why ??? */
1012
printf("\n%d:deregister ptr=%p",armci_me,mh);fflush(stdout);
1013
rc = ibv_dereg_mr(mh->memhndl);
1014
dassert1(1,rc==0,rc);
1015
armci_vapi_check_return(DEBUG_FINALIZE,rc,
1016
"armci_network_server_deregister_memory:deregister_mr");
1019
# define armci_network_client_deregister_memory(mh) \
1020
armci_vapi_check_return(DEBUG_FINALIZE, \
1021
ibv_dereg_mr(mh->memhndl), \
1022
"armci_network_client_deregister_memory:deregister_mr")
1023
# define armci_network_server_deregister_memory(mh) \
1024
armci_vapi_check_return(DEBUG_FINALIZE, \
1025
ibv_dereg_mr(mh->memhndl), \
1026
"armci_network_server_deregister_memory:deregister_mr")
1029
void armci_set_serv_mh()
1031
int s, ratio = sizeof(ack_t)/sizeof(int);
1032
/* first collect addresses on all masters */
1033
if(armci_me == armci_master){
1034
SRV_ack[armci_clus_me].prem_handle=CLN_handle;
1035
SRV_ack[armci_clus_me].handle =serv_memhandle;
1036
armci_msg_gop_scope(SCOPE_MASTERS,SRV_ack,ratio*armci_nclus,"+",
1039
/* next master broadcasts the addresses within its node */
1040
armci_msg_bcast_scope(SCOPE_NODE,SRV_ack,armci_nclus*sizeof(ack_t),
1043
/* Finally save address corresponding to my id on each server */
1044
for(s=0; s< armci_nclus; s++){
1045
SRV_ack[s].prem_handle += armci_me;
1049
/**********END MEMORY ALLOCATION REGISTRATION AND DEREGISTRATION**************/
1052
* init_connections, client_connect_to_servers -- client code
1053
* server_initial_connection, all_data_server -- server code
1055
void armci_init_connections()
1061
if(TIME_INIT)inittime0 = MPI_Wtime();
1063
#if defined(PEND_BUFS)
1064
armci_pbuf_init_buffer_env();
1066
/* initialize nic connection for qp numbers and lid's */
1067
armci_init_nic(SRV_nic,1,1);
1068
for(c=0; c<NUMOFBUFFERS+1; c++) {
1069
mark_buf_send_complete[c]=1;
1071
_gtmparr = (int *)calloc(armci_nproc,sizeof(int));
1073
/*qp_numbers and lids need to be exchanged globally*/
1074
tmparr = (int *)calloc(armci_nproc,sizeof(int));
1075
tmparr[armci_me] = SRV_nic->lid_arr[armci_me];
1077
armci_msg_gop_scope(SCOPE_ALL,tmparr,sz,"+",ARMCI_INT);
1078
for(c=0;c<armci_nproc;c++){
1079
SRV_nic->lid_arr[c]=tmparr[c];
1082
/*SRV_con is for client to connect to servers */
1083
SRV_con=(armci_connect_t *)malloc(sizeof(armci_connect_t)*armci_nclus);
1084
dassert1(1,SRV_con!=NULL,sizeof(armci_connect_t)*armci_nclus);
1085
bzero(SRV_con,sizeof(armci_connect_t)*armci_nclus);
1087
CLN_con=(armci_connect_t*)malloc(sizeof(armci_connect_t)*armci_nproc);
1088
dassert1(1,CLN_con!=NULL,sizeof(armci_connect_t)*armci_nproc);
1089
bzero(CLN_con,sizeof(armci_connect_t)*armci_nproc);
1091
/*every client creates a qp with every server other than the one on itself*/
1092
SRV_rqpnums = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
1093
dassert(1,SRV_rqpnums);
1094
tmpbuf = (uint32_t*)calloc(armci_nproc,sizeof(uint32_t));
1097
sz = armci_nproc*(sizeof(uint32_t)/sizeof(int));
1098
armci_vapi_max_inline_size = 0;
1099
for(s = 0; s < armci_nclus; s++){
1100
armci_connect_t *con = SRV_con + s;
1101
armci_create_qp(SRV_nic,&con->qp);
1102
con->sqpnum = con->qp->qp_num;
1103
tmpbuf[armci_clus_info[s].master] = con->qp->qp_num;
1104
con->lid = SRV_nic->lid_arr[s];
1106
MPI_Alltoall(tmpbuf,sizeof(uint32_t),MPI_CHAR,SRV_rqpnums,
1107
sizeof(uint32_t),MPI_CHAR,MPI_COMM_WORLD);
1109
if(armci_me != armci_master) {
1115
SRV_ack = (ack_t*)calloc(armci_nclus,sizeof(ack_t));
1116
dassert1(1,SRV_ack!=NULL,armci_nclus*sizeof(ack_t));
1118
handle_array = (armci_vapi_memhndl_t *)calloc(sizeof(armci_vapi_memhndl_t),
1120
dassert1(1,handle_array!=NULL,sizeof(armci_vapi_memhndl_t)*armci_nproc);
1123
/*
 * vapi_connect_client - client-side half of the connection handshake.
 *
 * Synchronises with the server through the armci_vapi_server_stage1/2,
 * armci_vapi_client_stage1 and armci_vapi_client_ready flags, exchanges
 * LIDs and QP numbers (armci_msg_gop_scope / MPI_Alltoall) and then drives
 * the client QPs through the INIT, RTR and RTS states with ibv_modify_qp().
 * Every ibv_modify_qp() return code is checked with dassertp().
 */
static void vapi_connect_client()
1125
int i, start, sz=0, c, rc;
1126
struct ibv_qp_attr qp_attr;
1127
struct ibv_qp_cap qp_cap;
1128
enum ibv_qp_attr_mask qp_attr_mask;
1130
if (TIME_INIT) inittime0 = MPI_Wtime();
1131
/* only the master waits for the server to reach stage 1 */
if (armci_me == armci_master)
1132
armci_util_wait_int(&armci_vapi_server_stage1, 1, 10);
1133
if (TIME_INIT) printf("\n%d:wait for server to get to stage 1 time for "
1134
"vapi_connect_client is %f",
1135
armci_me, (inittime1 = MPI_Wtime()) - inittime0);
1137
/* master: sum-reduce _gtmparr across masters and copy it into CLN_nic->lid_arr */
if (armci_me == armci_master) {
1138
armci_msg_gop_scope(SCOPE_MASTERS, _gtmparr, sz, "+", ARMCI_INT);
1139
for (c=0; c<armci_nproc; c++) {
1140
CLN_nic->lid_arr[c] = _gtmparr[c];
1144
printf("\n%d(svc): mylid = %d",armci_me,CLN_nic->lid_arr[armci_me]);
1149
/* armci_server_initial_connection() waits on this flag */
armci_vapi_client_stage1 = 1;
1151
/* allocate and initialize connection structs */
1152
/* element count for the ARMCI_INT reductions: uint32_t entries expressed in ints */
sz = armci_nproc*sizeof(uint32_t)/sizeof(int);
1154
if (armci_me == armci_master)
1155
armci_util_wait_int(&armci_vapi_server_stage2, 1, 10);
1157
for (c = 0; c < armci_nproc; c++){
1158
armci_connect_t *con = CLN_con + c;
1159
if (armci_me != armci_master) {
1162
/* 8 spare bytes leave room for the alignment adjustment applied below */
ptrr = malloc(8 + sizeof(uint32_t) * armci_nproc);
1163
extra = ALIGNLONGADD(ptrr);
1164
ptrr = ptrr + extra;
1165
con->rqpnum = (uint32_t *)ptrr;
1166
bzero(con->rqpnum, sizeof(uint32_t) * armci_nproc);
1168
armci_msg_gop_scope(SCOPE_ALL, con->rqpnum, sz, "+", ARMCI_INT);
1171
CLN_rqpnums = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
1172
if(armci_me != armci_master) {
1174
CLN_rqpnumtmpbuf = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
1176
dassert(1, CLN_rqpnumtmpbuf);
1177
/* one uint32_t per peer, shipped as raw bytes */
MPI_Alltoall(CLN_rqpnumtmpbuf, sizeof(uint32_t), MPI_CHAR,
1178
CLN_rqpnums, sizeof(uint32_t), MPI_CHAR, MPI_COMM_WORLD);
1179
free(CLN_rqpnumtmpbuf);
1180
CLN_rqpnumtmpbuf=NULL;
1183
if (TIME_INIT) printf("\n%d:wait for server tog et to stage 2 time for "
1184
"vapi_connect_client is %f",
1185
armci_me, (inittime2 = MPI_Wtime()) - inittime1);
1186
/*armci_set_serv_mh();*/
1189
printf("%d:all connections ready\n", armci_me);
1194
memset(&qp_attr, 0, sizeof qp_attr);
1195
/* Modifying QP to INIT */
1196
qp_attr_mask = IBV_QP_STATE
1199
| IBV_QP_ACCESS_FLAGS;
1201
qp_attr.qp_state = IBV_QPS_INIT;
1202
qp_attr.pkey_index = DEFAULT_PKEY_IX;
1203
qp_attr.port_num = SRV_nic->active_port;
1204
qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
1206
/* start from server on my_node -1 */
1207
start = (armci_clus_me == 0) ? armci_nclus - 1 : armci_clus_me - 1;
1208
for (i = 0; i < armci_nclus; i++) {
1209
armci_connect_t *con;
1211
rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
1212
dassertp(1,!rc,("%d: client RST->INIT i=%d rc=%d\n",armci_me,i,rc));
1215
if (TIME_INIT) printf("\n%d:to init time for vapi_connect_client is %f",
1216
armci_me, (inittime1 = MPI_Wtime()) - inittime2);
1217
/* Modifying QP to RTR (ready to receive) */
qp_attr_mask = IBV_QP_STATE
1218
| IBV_QP_MAX_DEST_RD_ATOMIC
1221
| IBV_QP_MIN_RNR_TIMER;
1222
memset(&qp_attr, 0, sizeof qp_attr);
1224
qp_attr.qp_state = IBV_QPS_RTR;
1225
qp_attr.max_dest_rd_atomic = 4;
1226
qp_attr.path_mtu = IBV_MTU_1024;
1228
qp_attr.min_rnr_timer = RNR_TIMER;
1230
/* AV: Adding the service level parameter */
1231
qp_attr.ah_attr.sl = armci_openib_sl;
1233
start = (armci_clus_me == 0) ? armci_nclus - 1 : armci_clus_me - 1;
1234
for (i = 0; i < armci_nclus; i++) {
1235
armci_connect_t *con;
1236
armci_connect_t *conS;
1239
conS = CLN_con + armci_me;
1241
/* the remote QP number and address vector are per-destination attributes */
qp_attr_mask |= IBV_QP_AV | IBV_QP_DEST_QPN;
1243
qp_attr.dest_qp_num = conS->rqpnum[armci_clus_info[i].master];
1245
qp_attr.dest_qp_num = CLN_rqpnums[armci_clus_info[i].master];
1247
qp_attr.ah_attr.dlid = SRV_nic->lid_arr[armci_clus_info[i].master];
1248
qp_attr.ah_attr.port_num = SRV_nic->active_port;
1250
qp_attr.ah_attr.sl = armci_openib_sl;
1252
rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
1253
dassertp(1,!rc,("%d: INIT->RTR client i=%d rc=%d\n",armci_me,i,rc));
1256
/*to go to RTS, other side must be in RTR*/
1257
armci_msg_barrier();
1258
if (TIME_INIT) printf("\n%d:init to rtr time for vapi_connect_client is %f",
1259
armci_me, (inittime2 = MPI_Wtime()) - inittime1);
1260
/* armci_server_initial_connection() waits on this flag before its own RTS step */
armci_vapi_client_ready=1;
1262
/* Modifying QP to RTS (ready to send) */
qp_attr_mask = IBV_QP_STATE
1267
| IBV_QP_MAX_QP_RD_ATOMIC;
1269
memset(&qp_attr, 0, sizeof qp_attr);
1271
qp_attr.qp_state = IBV_QPS_RTS;
1273
qp_attr.timeout = 18;
1274
qp_attr.retry_cnt = 20;
1275
qp_attr.rnr_retry = 7;
1276
qp_attr.max_rd_atomic = 4;
1278
start = (armci_clus_me == 0) ? armci_nclus - 1 : armci_clus_me - 1;
1279
for (i = 0; i < armci_nclus; i++){
1280
armci_connect_t *con;
1282
rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
1283
dassertp(1,!rc,("%d: client RTR->RTS i=%d rc=%d\n",armci_me,i,rc));
1285
if (TIME_INIT) printf("\n%d:rtr to rts time for vapi_connect_client is %f",
1286
armci_me, (inittime1 = MPI_Wtime()) - inittime2);
1292
/*
 * armci_client_connect_to_servers - client entry point for connection setup.
 *
 * Runs vapi_connect_client(); the master then waits for
 * armci_vapi_server_ready before all processes meet in armci_msg_barrier().
 * Timing and debug output is printed when TIME_INIT / DEBUG_CLN are set.
 */
void armci_client_connect_to_servers()
1294
extern void armci_util_wait_int(volatile int *,int,int);
1295
if (TIME_INIT) inittime0 = MPI_Wtime();
1298
vapi_connect_client();
1299
/* only the master waits for the server's ready flag */
if (armci_me == armci_master)
1300
armci_util_wait_int(&armci_vapi_server_ready,1,10);
1301
armci_msg_barrier();
1302
if (DEBUG_CLN && armci_me == armci_master) {
1303
printf("\n%d:server_ready=%d\n",armci_me,armci_vapi_server_ready);
1306
if (TIME_INIT) printf("\n%d:time for client_connect_to_s is %f",
1307
armci_me,MPI_Wtime()-inittime0);
1311
/*
 * armci_init_vapibuf_recv - prepare a receive work request for one buffer.
 *
 * rd        receive work request; zeroed, then pointed at sg_entry
 * sg_entry  scatter/gather element filled with buf's address, len and lkey
 * buf       start of the receive buffer
 * len       value stored in sg_entry->length
 * mhandle   memory handle supplying the local key (lkey)
 */
void armci_init_vapibuf_recv(struct ibv_recv_wr *rd, struct ibv_sge *sg_entry,
1312
char *buf, int len, armci_vapi_memhndl_t *mhandle)
1314
memset(rd,0,sizeof(struct ibv_recv_wr));
1317
rd->sg_list = sg_entry;
1320
sg_entry->lkey = mhandle->lkey;
1321
/* convert via uintptr_t so a 32-bit pointer is zero-extended, not sign-extended */
sg_entry->addr = (uint64_t)(uintptr_t)buf;
1322
sg_entry->length = len;
1326
/*
 * armci_init_vapibuf_send - prepare a signaled SEND work request.
 *
 * sd        send work request; opcode set to IBV_WR_SEND, flags to
 *           IBV_SEND_SIGNALED, sg_list pointed at sg_entry
 * sg_entry  scatter/gather element filled with buf's address, len and lkey
 * buf       start of the buffer to send
 * len       value stored in sg_entry->length
 * mhandle   memory handle supplying the local key (lkey)
 */
void armci_init_vapibuf_send(struct ibv_send_wr *sd, struct ibv_sge *sg_entry,
1327
char *buf, int len, armci_vapi_memhndl_t *mhandle)
1329
sd->opcode = IBV_WR_SEND;
1331
sd->send_flags = IBV_SEND_SIGNALED;
1333
sd->sg_list = sg_entry;
1335
sg_entry->lkey = mhandle->lkey;
1336
/* convert via uintptr_t so a 32-bit pointer is zero-extended, not sign-extended */
sg_entry->addr = (uint64_t)(uintptr_t)buf;
1337
sg_entry->length = len;
1341
/*
 * armci_init_vbuf_srdma - prepare a signaled RDMA WRITE work request.
 *
 * sd        send work request; opcode set to IBV_WR_RDMA_WRITE
 * sg_entry  scatter/gather element describing the local buffer
 * lbuf      local source buffer
 * rbuf      remote destination address
 * len       value stored in sg_entry->length
 * lhandle   local memory handle; its lkey is used only when non-NULL
 * rhandle   remote memory handle; its rkey is used only when non-NULL
 */
static void armci_init_vbuf_srdma(struct ibv_send_wr *sd, struct ibv_sge *sg_entry,
1342
char *lbuf, char *rbuf, int len,
1343
armci_vapi_memhndl_t *lhandle,
1344
armci_vapi_memhndl_t *rhandle)
1346
/* NOTE: sd->wr is a union, sd->wr.ud might conflict with sd->wr.rdma */
1347
sd->opcode = IBV_WR_RDMA_WRITE;
1348
sd->send_flags = IBV_SEND_SIGNALED;
1351
sd->sg_list = sg_entry;
1352
if (rhandle) sd->wr.rdma.rkey = rhandle->rkey;
1353
/* convert via uintptr_t so a 32-bit pointer is zero-extended, not sign-extended */
sd->wr.rdma.remote_addr = (uint64_t)(uintptr_t)rbuf;
1355
if (lhandle) sg_entry->lkey = lhandle->lkey;
1356
sg_entry->addr = (uint64_t)(uintptr_t)lbuf;
1357
sg_entry->length = len;
1361
/*
 * armci_init_vbuf_rrdma - prepare a signaled RDMA READ work request.
 *
 * sd        send work request; opcode set to IBV_WR_RDMA_READ
 * sg_entry  scatter/gather element describing the local buffer
 * lbuf      local destination buffer
 * rbuf      remote source address
 * len       value stored in sg_entry->length
 * lhandle   local memory handle; its lkey is used only when non-NULL
 * rhandle   remote memory handle; its rkey is used only when non-NULL
 */
static void armci_init_vbuf_rrdma(struct ibv_send_wr *sd, struct ibv_sge
1362
*sg_entry, char *lbuf, char *rbuf, int len, armci_vapi_memhndl_t
1363
*lhandle, armci_vapi_memhndl_t *rhandle)
1365
sd->opcode = IBV_WR_RDMA_READ;
1367
sd->send_flags = IBV_SEND_SIGNALED;
1369
sd->sg_list = sg_entry;
1370
/* written before the rdma members because both live in the same union */
sd->wr.ud.remote_qkey = 0;
1371
if (rhandle) sd->wr.rdma.rkey = rhandle->rkey;
1372
/* convert via uintptr_t so a 32-bit pointer is zero-extended, not sign-extended */
sd->wr.rdma.remote_addr = (uint64_t)(uintptr_t)rbuf;
1374
if (lhandle) sg_entry->lkey = lhandle->lkey;
1375
sg_entry->addr = (uint64_t)(uintptr_t)lbuf;
1376
sg_entry->length = len;
1377
/* sd->wr is a union, sd->wr.ud might conflict with sd->wr.rdma */
1381
/*
 * armci_server_initial_connection - server-side half of the handshake.
 *
 * Initialises CLN_nic, publishes this process's LID in _gtmparr, creates one
 * QP per client (CLN_con[c]) and moves those QPs through INIT, RTR and RTS
 * with ibv_modify_qp(), synchronising with the client through the
 * armci_vapi_server_stage1/2, armci_vapi_client_stage1 and
 * armci_vapi_client_ready flags. It then allocates the server buffers, posts
 * the initial receives and finally sets armci_vapi_server_ready.
 * Every verbs return code is checked with dassertp()/dassert1().
 */
void armci_server_initial_connection()
1384
struct ibv_qp_attr qp_attr;
1385
struct ibv_qp_init_attr qp_init_attr;
1386
struct ibv_qp_cap qp_cap;
1387
enum ibv_qp_attr_mask qp_attr_mask;
1389
struct ibv_recv_wr *bad_wr;
1392
inittime0 = MPI_Wtime();
1395
printf("in server after fork %d (%d)\n",armci_me,getpid());
1399
#if defined(PEND_BUFS) && !defined(SERVER_THREAD)
1400
armci_pbuf_init_buffer_env();
1402
armci_init_nic(CLN_nic,1,1);
1404
/* publish my LID; the client master reduces _gtmparr after seeing stage1 */
_gtmparr[armci_me] = CLN_nic->lid_arr[armci_me];
1405
armci_vapi_server_stage1 = 1;
1406
armci_util_wait_int(&armci_vapi_client_stage1, 1, 10);
1408
CLN_rqpnumtmpbuf = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
1409
dassert(1, CLN_rqpnumtmpbuf);
1410
/* one QP per client; its number is recorded in CLN_rqpnumtmpbuf[c] */
for (c = 0; c < armci_nproc; c++) {
1413
armci_connect_t *con = CLN_con + c;
1414
armci_create_qp(CLN_nic, &con->qp);
1415
con->sqpnum = con->qp->qp_num;
1416
con->lid = CLN_nic->lid_arr[c];
1417
CLN_rqpnumtmpbuf[c] = con->qp->qp_num;
1420
armci_vapi_server_stage2 = 1;
1422
/* Modifying QP to INIT */
qp_attr_mask = IBV_QP_STATE
1425
| IBV_QP_ACCESS_FLAGS;
1427
memset(&qp_attr, 0, sizeof qp_attr);
1428
qp_attr.qp_state = IBV_QPS_INIT;
1429
qp_attr.pkey_index = DEFAULT_PKEY_IX;
1430
qp_attr.port_num = CLN_nic->active_port;
1431
qp_attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE |
1432
IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
1434
for (c = 0; c < armci_nproc; c++) {
1435
armci_connect_t *con = CLN_con + c;
1436
rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
1437
/* message names the RESET->INIT transition, matching the client's "RST->INIT" */
dassertp(1,!rc,("%d: RST->INIT server c=%d rc=%d\n",armci_me,c,rc));
1440
/* Modifying QP to RTR (ready to receive) */
memset(&qp_attr, 0, sizeof qp_attr);
1441
qp_attr_mask = IBV_QP_STATE
1442
| IBV_QP_MAX_DEST_RD_ATOMIC
1445
| IBV_QP_MIN_RNR_TIMER;
1446
qp_attr.qp_state = IBV_QPS_RTR;
1447
qp_attr.path_mtu = IBV_MTU_1024;
1448
qp_attr.max_dest_rd_atomic = 4;
1449
qp_attr.min_rnr_timer = RNR_TIMER;
1452
for(c = 0; c < armci_nproc; c++) {
1453
armci_connect_t *con = CLN_con + c;
1454
qp_attr_mask |= IBV_QP_DEST_QPN | IBV_QP_AV;
1455
qp_attr.dest_qp_num = SRV_rqpnums[c];
1456
qp_attr.ah_attr.dlid = SRV_nic->lid_arr[c];
1457
qp_attr.ah_attr.port_num = CLN_nic->active_port;
1459
rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
1460
dassertp(1,!rc,("%d: INIT->RTR server cln=%d rc=%d\n",armci_me,c,rc));
1463
/* vapi_connect_client() sets this flag after its INIT->RTR step and barrier */
armci_util_wait_int(&armci_vapi_client_ready,1,10);
1464
memset(&qp_attr, 0, sizeof qp_attr);
1466
/* Modifying QP to RTS (ready to send) */
qp_attr_mask = IBV_QP_STATE
1471
| IBV_QP_MAX_QP_RD_ATOMIC;
1473
qp_attr.qp_state = IBV_QPS_RTS;
1475
qp_attr.timeout = 18;
1476
qp_attr.retry_cnt = 20;
1477
qp_attr.rnr_retry = 7;
1478
qp_attr.max_rd_atomic = 4;
1480
for (c = 0; c < armci_nproc; c++) {
1481
armci_connect_t *con = CLN_con + c;
1482
rc = ibv_modify_qp(con->qp, &qp_attr,qp_attr_mask);
1483
dassertp(1,!rc,("%d: server RTR->RTS cln=%d rc=%d\n",armci_me,c,rc));
1489
armci_server_alloc_bufs();
1491
/* setup descriptors and post nonblocking receives */
1492
#if defined(PEND_BUFS)
1493
/* all immediate-buffer ids must fit in the DSCRID_IMMBUF_RECV id range */
assert(armci_nproc*(IMM_BUF_NUM+1)<DSCRID_IMMBUF_RECV_END-DSCRID_IMMBUF_RECV);
1494
for(i = 0; i < armci_nproc; i++) {
1495
for(j=0; j<IMM_BUF_NUM+1; j++) {
1497
vbuf = serv_buf_arr[i*(IMM_BUF_NUM+1)+j];
1498
armci_init_vapibuf_recv(&vbuf->dscr, &vbuf->sg_entry, vbuf->buf,
1499
IMM_BUF_LEN, &serv_memhandle);
1500
/* we use index of the buffer to identify the buffer, this index is
1501
* returned with a call to ibv_poll_cq inside the ibv_wr */
1502
vbuf->dscr.wr_id = i*(IMM_BUF_NUM+1)+j + DSCRID_IMMBUF_RECV;
1504
printf("\n%d(s):posted rr with lkey=%d",armci_me,vbuf->sg_entry.lkey);
1507
rc = ibv_post_recv((CLN_con+i)->qp, &vbuf->dscr, &bad_wr);
1508
dassert1(1,rc==0,rc);
1512
for(i = 0; i < armci_nproc; i++) {
1514
vbuf = serv_buf_arr[i];
1515
armci_init_vapibuf_recv(&vbuf->dscr, &vbuf->sg_entry, vbuf->buf,
1516
VBUF_DLEN, &serv_memhandle);
1517
/* we use index of the buffer to identify the buffer, this index is
1518
* returned with a call to ibv_poll_cq inside the ibv_wr */
1519
vbuf->dscr.wr_id = i+armci_nproc;
1521
printf("\n%d(s):posted rr with lkey=%d",armci_me,vbuf->sg_entry.lkey);
1524
rc = ibv_post_recv((CLN_con+i)->qp, &vbuf->dscr, &bad_wr);
1525
dassert1(1,rc==0,rc);
1529
if (TIME_INIT) printf("\n%d:post time for server_initial_conn is %f",
1530
armci_me, MPI_Wtime() - inittime4);
1532
/* armci_client_connect_to_servers() waits on this flag */
armci_vapi_server_ready=1;
1533
/* check if we can poll in the server thread */
1534
enval = getenv("ARMCI_SERVER_CAN_POLL");
1536
/* any value not starting with 'N'/'n' enables polling */
if((enval[0] != 'N') && (enval[0]!='n')) server_can_poll=1;
1538
if(armci_clus_info[armci_clus_me].nslave < armci_getnumcpus())
1541
/* server_can_poll=0; */
1544
printf("%d: server connected to all clients\n",armci_me); fflush(stdout);
1546
if (TIME_INIT) printf("\n%d:time for server_initial_conn is %f",
1547
armci_me, MPI_Wtime() - inittime0);
1550
/*
 * armci_finalize_nic - release the verbs resources held by one NIC record.
 *
 * Destroys the send CQ and its completion channel, then the receive CQ and
 * its completion channel, and finally closes the device context. Each call's
 * return value is asserted to be 0 and also passed to
 * armci_vapi_check_return().
 */
static void armci_finalize_nic(vapi_nic_t *nic)
1554
ret = ibv_destroy_cq(nic->scq);
1555
dassert1(1,ret==0,ret);
1556
armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_scq");
1558
ret = ibv_destroy_comp_channel(nic->sch);
1559
dassert1(1,ret==0,ret);
1560
armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_sch");
1562
ret = ibv_destroy_cq(nic->rcq);
1563
dassert1(1,ret==0,ret);
1564
armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_rcq");
1566
ret = ibv_destroy_comp_channel(nic->rch);
1567
dassert1(1,ret==0,ret);
1568
armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_rch");
1570
/* the device is closed last, after the objects created on it are gone */
ret = ibv_close_device(nic->handle);
1571
dassert1(1,ret==0,ret);
1573
armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:release_hca");
1578
/*
 * armci_server_transport_cleanup - tear down the server side of the transport.
 *
 * Deregisters and frees the server buffer region (when one was allocated),
 * destroys the server's registered regions, destroys the QP of every client
 * connection in CLN_con and finalizes CLN_nic.
 */
void armci_server_transport_cleanup()
1583
/*first we have to empty send/recv queues TBD*/
1584
if(serv_malloc_buf_base){
1585
rc = ibv_dereg_mr(serv_memhandle.memhndl);
1586
dassert1(1,rc==0,rc);
1587
armci_vapi_check_return(DEBUG_FINALIZE,rc,
1588
"armci_server_transport_cleanup:deregister_mr");
1590
free(serv_malloc_buf_base);
1592
/*now deregister all my regions from regionskk.c*/
1593
armci_server_region_destroy();
1595
/* one QP per client process */
for (s = 0; s < armci_nproc; s++) {
1596
armci_connect_t *con = CLN_con + s;
1598
rc = ibv_destroy_qp(con->qp);
1599
armci_vapi_check_return(DEBUG_FINALIZE,rc,
1600
"armci_server_transport_cleanup:destroy_qp");
1608
armci_finalize_nic(CLN_nic);
1611
/*
 * armci_transport_cleanup - tear down the client side of the transport.
 *
 * Deregisters and frees the client buffer region (when one was allocated),
 * destroys the client's registered regions, destroys the QP of every server
 * connection in SRV_con and finalizes SRV_nic.
 */
void armci_transport_cleanup()
1616
/*first deregister buffers memory */
1617
if (client_malloc_buf_base) {
1618
rc = ibv_dereg_mr(client_memhandle.memhndl);
1619
dassert1(1,rc==0,rc);
1620
armci_vapi_check_return(DEBUG_FINALIZE,rc,"armci_client_transport_cleanup:deregister_mr");
1622
free(client_malloc_buf_base);
1624
/*now deregister all my regions from regions.c*/
1625
armci_region_destroy();
1627
/* one QP per cluster node (server) */
for (s = 0; s < armci_nclus; s++) {
1628
armci_connect_t *con = SRV_con + s;
1630
rc = ibv_destroy_qp(con->qp);
1631
dassert1(1,rc==0,rc);
1632
armci_vapi_check_return(DEBUG_FINALIZE,rc,"armci_client_transport_cleanup:destroy_qp");
1640
armci_finalize_nic(SRV_nic);
1643
/** Post an immediate buffer back for the client to send.
1645
/*
 * Re-post vbuf's receive descriptor on the QP of client `to`
 * (CLN_con[to]) with ibv_post_recv() and assert that the post succeeded.
 */
static void _armci_pendbuf_post_immbuf(vapibuf_t *vbuf, int to) {
1647
struct ibv_recv_wr *bad_wr;
1648
#if defined(PEND_BUFS)
1649
/* sanity check: the descriptor's wr_id must still encode this buffer */
assert(vbuf->dscr.wr_id == vbuf-serv_buf_arr[0]+DSCRID_IMMBUF_RECV);
1651
rc = ibv_post_recv((CLN_con+to)->qp, &(vbuf->dscr), &bad_wr);
1652
dassert1(1,rc==0,rc);
1655
#if defined(PEND_BUFS)
1656
/* Map an immediate-buffer receive wr_id back to its index in serv_buf_arr.
 * The argument is parenthesised so that expression arguments expand correctly. */
#define DSCRID_TO_IMMBUFID(x) ((x)-DSCRID_IMMBUF_RECV)
1658
#define DSCRID_TO_IMMBUFID(x) ((x)-armci_nproc)
1661
#if defined(PEND_BUFS)
1663
/**Obtain a message receive buffer to receive a message. Used in place
1664
* of MessageRcvBuffer. Should not be used.
1666
/* Not supported in this configuration: reports the error through
 * armci_die(), passing proc as the code. */
char *armci_openib_get_msg_rcv_buf(int proc)
1668
armci_die("PEND_BUFS in OPENIB: MessageRcvBuffer not available. Should use the in-place buffers to receive data", proc);
1672
/** Check that the data is in a server allocated buffer. This is
1673
* guaranteed to be pinned. Ideally, this should always be true. Any
1674
* operation that requests alternative support will have to fix this
1675
* function and possibly @armci_openib_get_msg_rcv_buf().
1676
* @param br IN Buffer pointer being checked
1677
* @return 1 if it is a server-allocated buffer. 0 otherwise.
1679
/* Range test of br against [serv_malloc_buf_base, serv_tail); a diagnostic
 * with the bounds and br is printed for the out-of-range case. */
int armci_data_in_serv_buf(void *br)
1681
/* half-open interval: serv_tail itself is outside */
if(br>=(void *)serv_malloc_buf_base && br<(void *)serv_tail)
1684
printf("%d:: serv_bufs=%p<->%p. br=%p out of range\n",
1685
armci_me, serv_malloc_buf_base, serv_tail, br);
1691
/*
 * Work-request id encoding for pending buffers: every pending buffer id owns
 * two consecutive ids starting at DSCRID_PENDBUF. The even offset is the
 * buffer's PUT id, the odd offset its GET id.
 */
#define PBUF_BUFID_TO_PUT_WRID(_pbufid) (DSCRID_PENDBUF+(_pbufid)*2)
1692
#define PBUF_BUFID_TO_GET_WRID(_pbufid) (DSCRID_PENDBUF+(_pbufid)*2+1)
1693
/* inverse mapping: both ids of a buffer yield the same pending buffer id */
#define PBUF_WRID_TO_PBUFID(_id) (((_id)-DSCRID_PENDBUF)/2)
1694
/* the low bit of the offset distinguishes GET (1) from PUT (0) */
#define PBUF_IS_GET_WRID(_id) (((_id)-DSCRID_PENDBUF)&1)
1695
#define PBUF_IS_PUT_WRID(_id) (!(((_id)-DSCRID_PENDBUF)&1))
1697
/**Complete processing this immediate buffer. Parameter is void *,
1698
* since vapibuf_t*|immbuf_t* is not available in armci-vapi.h
1700
/*
 * buf is treated as a vapibuf_t whose payload starts with a request header.
 * The request is handed to armci_data_server(); PUT and accumulate requests
 * are acknowledged with SERVER_SEND_ACK(), and the buffer is re-posted for
 * the sending client when no send is pending on it.
 */
void armci_complete_immbuf(void *buf) {
1701
vapibuf_t *vbuf = (vapibuf_t*)buf;
1702
request_header_t *msginfo=(request_header_t*)vbuf->buf;
1705
vbuf->send_pending = 0;
1707
_armci_pendbuf_post_immbuf(vbuf,msginfo->from);
1709
armci_data_server(vbuf);
1710
if(msginfo->operation==PUT || ARMCI_ACC(msginfo->operation)) {
1711
SERVER_SEND_ACK(msginfo->from);
1714
/* send_pending was cleared above; re-checked after armci_data_server() ran */
if(!vbuf->send_pending) {
1715
_armci_pendbuf_post_immbuf(vbuf,msginfo->from);
1720
/**Complete processing this pending buffer. Parameter is void *,
1721
* since vapibuf_t*|immbuf_t* is not available in armci-vapi.h. Note
1722
* that the pending buffer may not yet be available for reuse. This
1723
* will depend on the state of the pending buffer (which might have to
1724
* wait for a communication initiated by armci_data_server() to
1727
/*
 * buf is treated as a vapibuf_pend_t whose payload starts with a request
 * header. The request is handed to armci_data_server(); PUT and accumulate
 * requests are acknowledged with SERVER_SEND_ACK(), and the immediate buffer
 * attached to the pending buffer (pbuf->vbuf) is re-posted for the sender.
 */
void armci_complete_pendbuf(void *buf) {
1728
vapibuf_pend_t *pbuf = (vapibuf_pend_t *)buf;
1729
request_header_t *msginfo=(request_header_t*)pbuf->buf;
1733
pbuf->vbuf->send_pending=0;
1735
_armci_pendbuf_post_immbuf(pbuf->vbuf,msginfo->from);
1737
armci_data_server(pbuf);
1738
if(msginfo->operation==PUT || ARMCI_ACC(msginfo->operation)) {
1739
SERVER_SEND_ACK(msginfo->from);
1743
/* the immediate buffer must have no send in flight when it is re-posted */
assert(!pbuf->vbuf->send_pending);
1744
_armci_pendbuf_post_immbuf(pbuf->vbuf,msginfo->from);
1748
/* Forward declarations: both functions are defined later in this file and
 * are called by armci_pbuf_start_get() / armci_pbuf_start_put(). */
void _armci_get_data_from_client(int proc, struct ibv_send_wr *sdscr,
1749
int dscrid, struct ibv_sge *ssg_entry,
1750
void *rbuf, void *lbuf, int bytes) ;
1751
void _armci_send_data_to_client_pbuf(int proc, struct ibv_send_wr *sdscr,
1752
int dscrid, struct ibv_sge *ssg_entry,
1753
void *rbuf, void *lbuf, int bytes);
1755
/* Upper limit on the segment count for the direct (no server copy) strided
 * path used by armci_pbuf_start_get(): one tenth of
 * armci_max_qp_ous_swr * armci_max_num_sg_ent. */
int no_srv_copy_nsegs_ulimit() {
1756
return armci_max_qp_ous_swr*armci_max_num_sg_ent/10;
1759
/** Initiate a get operation to progress a pending buffer.
1760
* @param msginfo Request header for any additional processing
1761
* @param src Pointer to src of data (remote for GET)
1762
* @param dst Pointer to dst
1763
* @param bytes #bytes to transfer
1764
* @param proc proc to transfer from(for get)/to(for put)
1765
* @param pbufid Index of pending buffer
1767
/*
 * Starts the transfer that progresses pending buffer `pbufid`, using the GET
 * work-request id of that buffer. Under PUT_NO_SRV_COPY a non-pinned strided
 * PUT whose source is the client's data pointer may instead be moved with
 * armci_server_rdma_contig_to_strided() when the segment count is below
 * no_srv_copy_nsegs_ulimit() and a local region handle is found.
 */
void armci_pbuf_start_get(void *msg_info, void *src, void *dst,
1768
int bytes, int proc, int pbufid) {
1769
struct ibv_send_wr sdscr;
1770
struct ibv_sge sg_entry;
1771
int wrid = PBUF_BUFID_TO_GET_WRID(pbufid);
1772
request_header_t *msginfo=(request_header_t *)msg_info;
1773
void armci_server_rdma_contig_to_strided(char *src_ptr, int proc,
1775
int dst_stride_arr[],
1778
request_header_t *msginfo);
1781
#if defined(PUT_NO_SRV_COPY)
1782
if(msginfo->operation==PUT && msginfo->format==STRIDED
1783
&& !msginfo->pinned && src==msginfo->tag.data_ptr) {
1784
char *loc_ptr, *rem_ptr;
1785
int stride_levels, *count;
1786
int *loc_stride_arr;
1787
/* the strided descriptor follows the request header */
char *dscr = (char *)(msginfo+1);
1788
ARMCI_MEMHDL_T *mhloc=NULL;
1791
/* unpack descriptor record */
1792
loc_ptr = *(void**)dscr; dscr += sizeof(void*);
1793
stride_levels = *(int*)dscr; dscr += sizeof(int);
1794
loc_stride_arr = (int*)dscr; dscr += stride_levels*sizeof(int);
1797
rem_ptr = msginfo->tag.data_ptr;
1800
/* multiply the counts of the strided levels into nsegs */
for(i=0; i<stride_levels; i++)
1801
nsegs *= count[i+1];
1803
dassert(1,proc==msginfo->from);
1804
if(nsegs<no_srv_copy_nsegs_ulimit() &&
1805
get_armci_region_local_hndl(loc_ptr,armci_clus_id(armci_me),&mhloc)) {
1806
/* printf("%d(s): direct rdma from client buffers to server-side memory\n",armci_me); */
1807
/* fflush(stdout); */
1809
armci_server_rdma_contig_to_strided(rem_ptr, proc,
1810
loc_ptr,loc_stride_arr,
1811
count, stride_levels,
1817
/* printf("%d(s): rdma from client buffers to pending buffers\n",armci_me); */
1818
/* fflush(stdout); */
1819
_armci_get_data_from_client(proc,&sdscr,wrid,&sg_entry,src,dst,bytes);
1822
/** Initiate a put operation to progress a pending buffer.
1823
* @param src Pointer to src of data (local for PUT)
1824
* @param dst Pointer to dst
1825
* @param bytes #bytes to transfer
1826
* @param proc proc to transfer from(for get)/to(for put)
1827
* @param pbufid Index of pending buffer
1829
/* Posts the transfer for pending buffer `pbufid` through
 * _armci_send_data_to_client_pbuf(), tagged with that buffer's PUT
 * work-request id. */
void armci_pbuf_start_put(void *src, void *dst, int bytes, int proc,
1831
struct ibv_send_wr sdscr;
1832
struct ibv_sge sg_entry;
1833
int wrid = PBUF_BUFID_TO_PUT_WRID(pbufid);
1835
_armci_send_data_to_client_pbuf(proc,&sdscr,wrid,&sg_entry,src,dst,bytes);
1839
* function to get data from remote client called by data
1840
* server. Note that this is only called for pending buffers.
1841
* @param proc IN the id of remote client
1842
* @param sdscr IN/OUT Descriptor to be used to post the get
1843
* @param dscrid IN ID to be used for the descriptor
1844
* @param ssg_entry IN Scatter/gather list
1845
* @param rbuf IN the remote buffer to get from
1846
* @param lbuf IN local buf to get the data into, this is the queue buffer for SERVER_QUEUE path
1847
* @param bytes IN the size of get
1849
* @see armci_send_data_to_client
1851
/* Builds an RDMA READ from rbuf on client `proc` into lbuf (local key from
 * serv_memhandle, remote key from handle_array[proc]), tags it with dscrid
 * and posts it on the client's QP; the post is asserted to succeed. */
/*static*/ void _armci_get_data_from_client(int proc, struct ibv_send_wr *sdscr,
1852
int dscrid, struct ibv_sge *ssg_entry,
1853
void *rbuf, void *lbuf, int bytes)
1858
printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
1860
proc,lbuf,(char *)lbuf+bytes-sizeof(int),bytes);fflush(stdout);
1863
memset(sdscr,0,sizeof(struct ibv_send_wr));
1864
armci_init_vbuf_rrdma(sdscr,ssg_entry,lbuf,rbuf,bytes,
1865
&serv_memhandle,(handle_array+proc));
1868
printf("\n%d(s):handle_array[%d]=%p lbuf=%p flag=%p bytes=%d\n",armci_me,
1869
proc,&handle_array[proc],(char *)lbuf,
1870
(char *)lbuf+bytes-sizeof(int),bytes);
1874
/* request header plus payload must fit in a pending buffer */
assert(sizeof(request_header_t)+bytes<PENDING_BUF_LEN);
1876
sdscr->wr_id = dscrid;
1877
struct ibv_send_wr *bad_wr;
1878
rc = ibv_post_send((CLN_con+proc)->qp, sdscr, &bad_wr);
1879
dassert1(1,rc==0,rc);
1882
/* Builds an RDMA WRITE of lbuf to rbuf on client `proc` (local key from
 * serv_memhandle, remote key from handle_array[proc]), tags it with dscrid
 * and posts it on the client's QP; the post is asserted to succeed. */
void _armci_send_data_to_client_pbuf(int proc, struct ibv_send_wr *sdscr,
1883
int dscrid, struct ibv_sge *ssg_entry,
1884
void *rbuf, void *lbuf, int bytes) {
1888
printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
1890
proc,rbuf,(char *)rbuf+bytes-sizeof(int),bytes);fflush(stdout);
1892
memset(sdscr,0,sizeof(struct ibv_send_wr));
1893
armci_init_vbuf_srdma(sdscr,ssg_entry,lbuf,rbuf,bytes,
1894
&serv_memhandle,(handle_array+proc));
1896
printf("\n%d(s):handle_array[%d]=%p dbuf=%p flag=%p bytes=%d\n",armci_me,
1897
proc,&handle_array[proc],(char *)rbuf,
1898
(char *)rbuf+bytes-sizeof(int),bytes);
1901
sdscr->wr_id = dscrid;
1902
struct ibv_send_wr *bad_wr;
1903
rc = ibv_post_send((CLN_con+proc)->qp, sdscr, &bad_wr);
1904
dassert1(1,rc==0,rc);
1908
#define DATA_SERVER_YIELD_CPU
1909
/*
 * armci_call_data_server - main loop of the data server.
 *
 * Obtains work completions from CLN_nic->rcq (by polling, or by waiting on
 * the completion channel CLN_nic->rch) and dispatches on the completion's
 * wr_id:
 *   - DSCRID_IMMBUF_RESP range: a server response send finished; the
 *     immediate buffer is re-posted for its client.
 *   - DSCRID_PENDBUF range: a pending-buffer GET or PUT finished.
 *   - DSCRID_SCATGAT range: one piece of a scatter/gather operation finished.
 *   - otherwise: a new request arrived in a receive buffer and is served
 *     (pinned PUT, REGISTER, or the generic armci_data_server() path).
 * When armci_server_terminating is set, armci_server_transport_cleanup() is
 * called.
 */
void armci_call_data_server()
1913
vapibuf_t *vbuf,*vbufs;
1914
request_header_t *msginfo,*msg;
1915
int c,i,need_ack,pollcount;
1917
static int doineednotify=1;
1918
int rrr,serverwcount=0;
1920
#ifdef CHANGE_SERVER_AFFINITY
1921
cpu_set_t mycpuid,new_mask;
1922
char str[CPU_SETSIZE];
1924
extern char * cpuset_to_cstr(cpu_set_t *mask, char *str);
1925
int nslave=armci_clus_info[armci_clus_me].nslave;
1926
rrr=sched_getaffinity(0, sizeof(mycpuid), &mycpuid);
1929
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
1930
/* unblock_thread_signal(GPC_COMPLETION_SIGNAL); */
1932
#if defined(PEND_BUFS)
1933
armci_pendbuf_init();
1937
struct ibv_wc *pdscr=NULL;
1938
struct ibv_wc pdscr1;
1940
pdscr->status = IBV_WC_SUCCESS;
1942
#ifdef CHANGE_SERVER_AFFINITY
1945
if(serverwcount==100){
1948
sprintf (cid, "%d", ccc);
1949
rrr = cstr_to_cpuset(&new_mask,cid);
1950
if (sched_setaffinity(0, sizeof (new_mask), &new_mask)) {
1951
perror("sched_setaffinity");
1952
printf("failed to set pid %d's affinity.\n", getpid());
1954
rrr=sched_getaffinity(0, sizeof(mycpuid), &mycpuid);
1955
if(rrr)perror("sched_getaffinity");
1958
/* NOTE(review): the macro defined above this function is
 * DATA_SERVER_YIELD_CPU (no trailing underscore), so that define does not
 * enable this branch -- confirm whether that is intended. */
#ifdef DATA_SERVER_YIELD_CPU_
1960
if(serverwcount==50){
1961
serverwcount=0;usleep(1);
1966
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
1967
/* block_thread_signal(GPC_COMPLETION_SIGNAL); */
1969
bzero(pdscr, sizeof(*pdscr));
1970
if (server_can_poll) {
1971
rc = ibv_poll_cq(CLN_nic->rcq, 1, pdscr);
1973
rc = ibv_poll_cq(CLN_nic->rcq, 1, pdscr);
1974
if (armci_server_terminating) {
1975
/* server got interrupted when clients terminate connections */
1976
armci_server_transport_cleanup();
1982
rc = ibv_poll_cq(CLN_nic->rcq, 1, pdscr);
1983
if(rc==0){doineednotify=1;/*continue;*/}
1985
/* arm the CQ, block for an event on the completion channel, then poll again */
rc1 = ibv_req_notify_cq(CLN_nic->rcq, 0);
1986
dassert1(1,rc1==0,rc1);
1987
rc1=ibv_get_cq_event(CLN_nic->rch,&CLN_nic->rcq,&CLN_nic->rcq_cntx);
1988
dassert1(1,rc1==0,rc1);
1990
ibv_ack_cq_events(CLN_nic->rcq, 1);
1991
rc = ibv_poll_cq(CLN_nic->rcq, 1, pdscr);
1994
if (armci_server_terminating) {
1995
/* server got interrupted when clients terminate connections */
1996
armci_server_transport_cleanup();
2003
printf("\n%d:pdscr=%p %p %d %d %d %d\n",armci_me,pdscr,&pdscr1,
2004
pdscr->status,pdscr->opcode,pdscr->vendor_err,
2008
dassertp(1,rc>=0,("%d: rc=%d id=%d status=%d",
2009
armci_me,rc,(int)pdscr->wr_id,pdscr->status));
2010
dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
2013
printf("%d(s) : NEW MESSAGE bytelen %d \n",armci_me,pdscr->byte_len);
2014
printf("%d(s) : NEW MESSAGE id is %ld \n",armci_me,pdscr->wr_id);
2017
#if defined(PEND_BUFS)
2018
/* completion of a response send issued from an immediate buffer */
if(pdscr->wr_id>=DSCRID_IMMBUF_RESP && pdscr->wr_id<DSCRID_IMMBUF_RESP_END) {
2019
/* fprintf(stderr, "%d(s) : Got server response msg completion\n", armci_me); */
2021
int id = pdscr->wr_id - DSCRID_IMMBUF_RESP;
2022
if(id>=0 && id<armci_nproc*(IMM_BUF_NUM+1)) {
2023
/* each client owns IMM_BUF_NUM+1 consecutive immediate buffers */
int dest = id/(IMM_BUF_NUM+1);
2024
dassert(1,serv_buf_arr[id]->send_pending==1);
2025
serv_buf_arr[id]->send_pending = 0;
2026
_armci_pendbuf_post_immbuf(serv_buf_arr[id],dest);
2031
/* completion of an RDMA operation issued for a pending buffer */
if (pdscr->wr_id>=DSCRID_PENDBUF && pdscr->wr_id<DSCRID_PENDBUF_END) {
2032
int pbufid = PBUF_WRID_TO_PBUFID(pdscr->wr_id);
2033
/* printf("%d(s) : Progressing pending msg (something completed) pbufid=%d id=%ld byte_len=%d status=%d\n", armci_me, pbufid,pdscr->wr_id,pdscr->byte_len,done_status); */
2034
/* fflush(stdout); */
2035
if(PBUF_IS_GET_WRID(pdscr->wr_id))
2036
armci_pendbuf_done_get(pbufid);
2037
else if(PBUF_IS_PUT_WRID(pdscr->wr_id))
2038
armci_pendbuf_done_put(pbufid);
2040
armci_die("Pending buffer op completed. But not PUT or GET!",pdscr->wr_id);
2044
/* completion of one piece of a scatter/gather operation */
if (pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END) {
2045
sr_descr_t *sdscr_arr, *rdscr_arr;
2047
printf("%d(s) : received SCATGAT DATA id = %ld, length = %d\n",
2048
armci_me,pdscr->wr_id, pdscr->byte_len);
2051
#if defined(PEND_BUFS)
2052
sdscr_arr = armci_vapi_serv_nbsdscr_array;
2053
assert(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends>0);
2054
sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
2055
/* the descriptor's tag is cleared once its last send has completed */
if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
2056
sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
2059
rdscr_arr = armci_vapi_serv_nbrdscr_array;
2060
rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs--;
2061
if(rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs==0)
2062
rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
2067
#if defined(PEND_BUFS)
2068
assert(pdscr->wr_id>=DSCRID_IMMBUF_RECV && pdscr->wr_id<DSCRID_IMMBUF_RECV_END);
2070
/* a new request arrived: locate its receive buffer from the wr_id */
vbuf = serv_buf_arr[DSCRID_TO_IMMBUFID(pdscr->wr_id)];
2071
assert(vbuf->dscr.wr_id == pdscr->wr_id);
2073
msginfo = (request_header_t*)vbuf->buf;
2074
armci_ack_proc = c = msginfo->from;
2077
printf("%d(s) : request id is %ld operation is %d, length is %d from=%d vbuf->dscr.wr_id=%d\n",
2078
armci_me,pdscr->wr_id,msginfo->operation,pdscr->byte_len,msginfo->from, (int)vbuf->dscr.wr_id);
2082
#if defined(PEND_BUFS)
2084
armci_init_vapibuf_recv(&vbufs->dscr, &vbufs->sg_entry,vbufs->buf,
2085
IMM_BUF_LEN, &serv_memhandle);
2086
vbufs->dscr.wr_id = pdscr->wr_id;
2088
/* swap in the spare buffer so a receive can be re-posted for this client */
vbufs = serv_buf_arr[pdscr->wr_id - armci_nproc] = spare_serv_buf;
2089
armci_init_vapibuf_recv(&vbufs->dscr, &vbufs->sg_entry,vbufs->buf,
2090
VBUF_DLEN, &serv_memhandle);
2091
vbufs->dscr.wr_id = c + armci_nproc;
2093
spare_serv_buf = vbuf;
2097
printf("%d(s):Came out of poll id=%ld\n",armci_me,pdscr->wr_id);
2101
if(msginfo->operation == PUT &&msginfo->pinned == 1){
2103
int stride_arr[MAX_STRIDE_LEVEL]; /*should be MAX_STRIDE_LEVELS*/
2104
int count[MAX_STRIDE_LEVEL];
2107
ARMCI_MEMHDL_T *loc_memhandle;
2108
void armci_post_scatter(void *,int *,int *,int, armci_vapi_memhndl_t *,int,int,int,sr_descr_t **);
2110
/*unpack descriptor_record : should call a function instead */
2112
test_ptr = dest_ptr = *(void**)msg;
2113
msg = (request_header_t *) ((char*)msg + sizeof(void*));
2114
test_stride_levels=stride_levels = *(int*)msg;
2115
msg = (request_header_t *) ((char*)msg + sizeof(int));
2116
for(i =0; i<stride_levels; i++){
2117
test_stride_arr[i] = stride_arr[i] = *(int*)msg;
2118
msg = (request_header_t*) ((int*)msg + 1);
2120
for(i=0; i<stride_levels+1; i++){
2121
test_count[i] = count[i] = *(int*)msg;
2122
msg = (request_header_t*) ((int*)msg + 1);
2126
printf(" server:the dest_ptr is %p\n", dest_ptr);
2127
for(i =0; i<stride_levels; i++)
2128
printf("stride_arr[i] is %d,value of count[i] is %d\n",
2129
stride_arr[i], count[i]);
2130
printf("the value of stride_levels is %d\n", stride_levels);
2134
found =get_armci_region_local_hndl(dest_ptr,armci_me, &loc_memhandle);
2135
/* wr_id is 64-bit; cast to int to match the %d conversion, as done above */
dassertp(1,found!=0,("%d:SERVER : local region not found id=%d",
2136
armci_me,(int)pdscr->wr_id));
2139
printf("%d(s) : about to call armci_post_scatter\n",armci_me);
2143
armci_post_scatter(dest_ptr, stride_arr, count, stride_levels,
2144
loc_memhandle,msginfo->from, mytag, SERV,NULL );
2146
/* advance the tag, skipping 0 */
mytag = (mytag+1)%(MAX_PENDING);
2147
if(mytag==0)mytag=1;
2150
printf("%d(s) : finished posting %d scatter\n",armci_me,num);
2153
_armci_pendbuf_post_immbuf(vbufs, msginfo->from);
2154
SERVER_SEND_ACK(msginfo->from);
2157
else if(msginfo->operation == REGISTER){
2159
printf("%d(s) : Register_op id is %d, comp_dscr_id is %ld\n",
2160
armci_me,msginfo->operation,pdscr->wr_id);
2164
/* payload after the header: a pointer followed by a long */
armci_server_register_region(*((void **)(msginfo+1)),
2165
*((long *)((char *)(msginfo+1)+sizeof(void *))),
2166
(ARMCI_MEMHDL_T *)(msginfo->tag.data_ptr));
2167
_armci_pendbuf_post_immbuf(vbufs, msginfo->from);
2168
*(long *)(msginfo->tag.ack_ptr) = ARMCI_STAMP;
2173
printf("%d(s) : request is %ld about to call armci_data_server\n",
2174
armci_me, pdscr->wr_id);
2177
#if defined(PEND_BUFS)
2178
armci_pendbuf_service_req(vbuf);
2180
_armci_pendbuf_post_immbuf(vbufs, msginfo->from);
2181
armci_data_server(vbuf);
2183
if((msginfo->operation == PUT) || ARMCI_ACC(msginfo->operation)) {
2184
/* for operations that do not send data back we can send ACK now */
2185
SERVER_SEND_ACK(msginfo->from);
2188
printf("%d(s) : posted ack\n\n",armci_me);
2195
printf("%d(s):Done processed request\n\n",armci_me);
2199
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
2200
/* unblock_thread_signal(GPC_COMPLETION_SIGNAL); */
2206
/*
 * armci_vapi_complete_buf - wait on the client for a request buffer.
 *
 * Returns immediately for a GET on a tagged buffer. Otherwise it completes
 * the outstanding send of the buffer (if not yet marked complete) and spins
 * on the request's ack flag -- and, in the GET branch, on the last int of
 * the data area -- until ARMCI_STAMP is observed. The request header is
 * located directly after *field, the BUF_INFO_T directly before it.
 */
void armci_vapi_complete_buf(armci_vapi_field_t *field,int snd,int rcv,int to,int op) {
2207
struct ibv_send_wr *snd_dscr;
2210
info = (BUF_INFO_T *)((char *)field-sizeof(BUF_INFO_T));
2212
if(info->tag && op==GET)return;
2215
request_header_t *msginfo = (request_header_t *)(field+1);
2216
snd_dscr=&(field->sdscr);
2217
if(mark_buf_send_complete[snd_dscr->wr_id]==0)
2218
armci_send_complete(snd_dscr,"armci_vapi_complete_buf",1);
2225
request_header_t *msginfo = (request_header_t *)(field+1);
2226
flag = (long *)&msginfo->tag.ack;
2229
if(op==PUT || ARMCI_ACC(op)){
2230
if(msginfo->bypass && msginfo->pinned && msginfo->format == STRIDED &&
2233
while(armci_util_long_getval(flag) != ARMCI_STAMP) {
2240
/* printf("%d: client complete_buf. op=%d loop=%d till *flag=ARMCI_STAMP\n", armci_me,op,loop); */
2241
/* fflush(stdout); */
2245
/*SK: I think we get here only for GET with result directly
2246
going to client's pinned memory. (info.tag==0 && op==GET)*/
2247
/* last int of the data area */
last = (int *)((char *)msginfo+msginfo->datalen-sizeof(int));
2248
while(armci_util_int_getval(last) == ARMCI_STAMP &&
2249
armci_util_long_getval(flag) != ARMCI_STAMP){
2254
printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
2255
armci_me,last,*last,flag,*flag,msginfo->datalen);
2264
/*
 * armci_vapi_test_buf - test counterpart of armci_vapi_complete_buf().
 *
 * Applies the same checks (send-complete mark, ack flag, last int of the
 * data area) with single `if` tests instead of wait loops.
 * NOTE(review): the result is presumably reported through *retval --
 * confirm against the full function body.
 */
void armci_vapi_test_buf(armci_vapi_field_t *field,int snd,int rcv,int to,int op, int *retval) {
2265
struct ibv_send_wr *snd_dscr;
2268
info = (BUF_INFO_T *)((char *)field-sizeof(BUF_INFO_T));
2272
if(info->tag && op==GET)return;
2275
request_header_t *msginfo = (request_header_t *)(field+1);
2276
snd_dscr=&(field->sdscr);
2277
if(mark_buf_send_complete[snd_dscr->wr_id]==0) {
2278
/* printf("%d: test buf. send not complete\n",armci_me); */
2279
/* fflush(stdout); */
2288
request_header_t *msginfo = (request_header_t *)(field+1);
2289
flag = (long *)&msginfo->tag.ack;
2291
if(op==PUT || ARMCI_ACC(op)){
2292
if(msginfo->bypass && msginfo->pinned && msginfo->format == STRIDED &&
2296
if(armci_util_long_getval(flag) == ARMCI_STAMP) {
2303
/*SK: I think we get here only for GET with result directly
2304
going to client's pinned memory. (info.tag==0 && op==GET)*/
2305
/* last int of the data area */
last = (int *)((char *)msginfo+msginfo->datalen-sizeof(int));
2306
/* negation of the wait condition used in armci_vapi_complete_buf() */
if(armci_util_int_getval(last) != ARMCI_STAMP ||
2307
armci_util_long_getval(flag) == ARMCI_STAMP){
2316
static inline void armci_vapi_post_send(int isclient,int con_offset,
2317
struct ibv_send_wr *snd_dscr,char *from)
2321
armci_connect_t *con;
2326
con = CLN_con+con_offset;
2330
con = SRV_con+con_offset;
2334
printf("vapi_post_send: snd_dscr->num_sge=%d, snd_dscr->sg_list->length=%d\n",
2335
snd_dscr->num_sge, snd_dscr->sg_list->length);
2340
/* find the total length of all the segments */
2341
total = snd_dscr->sg_list->length * snd_dscr->num_sge;
2343
printf("%d(c) : total is %d\t, max_size is %d\n",armci_me,total,
2344
armci_vapi_max_inline_size);
2347
struct ibv_send_wr *bad_wr;
2348
if (total > armci_vapi_max_inline_size) {
2349
rc = ibv_post_send(con->qp, snd_dscr, &bad_wr);
2351
rc = ibv_post_send(con->qp, snd_dscr, &bad_wr);
2352
/* no corresponding call, using ibv_post_send
2353
rc = EVAPI_post_inline_sr(nic->handle,con->qp,snd_dscr);*/
2355
dassert1(1,rc==0,rc);
2358
/** Send request to server.
2360
int armci_send_req_msg(int proc, void *buf, int bytes)
2362
int cluster = armci_clus_id(proc), i;
2363
request_header_t *msginfo = (request_header_t *)buf;
2364
struct ibv_send_wr *snd_dscr;
2365
struct ibv_sge *ssg_lst;
2367
THREAD_LOCK(armci_user_threads.net_lock);
2369
snd_dscr = BUF_TO_SDESCR((char *)buf);
2370
ssg_lst = BUF_TO_SSGLST((char *)buf);
2372
/*Stamp end of buffers as needed*/
2373
if(msginfo->operation == GET && !msginfo->pinned) {
2374
const int dscrlen = msginfo->dscrlen;
2375
const int datalen = msginfo->datalen;
2377
if(dscrlen < (datalen - sizeof(int)))
2378
last = (int*)(((char*)(msginfo+1))+(datalen-sizeof(int)));
2380
last = (int*)(((char*)(msginfo+1))+(dscrlen+datalen-sizeof(int)));
2381
*last = ARMCI_STAMP;
2382
#ifdef GET_STRIDED_COPY_PIPELINED
2383
if(msginfo->format == STRIDED) {
2384
const int ssize = GET_STRIDED_COPY_PIPELINED_SIZE/sizeof(int);
2385
int *sfirst = (int*)(dscrlen+(char*)(msginfo+1))+ssize; /*stamping
2387
int *slast = last, *ptr;
2388
for(ptr=sfirst; ptr<slast; ptr+=ssize) {
2394
if(msginfo->operation == ACK) {
2395
*(int *)(msginfo +1) = ARMCI_STAMP+1;
2396
*(((int *)(msginfo +1))+1) = ARMCI_STAMP+1;
2400
#if defined(PEND_BUFS)
2401
if((msginfo->operation==PUT || ARMCI_ACC(msginfo->operation))
2402
&& bytes > IMM_BUF_LEN) {
2403
msginfo->tag.imm_msg=0;
2404
assert(sizeof(request_header_t)<IMM_BUF_LEN); /*sanity check*/
2405
bytes = ARMCI_MIN(bytes-msginfo->datalen, IMM_BUF_LEN);
2406
assert(bytes==IMM_BUF_LEN||(bytes==sizeof(*msginfo)+msginfo->dscrlen));
2408
else if(msginfo->operation==GET
2409
&& !(msginfo->datalen+sizeof(request_header_t)+msginfo->dscrlen<IMM_BUF_LEN)) {
2410
assert(sizeof(request_header_t) < IMM_BUF_LEN);
2411
msginfo->tag.imm_msg=0;
2412
bytes = ARMCI_MIN(sizeof(request_header_t)+msginfo->dscrlen, IMM_BUF_LEN);
2414
#if defined(PUT_NO_SRV_COPY) && 0 /*SK:disabled. Imm msgs are sent inline
2415
for latency reasons*/
2416
else if(msginfo->operation==PUT && !msginfo->pinned && msginfo->format==STRIDED && msginfo->tag.data_len>=2048) {
2417
msginfo->tag.imm_msg = 0;
2418
assert(sizeof(request_header_t)<IMM_BUF_LEN); /*sanity check*/
2419
bytes = ARMCI_MIN(bytes-msginfo->datalen, IMM_BUF_LEN);
2420
assert(bytes==IMM_BUF_LEN||(bytes==sizeof(*msginfo)+msginfo->dscrlen));
2424
msginfo->tag.imm_msg=1;
2426
/* printf("%d: send_req: op=%d bytes=%d data_len=%d imm=%d\n",*/
2427
/* armci_me, msginfo->operation, bytes, msginfo->datalen,msginfo->tag.imm_msg);*/
2428
/* fflush(stdout);*/
2429
if(bytes<0 || bytes>IMM_BUF_LEN) {
2430
printf("%d(pid=%d): Trying to send too large a mesg. op=%d bytes=%d(max=%d) to=%d\n", armci_me, getpid(),msginfo->operation,bytes,IMM_BUF_LEN, proc);
2434
assert(bytes <= IMM_BUF_LEN);
2436
_armci_buf_ensure_pend_outstanding_op_per_node(buf,cluster);
2437
/* printf("%d: send_req. ensured pend os per node. to=%d op=%d\n", armci_me, msginfo->to,msginfo->operation); */
2438
/* fflush(stdout); */
2440
_armci_buf_ensure_one_outstanding_op_per_node(buf,cluster);
2443
if(msginfo->operation == PUT || ARMCI_ACC(msginfo->operation)){
2444
#if defined(PEND_BUFS)
2445
if(!msginfo->tag.imm_msg){
2446
msginfo->tag.data_ptr = (char *)(msginfo+1)+msginfo->dscrlen;
2447
msginfo->tag.data_len = msginfo->datalen;
2450
msginfo->tag.data_ptr = NULL;
2453
msginfo->tag.data_ptr = (void *)&msginfo->tag.ack;
2458
if(msginfo->operation == GET && !msginfo->bypass && msginfo->dscrlen
2459
>= (msginfo->datalen-sizeof(int)))
2460
msginfo->tag.data_ptr = (char *)(msginfo+1)+msginfo->dscrlen;
2462
msginfo->tag.data_ptr = GET_DATA_PTR(buf);
2465
/*this has to be reset so that we can wait on it
2466
see ReadFromDirect*/
2467
msginfo->tag.ack = 0;
2468
msginfo->tag.ack_ptr = &(msginfo->tag.ack);
2471
printf("%d:the ack_ptr is initialised to %p, ack->value is %ld\n",
2472
armci_me,msginfo->tag.ack_ptr,msginfo->tag.ack);fflush(stdout);
2475
armci_init_vapibuf_send(snd_dscr, ssg_lst,buf,
2476
bytes, &client_memhandle);
2478
/* printf("%d: Sending req wr_id=%d to=%d\n",armci_me,snd_dscr->wr_id,proc);*/
2479
/* fflush(stdout);*/
2480
armci_vapi_post_send(1,cluster,snd_dscr,"send_req_msg:post_send");
2482
THREAD_UNLOCK(armci_user_threads.net_lock);
2485
printf("%d:client sent REQ=%d %d bytes serv=%d qp=%ld id =%ld lkey=%d\n",
2486
armci_me,msginfo->operation,bytes,cluster,
2487
(SRV_con+cluster)->qp,snd_dscr->wr_id,ssg_lst->lkey);
2495
* client waits for the first-phase ack before posting the gather descriptor
2497
void armci_wait_ack(char *buffer)
2500
request_header_t *msginfo = (request_header_t *)(buffer);
2501
flag = (long*)&msginfo->tag.ack;
2503
while(armci_util_long_getval(flag) != ARMCI_STAMP);
2510
void armci_client_direct_send(int p,void *src_buf, void *dst_buf, int len,void** contextptr,int nbtag,ARMCI_MEMHDL_T *lochdl,ARMCI_MEMHDL_T *remhdl)
2512
sr_descr_t *dirdscr;
2513
int clus = armci_clus_id(p);
2515
THREAD_LOCK(armci_user_threads.net_lock);
2517
/*ID for the desr that comes from get_next_descr is already set*/
2518
dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
2519
if(nbtag)*contextptr = dirdscr;
2521
armci_init_vbuf_srdma(&dirdscr->sdescr,dirdscr->sg_entry,src_buf,dst_buf,
2524
armci_vapi_post_send(1,clus,&(dirdscr->sdescr),
2525
"client_direct_send:post_send");
2527
/* the following unlock/lock ensures fairness (in case other threads are waiting
2528
on the lock) not required to work */
2530
THREAD_UNLOCK(armci_user_threads.net_lock);
2531
THREAD_LOCK(armci_user_threads.net_lock);
2535
armci_send_complete(&(dirdscr->sdescr),"armci_client_direct_send",1);
2537
THREAD_UNLOCK(armci_user_threads.net_lock);
2542
void armci_client_direct_get(int p, void *src_buf, void *dst_buf, int len,
2543
void** cptr,int nbtag,ARMCI_MEMHDL_T *lochdl,
2544
ARMCI_MEMHDL_T *remhdl)
2547
sr_descr_t *dirdscr;
2548
int clus = armci_clus_id(p);
2549
struct ibv_send_wr *bad_wr;
2551
THREAD_LOCK(armci_user_threads.net_lock);
2553
/*ID for the desr that comes from get_next_descr is already set*/
2554
dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
2555
if(nbtag)*cptr = dirdscr;
2558
printf("\n%d: in direct get lkey=%d rkey=%d\n",armci_me,lochdl->lkey,
2559
remhdl->rkey);fflush(stdout);
2562
armci_init_vbuf_rrdma(&dirdscr->sdescr,dirdscr->sg_entry,dst_buf,src_buf,
2564
rc = ibv_post_send((SRV_con+clus)->qp, &(dirdscr->sdescr), &bad_wr);
2565
dassert1(1,rc==0,rc);
2567
/* unlock/lock to ensure fairness: allows others thread post before
2568
waiting for completion */
2569
/*VT?check to see if this should be UNLOCK followed by lock*/
2571
THREAD_UNLOCK(armci_user_threads.net_lock);
2572
THREAD_LOCK(armci_user_threads.net_lock);
2576
armci_send_complete(&(dirdscr->sdescr),"armci_client_direct_get",1);
2579
THREAD_UNLOCK(armci_user_threads.net_lock);
2582
#define WQE_LIST_LENGTH 32
2583
#define WQE_LIST_COUNT 1
2585
/** Direct put into (or get from) remote processor memory. Assumes that (and is invoked
2586
* only when) the source buffers in user memory are pinned as well.
2587
* @param operation PUT/GET
2588
* @param src_ptr Source pointer for data
2589
* @param src_stride_arr Strides on the source array
2590
* @param dst_ptr Destination pointer to start writing to
2591
* @param seg_count[stride_levels+1] #els in each stride
2592
* level. seg_count[0] is contiguous bytes
2593
* @param proc Destination process
2594
* @param cptr OUT Pointer to store the descriptor to wait on for completion
2595
* @param nbtag IN Non-blocking tag (non-blocking op if nbtag!=0)
2596
* @param lochdl IN Local memory handle/key (registered memory stuff)
2597
* @param remhdl IN Remote memory handle/key
2601
void armci_client_direct_rdma_strided(int operation, int proc,
2602
char *src_ptr, int src_stride_arr[],
2603
char *dst_ptr, int dst_stride_arr[],
2606
void **cptr, int nbtag,
2607
ARMCI_MEMHDL_T *lochdl,
2608
ARMCI_MEMHDL_T *remhdl) {
2611
sr_descr_t *dirdscr;
2612
const int clus = armci_clus_id(proc);
2613
struct ibv_send_wr *bad_wr;
2614
struct ibv_send_wr sdscr[WQE_LIST_COUNT][WQE_LIST_LENGTH];
2615
struct ibv_sge sg_entry[WQE_LIST_COUNT][WQE_LIST_LENGTH];
2616
int busy[WQE_LIST_COUNT], wait_count[WQE_LIST_COUNT],clst;
2617
int i, j, c, numposts;
2618
int idx[MAX_STRIDE_LEVEL];
2620
THREAD_LOCK(armci_user_threads.net_lock);
2622
assert(stride_levels >= 0);
2623
assert(stride_levels<=MAX_STRIDE_LEVEL);
2624
/*ID for the desr that comes from get_next_descr is already set*/
2625
dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
2626
if(nbtag)*cptr = dirdscr;
2627
assert(dirdscr->tag == nbtag);
2630
printf("\n%d: in direct rdma strided id=%d lkey=%ld rkey=%ld\n",
2631
armci_me,dirdscr->sdescr.wr_id,lochdl->lkey,remhdl->rkey);fflush(stdout);
2634
for(c=0; c<WQE_LIST_COUNT; c++) {
2637
/*initialize fixed values for descriptors*/
2638
bzero(sdscr, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_send_wr));
2639
bzero(sg_entry, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_sge));
2640
for(j=0; j<WQE_LIST_COUNT; j++) {
2641
for(i=0; i<WQE_LIST_LENGTH; i++) {
2642
if(operation == PUT)
2643
armci_init_vbuf_srdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
2644
else if(operation == GET)
2645
armci_init_vbuf_rrdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
2647
armci_die("rdma_strided: unsupported operation",operation);
2648
sdscr[j][i].wr_id = dirdscr->sdescr.wr_id;
2649
sdscr[j][i].send_flags = 0; /*non-signalled*/
2650
if(i<WQE_LIST_LENGTH-1)
2651
sdscr[j][i].next = &sdscr[j][i+1];
2654
/*post requests in a loop*/
2656
for(i=1; i<=stride_levels; i++) {
2657
numposts *= seg_count[i];
2659
/* printf("%d: client rdma op=%d numposts=%d\n",armci_me,operation,numposts); */
2661
dirdscr->numofsends=0;
2662
bzero(idx, stride_levels*sizeof(int));
2663
int count = (numposts%WQE_LIST_LENGTH) ? (numposts%WQE_LIST_LENGTH):WQE_LIST_LENGTH;
2664
assert(count == ARMCI_MIN(count, numposts));
2666
for(i=0; i<numposts; ) {
2667
for(j=i; j<i+count; j++) {
2668
int src_offset=0, dst_offset=0;
2669
for(c=0; c<stride_levels; c++) {
2670
src_offset += idx[c]*src_stride_arr[c];
2671
dst_offset += idx[c]*dst_stride_arr[c];
2674
/* armci_client_direct_send(proc,src_ptr+src_offset, */
2675
/* dst_ptr+dst_offset, seg_count[0], */
2676
/* NULL,0,lochdl,remhdl); */
2678
assert(wait_count[clst]>0);
2679
armci_send_complete(&dirdscr->sdescr,"client_direct_rdma_strided",wait_count[clst]);
2680
dirdscr->numofsends -= wait_count[clst];
2685
if(operation == PUT) {
2686
sg_entry[clst][j-i].addr = (uint64_t)(src_ptr + src_offset);
2687
sdscr[clst][j-i].wr.rdma.remote_addr = (uint64_t)(dst_ptr + dst_offset);
2689
else if (operation == GET) {
2690
sg_entry[clst][j-i].addr = (uint64_t)(dst_ptr + dst_offset);
2691
sdscr[clst][j-i].wr.rdma.remote_addr = (uint64_t)(src_ptr + src_offset);
2693
assert(sg_entry[clst][j-i].length == seg_count[0]);
2696
for(c=0;c<stride_levels-1 && idx[c]==seg_count[c+1]; c++) {
2697
idx[c]=0; idx[c+1]++;
2700
sdscr[clst][count-1].next=NULL;
2701
sdscr[clst][count-1].send_flags=IBV_SEND_SIGNALED; /*only the last one*/
2702
for(c=0; c<count-1; c++) {
2703
assert(sdscr[clst][c].next == &sdscr[clst][c+1]);
2705
rc = ibv_post_send(SRV_con[clus].qp, sdscr[clst], &bad_wr);
2706
dassert1(1,rc==0,rc);
2707
dirdscr->numofsends += 1;
2708
wait_count[clst] = 1;
2709
/* armci_send_complete(&dirdscr->sdescr,"armci_client_direct_rdma_strided",count); */
2711
if(count < WQE_LIST_LENGTH) {
2712
sdscr[clst][count-1].next=&sdscr[clst][count]; /*reset it*/
2714
sdscr[clst][count-1].send_flags=0; /*reset it*/
2716
count = ARMCI_MIN(WQE_LIST_LENGTH,numposts-i);
2717
assert(count==0 || count==WQE_LIST_LENGTH);
2718
clst = (clst+1)%WQE_LIST_COUNT;
2722
armci_send_complete(&dirdscr->sdescr,"armci_client_direct_get",dirdscr->numofsends);
2723
dirdscr->numofsends = 0;
2726
THREAD_UNLOCK(armci_user_threads.net_lock);
2729
void armci_client_direct_rdma_strided(int operation, int proc,
2730
char *src_ptr, int src_stride_arr[],
2731
char *dst_ptr, int dst_stride_arr[],
2734
void **cptr, int nbtag,
2735
ARMCI_MEMHDL_T *lochdl,
2736
ARMCI_MEMHDL_T *remhdl) {
2737
int rc, i, j, c, busy[WQE_LIST_COUNT], clst, ctr;
2738
sr_descr_t *dirdscr;
2739
const int clus = armci_clus_id(proc);
2740
struct ibv_send_wr *bad_wr;
2741
struct ibv_send_wr sdscr[WQE_LIST_COUNT][WQE_LIST_LENGTH];
2742
struct ibv_sge sg_entry[WQE_LIST_COUNT][WQE_LIST_LENGTH];
2743
stride_info_t sinfo, dinfo;
2745
THREAD_LOCK(armci_user_threads.net_lock);
2747
assert(stride_levels >= 0);
2748
assert(stride_levels<=MAX_STRIDE_LEVEL);
2749
/*ID for the desr that comes from get_next_descr is already set*/
2750
dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
2751
if(nbtag)*cptr = dirdscr;
2752
assert(dirdscr->tag == nbtag);
2755
printf("\n%d: in direct rdma strided id=%d lkey=%ld rkey=%ld\n",
2756
armci_me,dirdscr->sdescr.wr_id,lochdl->lkey,remhdl->rkey);fflush(stdout);
2759
/*initialize fixed values for descriptors*/
2760
bzero(sdscr, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_send_wr));
2761
bzero(sg_entry, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_sge));
2762
for(j=0; j<WQE_LIST_COUNT; j++) {
2763
for(i=0; i<WQE_LIST_LENGTH; i++) {
2764
if(operation == PUT)
2765
armci_init_vbuf_srdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
2766
else if(operation == GET)
2767
armci_init_vbuf_rrdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
2769
armci_die("rdma_strided: unsupported operation",operation);
2770
sdscr[j][i].wr_id = dirdscr->sdescr.wr_id;
2771
sdscr[j][i].send_flags = 0; /*non-signalled*/
2772
if(i<WQE_LIST_LENGTH-1)
2773
sdscr[j][i].next = &sdscr[j][i+1];
2777
/*post requests in a loop*/
2778
armci_stride_info_init(&sinfo,src_ptr,stride_levels,src_stride_arr,seg_count);
2779
armci_stride_info_init(&dinfo,dst_ptr,stride_levels,dst_stride_arr,seg_count);
2780
assert(armci_stride_info_size(&sinfo)==armci_stride_info_size(&dinfo));
2782
dirdscr->numofsends=0;
2784
bzero(busy, sizeof(int)*WQE_LIST_COUNT);
2785
while(armci_stride_info_has_more(&sinfo)) {
2786
assert(armci_stride_info_has_more(&dinfo));
2787
uint64_t saddr = (uint64_t)armci_stride_info_seg_ptr(&sinfo);
2788
uint64_t daddr = (uint64_t)armci_stride_info_seg_ptr(&dinfo);
2789
if(operation == PUT) {
2790
sg_entry[clst][ctr].addr = saddr;
2791
sdscr[clst][ctr].wr.rdma.remote_addr = daddr;
2793
else if (operation == GET) {
2794
sg_entry[clst][ctr].addr = daddr;
2795
sdscr[clst][ctr].wr.rdma.remote_addr = saddr;
2797
assert(sg_entry[clst][ctr].length == seg_count[0]);
2800
armci_stride_info_next(&sinfo);
2801
armci_stride_info_next(&dinfo);
2802
if(ctr == WQE_LIST_LENGTH || !armci_stride_info_has_more(&sinfo)) {
2803
sdscr[clst][ctr-1].next=NULL;
2804
sdscr[clst][ctr-1].send_flags=IBV_SEND_SIGNALED; /*only the last one*/
2805
for(c=0; c<ctr-1; c++) {
2806
assert(sdscr[clst][c].next == &sdscr[clst][c+1]);
2808
rc = ibv_post_send(SRV_con[clus].qp, sdscr[clst], &bad_wr);
2809
dassert1(1,rc==0,rc);
2811
dirdscr->numofsends += 1;
2812
if(ctr<WQE_LIST_LENGTH)
2813
sdscr[clst][ctr-1].next = &sdscr[clst][ctr];
2814
sdscr[clst][ctr-1].send_flags = 0;
2817
clst = (clst+1)%WQE_LIST_COUNT;
2819
armci_send_complete(&dirdscr->sdescr,"client_direct_rdma_strided",1);
2820
dirdscr->numofsends -= 1;
2825
armci_stride_info_destroy(&sinfo);
2826
armci_stride_info_destroy(&dinfo);
2829
armci_send_complete(&dirdscr->sdescr,"armci_client_direct_get",dirdscr->numofsends);
2830
dirdscr->numofsends = 0;
2833
THREAD_UNLOCK(armci_user_threads.net_lock);
2837
#if defined(PEND_BUFS)
2838
int armci_server_msginfo_to_pbuf_index(request_header_t *msginfo) {
2840
vapibuf_pend_t *pbuf=NULL;
2842
assert(!msginfo->tag.imm_msg);
2843
for(i = 0; i<PENDING_BUF_NUM; i++) {
2844
if(serv_pendbuf_arr[i].buf == (char *)msginfo) {
2845
pbuf = &serv_pendbuf_arr[i];
2854
/** Routine for server to RDMA strided data to the client-side buffers
2855
* (allocated through buffers.c). This is to be used instead of
2856
* copying the data to immediate or pending buffers when possible.
2859
void armci_server_rdma_strided_to_contig(char *src_ptr, int src_stride_arr[],
2862
char *dst_ptr, int proc,
2863
request_header_t *msginfo) {
2864
int rc, i, j, c, busy[WQE_LIST_COUNT], clst, ctr, wr_id;
2865
sr_descr_t *dirdscr;
2866
struct ibv_send_wr *bad_wr, sdscr1;
2867
struct ibv_send_wr sdscr[WQE_LIST_COUNT][WQE_LIST_LENGTH];
2868
struct ibv_sge sg_entry[WQE_LIST_COUNT][WQE_LIST_LENGTH];
2869
stride_info_t sinfo;
2871
ARMCI_MEMHDL_T *loc_memhdl;
2872
ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];
2874
THREAD_LOCK(armci_user_threads.net_lock);
2876
assert(msginfo->operation == GET);
2877
assert(stride_levels >= 0);
2878
assert(stride_levels<=MAX_STRIDE_LEVEL);
2880
if(!get_armci_region_local_hndl(src_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
2881
armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
2884
if(!msginfo->tag.imm_msg) {
2885
int index = armci_server_msginfo_to_pbuf_index(msginfo);
2887
wr_id = PBUF_BUFID_TO_PUT_WRID(index);
2890
wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
2892
bzero(&sdscr1, sizeof(sdscr1));
2893
sdscr1.wr_id = wr_id;
2896
printf("\n%d: in rdma strided to contig id=%d lkey=%ld rkey=%ld\n",
2897
armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
2901
/*initialize fixed values for descriptors*/
2902
bzero(sdscr, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_send_wr));
2903
bzero(sg_entry, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_sge));
2904
for(j=0; j<WQE_LIST_COUNT; j++) {
2905
for(i=0; i<WQE_LIST_LENGTH; i++) {
2906
armci_init_vbuf_srdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
2907
sdscr[j][i].wr_id = wr_id;
2908
sdscr[j][i].send_flags = 0; /*non-signalled*/
2909
/* sdscr[j][i].send_flags = IBV_SEND_SIGNALED; /\*signalled*\/ */
2910
if(i<WQE_LIST_LENGTH-1)
2911
sdscr[j][i].next = &sdscr[j][i+1];
2915
/*post requests in a loop*/
2916
armci_stride_info_init(&sinfo,src_ptr,stride_levels,src_stride_arr,seg_count);
2919
bzero(busy, sizeof(int)*WQE_LIST_COUNT);
2920
daddr = (uint64_t)dst_ptr;
2921
while(armci_stride_info_has_more(&sinfo)) {
2922
uint64_t saddr = (uint64_t)armci_stride_info_seg_ptr(&sinfo);
2923
sg_entry[clst][ctr].addr = saddr;
2924
sdscr[clst][ctr].wr.rdma.remote_addr = daddr;
2925
assert(sg_entry[clst][ctr].length == seg_count[0]);
2928
daddr += seg_count[0];
2929
armci_stride_info_next(&sinfo);
2930
if(ctr == WQE_LIST_LENGTH || !armci_stride_info_has_more(&sinfo)) {
2931
sdscr[clst][ctr-1].next=NULL;
2932
if(!armci_stride_info_has_more(&sinfo)) {
2933
sdscr[clst][ctr-1].send_flags=IBV_SEND_SIGNALED; /*only the last one*/
2935
for(c=0; c<ctr-1; c++) {
2936
assert(sdscr[clst][c].next == &sdscr[clst][c+1]);
2938
rc = ibv_post_send(CLN_con[proc].qp, sdscr[clst], &bad_wr);
2939
dassert1(1,rc==0,rc);
2942
armci_send_complete(&sdscr1,"serv_rdma_to_contig",ctr);
2945
if(ctr<WQE_LIST_LENGTH)
2946
sdscr[clst][ctr-1].next = &sdscr[clst][ctr];
2947
sdscr[clst][ctr-1].send_flags = 0;
2950
clst = (clst+1)%WQE_LIST_COUNT;
2953
armci_send_complete(&sdscr1,"client_direct_rdma_strided",1);
2959
armci_stride_info_destroy(&sinfo);
2960
assert(proc == msginfo->from);
2961
THREAD_UNLOCK(armci_user_threads.net_lock);
2965
#define MAX_NUM_SGE 64
2967
/*same as above, but uses gather rdma writes*/
2968
void armci_server_rdma_strided_to_contig(char *src_ptr, int src_stride_arr[],
2971
char *dst_ptr, int proc,
2972
request_header_t *msginfo) {
2973
int rc, ctr, wr_id, bytes;
2974
struct ibv_send_wr *bad_wr, sdscr1, sdscr;
2975
struct ibv_sge sg_entry[MAX_NUM_SGE];
2976
stride_info_t sinfo;
2978
ARMCI_MEMHDL_T *loc_memhdl;
2979
ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];
2980
const int max_num_sge = ARMCI_MIN(MAX_NUM_SGE, armci_max_num_sg_ent);
2981
int numposts=0, numsegs=0;
2983
THREAD_LOCK(armci_user_threads.net_lock);
2985
assert(msginfo->operation == GET);
2986
assert(stride_levels >= 0);
2987
assert(stride_levels<=MAX_STRIDE_LEVEL);
2989
if(!get_armci_region_local_hndl(src_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
2990
armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
2993
if(!msginfo->tag.imm_msg) {
2994
int index = armci_server_msginfo_to_pbuf_index(msginfo);
2996
wr_id = PBUF_BUFID_TO_PUT_WRID(index);
2999
wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
3001
bzero(&sdscr1, sizeof(sdscr1));
3002
sdscr1.wr_id = wr_id;
3005
printf("\n%d: in rdma strided to contig id=%d lkey=%ld rkey=%ld\n",
3006
armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
3010
/*initialize fixed values for descriptors*/
3011
bzero(&sdscr, sizeof(sdscr));
3012
bzero(sg_entry, max_num_sge*sizeof(struct ibv_sge));
3013
armci_init_vbuf_srdma(&sdscr,&sg_entry[0],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
3014
sdscr.send_flags = 0; /*non-signalled*/
3015
sdscr.num_sge = 0; /*set below in the loop*/
3016
sdscr.wr_id = wr_id;
3018
for(ctr=0; ctr<max_num_sge; ctr++) {
3019
sg_entry[ctr].length = seg_count[0];
3020
sg_entry[ctr].lkey = loc_memhdl->lkey;
3023
/*post requests in a loop*/
3024
armci_stride_info_init(&sinfo,src_ptr,stride_levels,src_stride_arr,seg_count);
3026
numposts = numsegs = 0;
3028
daddr = (uint64_t)dst_ptr;
3030
while(armci_stride_info_has_more(&sinfo)) {
3031
sg_entry[ctr].addr = (uint64_t)armci_stride_info_seg_ptr(&sinfo);
3032
assert(sg_entry[ctr].length == seg_count[0]);
3035
bytes += seg_count[0];
3038
armci_stride_info_next(&sinfo);
3039
if(ctr == max_num_sge || !armci_stride_info_has_more(&sinfo)) {
3040
sdscr.wr.rdma.remote_addr = daddr;
3041
if(!armci_stride_info_has_more(&sinfo)) {
3042
sdscr.send_flags=IBV_SEND_SIGNALED; /*only the last one*/
3045
assert(sdscr.send_flags == 0);
3047
assert(ctr == sdscr.num_sge);
3048
rc = ibv_post_send(CLN_con[proc].qp, &sdscr, &bad_wr);
3049
dassert1(1,rc==0,rc);
3058
/* printf("%d(s): scatgat write numposts=%d numsegs=%d\n",armci_me,numposts,numsegs); */
3059
armci_stride_info_destroy(&sinfo);
3060
assert(proc == msginfo->from);
3061
THREAD_UNLOCK(armci_user_threads.net_lock);
3064
/*Directly read data from client buffers into remote memory. Data is
3065
contiguous on the client side. */
3066
void armci_server_rdma_contig_to_strided(char *src_ptr, int proc,
3068
int dst_stride_arr[],
3071
request_header_t *msginfo) {
3072
int rc, ctr, wr_id, bytes;
3073
struct ibv_send_wr *bad_wr, sdscr1, sdscr;
3074
struct ibv_sge sg_entry[MAX_NUM_SGE];
3075
stride_info_t dinfo;
3077
ARMCI_MEMHDL_T *loc_memhdl;
3078
ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];
3079
const int max_num_sge = ARMCI_MIN(MAX_NUM_SGE, armci_max_num_sg_ent);
3080
int numposts=0, numsegs=0;
3082
THREAD_LOCK(armci_user_threads.net_lock);
3084
assert(msginfo->operation == PUT);
3085
assert(stride_levels >= 0);
3086
assert(stride_levels<=MAX_STRIDE_LEVEL);
3088
if(!get_armci_region_local_hndl(dst_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
3089
armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
3092
if(!msginfo->tag.imm_msg) {
3093
int index = armci_server_msginfo_to_pbuf_index(msginfo);
3095
wr_id = PBUF_BUFID_TO_GET_WRID(index);
3098
wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
3100
bzero(&sdscr1, sizeof(sdscr1));
3101
sdscr1.wr_id = wr_id;
3104
printf("\n%d: in rdma strided to contig id=%d lkey=%ld rkey=%ld\n",
3105
armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
3109
/*initialize fixed values for descriptors*/
3110
bzero(&sdscr, sizeof(sdscr));
3111
bzero(sg_entry, max_num_sge*sizeof(struct ibv_sge));
3112
armci_init_vbuf_rrdma(&sdscr,&sg_entry[0],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
3113
sdscr.send_flags = 0; /*non-signalled*/
3114
sdscr.num_sge = 0; /*set below in the loop*/
3115
sdscr.wr_id = wr_id;
3117
for(ctr=0; ctr<max_num_sge; ctr++) {
3118
sg_entry[ctr].length = seg_count[0];
3119
sg_entry[ctr].lkey = loc_memhdl->lkey;
3122
/*post requests in a loop*/
3123
armci_stride_info_init(&dinfo,dst_ptr,stride_levels,dst_stride_arr,seg_count);
3125
numposts = numsegs = 0;
3127
saddr = (uint64_t)src_ptr;
3129
while(armci_stride_info_has_more(&dinfo)) {
3130
sg_entry[ctr].addr = (uint64_t)armci_stride_info_seg_ptr(&dinfo);
3131
assert(sg_entry[ctr].length == seg_count[0]);
3134
bytes += seg_count[0];
3137
armci_stride_info_next(&dinfo);
3138
if(ctr == max_num_sge || !armci_stride_info_has_more(&dinfo)) {
3139
sdscr.wr.rdma.remote_addr = saddr;
3140
if(!armci_stride_info_has_more(&dinfo)) {
3141
sdscr.send_flags=IBV_SEND_SIGNALED; /*only the last one*/
3144
assert(sdscr.send_flags == 0);
3146
assert(ctr == sdscr.num_sge);
3147
rc = ibv_post_send(CLN_con[proc].qp, &sdscr, &bad_wr);
3148
dassert1(1,rc==0,rc);
3157
/* printf("%d(s): scatgat write numposts=%d numsegs=%d\n",armci_me,numposts,numsegs); */
3158
armci_stride_info_destroy(&dinfo);
3159
assert(proc == msginfo->from);
3160
THREAD_UNLOCK(armci_user_threads.net_lock);
3166
char *armci_ReadFromDirect(int proc, request_header_t *msginfo, int len)
3168
int cluster = armci_clus_id(proc);
3169
vapibuf_ext_t* evbuf=BUF_TO_EVBUF(msginfo);
3170
char *dataptr = GET_DATA_PTR(evbuf->buf);
3171
extern void armci_util_wait_int(volatile int *,int,int);
3173
if(DEBUG_CLN){ printf("%d(c):read direct %d qp=%p\n",armci_me,
3174
len,&(SRV_con+cluster)->qp); fflush(stdout);
3177
if(mark_buf_send_complete[evbuf->snd_dscr.wr_id]==0)
3178
armci_send_complete(&(evbuf->snd_dscr),"armci_ReadFromDirect",1);
3180
if(!msginfo->bypass){
3184
flag = &(msginfo->tag.ack);
3185
if(msginfo->operation==GET){
3186
last = (int *)(dataptr+len-sizeof(int));
3187
if(msginfo->dscrlen >= (len-sizeof(int))){
3188
last = (int *)(dataptr+len+msginfo->dscrlen-sizeof(int));
3189
dataptr+=msginfo->dscrlen;
3193
printf("\n%d:flagval=%d at ptr=%p ack=%ld dist=%d\n",armci_me,*last,
3194
last,*flag,len);fflush(stdout);
3197
while(armci_util_int_getval(last) == ARMCI_STAMP &&
3198
armci_util_long_getval(flag) != ARMCI_STAMP){
3203
printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
3204
armci_me,last,*last,flag,*flag,msginfo->datalen);
3211
else if(msginfo->operation == REGISTER){
3212
while(armci_util_long_getval(flag) != ARMCI_STAMP){
3217
printf("%d: client flag(%p)=%ld off=%d\n",
3218
armci_me,flag,*flag,msginfo->datalen);
3225
int *flg = (int *)(dataptr+len);
3226
while(armci_util_int_getval(flg) != ARMCI_STAMP){
3231
printf("%d: client waiting (%p)=%d off=%d\n",
3232
armci_me,flg,*flg,len);
3243
#ifdef GET_STRIDED_COPY_PIPELINED
3244
/**Same as armci_ReadFromDirect, except reads partial segments
3245
* (identified by the stamping done in armci_send_req_msg()) and
3246
* returns. Note that the return value is the starting pointer of the
3247
* buffer containing the data. It is the same for all the segments
3248
* read for a message.
3249
* @param proc IN Read data corresponding to an earlier req to this proc
3250
* @param msginfo IN The request for which we are reading now
3251
* @param len IN #bytes in the total response
3252
* @param bytes_done OUT #bytes of the total response read so far (monotonic)
3253
* @return Starting pointer to the buffer containing the data
3255
char *armci_ReadFromDirectSegment(int proc, request_header_t *msginfo, int len, int *bytes_done) {
3256
int cluster = armci_clus_id(proc);
3257
vapibuf_ext_t* evbuf=BUF_TO_EVBUF(msginfo);
3258
char *dataptr = GET_DATA_PTR(evbuf->buf);
3259
extern void armci_util_wait_int(volatile int *,int,int);
3261
if(DEBUG_CLN){ printf("%d(c):read direct %d qp=%p\n",armci_me,
3262
len,&(SRV_con+cluster)->qp); fflush(stdout);
3265
if(mark_buf_send_complete[evbuf->snd_dscr.wr_id]==0)
3266
armci_send_complete(&(evbuf->snd_dscr),"armci_ReadFromDirect",1);
3268
if(!msginfo->bypass){
3270
int *last, *mid1, *mid2, third;
3272
flag = &(msginfo->tag.ack);
3273
if(msginfo->operation==GET){
3274
last = (int *)(dataptr+len-sizeof(int));
3275
if(msginfo->dscrlen >= (len-sizeof(int))){
3276
last = (int *)(dataptr+len+msginfo->dscrlen-sizeof(int));
3277
dataptr+=msginfo->dscrlen;
3279
third = (last-(int*)(msginfo->dscrlen+(char*)(msginfo+1)))/3;
3280
mid2 = (last - third);
3281
mid1 = mid2 - third;
3284
printf("\n%d:flagval=%d at ptr=%p ack=%ld dist=%d\n",armci_me,*last,
3285
last,*flag,len);fflush(stdout);
3288
while(armci_util_int_getval(last) == ARMCI_STAMP &&
3289
armci_util_long_getval(flag) != ARMCI_STAMP){
3294
printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
3295
armci_me,last,*last,flag,*flag,msginfo->datalen);
3301
int ssize = GET_STRIDED_COPY_PIPELINED_SIZE/sizeof(int);
3302
int *sfirst = (int*)(msginfo->dscrlen+(char*)(msginfo+1))+ssize; /*stamping
3305
int off = (((int *)(dataptr+*bytes_done)-sfirst+ssize)/ssize)*ssize;
3306
int *ptr = sfirst+off;
3308
dassert(1,(void *)sfirst>dataptr);
3309
dassert(1,(void *)ptr>dataptr);
3310
if(ptr<=slast && armci_util_int_getval(ptr)!=ARMCI_STAMP) {
3311
*bytes_done = ((char*)ptr)-dataptr;
3320
else if(msginfo->operation == REGISTER){
3321
while(armci_util_long_getval(flag) != ARMCI_STAMP){
3326
printf("%d: client flag(%p)=%ld off=%d\n",
3327
armci_me,flag,*flag,msginfo->datalen);
3334
int *flg = (int *)(dataptr+len);
3335
while(armci_util_int_getval(flg) != ARMCI_STAMP){
3340
printf("%d: client waiting (%p)=%d off=%d\n",
3341
armci_me,flg,*flg,len);
3354
* @param proc IN id of remote client to put to
3355
* @param buf IN local buf (has to be registered)
3357
// armci_send_data_to_client: transfer bytes bytes from local buffer buf to
// address dbuf on client proc (the debug output tags this path with (s),
// presumably the server side).
// The send work request is zeroed, filled in by armci_init_vbuf_srdma() with
// serv_memhandle and handle_array+proc as its two handle arguments, and posted
// on the CLN_con[proc] queue pair with ibv_post_send(); the return code is
// asserted to be 0.
// NOTE(review): this view of the function is missing lines (for example the
// declarations of rc and i and several braces), so the comments below cover
// only the statements that are visible.
void armci_send_data_to_client(int proc, void *buf, int bytes,void *dbuf)
3360
struct ibv_send_wr *bad_wr;
3361
struct ibv_send_wr sdscr;
3362
struct ibv_sge ssg_entry;
3365
printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
3367
proc,dbuf,(char *)dbuf+bytes-sizeof(int),bytes);fflush(stdout);
3370
memset(&sdscr,0,sizeof(struct ibv_send_wr));
3371
memset(&ssg_entry,0,sizeof(ssg_entry));
3372
armci_init_vbuf_srdma(&sdscr,&ssg_entry,buf,dbuf,bytes,
3373
&serv_memhandle,(handle_array+proc));
3376
printf("\n%d(s):handle_array[%d]=%p dbuf=%p flag=%p bytes=%d\n",armci_me,
3377
proc,&handle_array[proc],(char *)dbuf,
3378
(char *)dbuf+bytes-sizeof(int),bytes);
3382
// With PEND_BUFS: scan the IMM_BUF_NUM+1 serv_buf_arr slots that belong to
// proc for the one whose buffer range contains buf.
#if defined(PEND_BUFS)
3383
for(i=proc*(IMM_BUF_NUM+1); i<(proc+1)*(IMM_BUF_NUM+1); i++) {
3384
if((char*)buf>= serv_buf_arr[i]->buf &&
3385
(char*)buf<IMM_BUF_LEN+(char*)serv_buf_arr[i]->buf)
3390
// i still inside the slot range for proc: presumably the scan above stopped
// on a match (TODO confirm, the loop exit is not visible here).
if(i<(proc+1)*(IMM_BUF_NUM+1)) {
3391
/*Message from an immediate buffer*/
3392
assert(serv_buf_arr[i]->send_pending==0);
3393
serv_buf_arr[i]->send_pending=1;
3394
sdscr.wr_id = DSCRID_IMMBUF_RESP+i;
3399
// NOTE(review): presumably the no-match branch (its else line is not visible);
// this id is distinct from the per-slot ids DSCRID_IMMBUF_RESP+i used above.
sdscr.wr_id = DSCRID_IMMBUF_RESP+armci_nproc*(IMM_BUF_NUM+1)+1;
3403
/* #if defined(PEND_BUFS) */
3405
/* static uint64_t ctr=DSCRID_IMMBUF_RESP; */
3406
/* sdscr.wr_id = ctr; */
3407
/* ctr = (ctr+1-DSCRID_IMMBUF_RESP)%(DSCRID_IMMBUF_RESP_END-DSCRID_IMMBUF_RESP)+DSCRID_IMMBUF_RESP; */
3410
// NOTE(review): presumably the branch compiled without PEND_BUFS (the
// preprocessor lines around it are not visible).
sdscr.wr_id = proc+armci_nproc;
3412
rc = ibv_post_send((CLN_con+proc)->qp, &sdscr, &bad_wr);
3413
dassert1(1,rc==0,rc);
3415
// Without PEND_BUFS armci_send_complete() is called on the request right
// after it has been posted.
#if !defined(PEND_BUFS)
3416
armci_send_complete(&sdscr,"armci_send_data_to_client",1);
3420
// armci_WriteToDirect: send the reply data in buf (msginfo->datalen bytes)
// back to client proc at the address stored in msginfo->tag.data_ptr.
// For operations other than GET an ARMCI_STAMP int is written directly after
// the data in buf. With PEND_BUFS, a request that is not an immediate message
// is asserted to be a GET, is looked up in serv_pendbuf_arr and is sent through
// _armci_send_data_to_client_pbuf(); the other visible path calls
// armci_send_data_to_client().
// NOTE(review): lines are missing from this view (declarations, else and endif
// lines, closing braces, the end of a commented-out region); verify the branch
// structure against the full file.
void armci_WriteToDirect(int proc, request_header_t* msginfo, void *buf)
3424
ARMCI_PR_DBG("enter",0);
3425
bytes = (int)msginfo->datalen;
3427
printf("%d(s):write to direct sent %d to %d at %p\n",armci_me,
3428
bytes,proc,(char *)msginfo->tag.data_ptr);
3431
if(msginfo->operation!=GET){
3432
*(int *)((char *)buf+bytes)=ARMCI_STAMP;
3435
#if defined(PEND_BUFS)
3436
if(!msginfo->tag.imm_msg) {
3438
/* fprintf(stderr, "%d:: Not immediate mesg operated on\n", armci_me); */
3439
assert(msginfo->operation == GET); /*nothing else uses this for now*/
3440
/**This is a pending buf*/
3441
vapibuf_pend_t *pbuf=NULL;
3443
// Locate the pending buffer whose storage is this request header.
for(i = 0; i<PENDING_BUF_NUM; i++) {
3444
if(serv_pendbuf_arr[i].buf == (char *)msginfo) {
3445
pbuf = &serv_pendbuf_arr[i];
3450
assert(pbuf != NULL);
3451
// Header, descriptor and data together must fit in one pending buffer.
assert(sizeof(request_header_t)+msginfo->dscrlen+bytes<PENDING_BUF_LEN);
3452
_armci_send_data_to_client_pbuf(proc, &pbuf->sdscr,
3453
PBUF_BUFID_TO_PUT_WRID(index),
3455
msginfo->tag.data_ptr, buf,
3461
armci_send_data_to_client(proc,buf,bytes,msginfo->tag.data_ptr);
3463
/*if(msginfo->dscrlen >= (bytes-sizeof(int)))
3464
last = (int*)(((char*)(buf)) + (msginfo->dscrlen+bytes - sizeof(int)));
3466
last = (int*)(((char*)(buf)) + (bytes - sizeof(int)));
3468
if(msginfo->operation==GET && *last == ARMCI_STAMP){
3469
SERVER_SEND_ACK(msginfo->from);
3471
armci_ack_proc=NONE;
3472
ARMCI_PR_DBG("exit",0);
3476
#if defined(PEND_BUFS)
3477
// armci_rcv_req (this definition follows an #if defined(PEND_BUFS) line):
// unpack a received request for the caller.
//   mesg   - read as a pointer to a request_header_t pointer
//   phdr   - receives the request header pointer
//   pdescr - receives msginfo+1 (the bytes right after the header) or NULL;
//            the condition selecting between the two is not visible here
//   pdata  - receives the address dscrlen bytes past the end of the header
//   buflen - receives IMM_BUF_LEN or PENDING_BUF_LEN minus the header and
//            descriptor sizes; the IMM_BUF_LEN form follows the tag.imm_msg
//            test (the else line for the other form is presumably missing)
void armci_rcv_req(void *mesg,void *phdr,void *pdescr,void *pdata,int *buflen)
3479
request_header_t *msginfo = *(request_header_t**)mesg;
3480
*(void **)phdr = msginfo;
3482
if(msginfo->tag.imm_msg)
3483
*buflen = IMM_BUF_LEN - sizeof(request_header_t) - msginfo->dscrlen;
3485
*buflen = PENDING_BUF_LEN - sizeof(request_header_t) - msginfo->dscrlen;
3487
*(void **)pdata = msginfo->dscrlen + (char *)(msginfo+1);
3489
*(void **)pdescr = msginfo+1;
3491
*(void **)pdescr = NULL;
3494
// armci_rcv_req (presumably the variant built without PEND_BUFS; the
// preprocessor line separating it from the preceding definition is not
// visible): unpack the request held in a vapibuf_t.
//   mesg   - vapibuf_t whose buf member holds the request header
//   phdr   - receives the request header pointer
//   buflen - receives MSG_BUFLEN minus the header size
//   pdescr - inside the msginfo->bytes test it receives msginfo+1; NULL is
//            stored on the other visible path
//   pdata  - MessageRcvBuffer after the GET test and on the NULL-descriptor
//            path, otherwise the address dscrlen bytes past the header
// NOTE(review): else lines and braces are missing from this view; confirm the
// branch pairing against the full file.
void armci_rcv_req(void *mesg,void *phdr,void *pdescr,void *pdata,int *buflen)
3496
vapibuf_t *vbuf = (vapibuf_t*)mesg;
3497
request_header_t *msginfo = (request_header_t *)vbuf->buf;
3498
*(void **)phdr = msginfo;
3500
ARMCI_PR_DBG("enter",msginfo->operation);
3502
printf("%d(server): got %d req (dscrlen=%d datalen=%d) from %d\n",
3503
armci_me, msginfo->operation, msginfo->dscrlen,
3504
msginfo->datalen, msginfo->from); fflush(stdout);
3507
/* we leave room for msginfo on the client side */
3508
*buflen = MSG_BUFLEN - sizeof(request_header_t);
3510
if(msginfo->bytes) {
3511
*(void **)pdescr = msginfo+1;
3512
if(msginfo->operation == GET)
3513
*(void **)pdata = MessageRcvBuffer;
3515
*(void **)pdata = msginfo->dscrlen + (char*)(msginfo+1);
3517
*(void**)pdescr = NULL;
3518
*(void**)pdata = MessageRcvBuffer;
3520
ARMCI_PR_DBG("exit",msginfo->operation);
3524
// posts_scatter_desc: post the receive work request embedded in pend_dscr
// (pend_dscr->rdescr) with ibv_post_recv() and assert that it returned 0.
// Two posting calls are visible, one on the CLN_con[proc] queue pair and one
// on SRV_con[armci_clus_id(proc)]; presumably type (SERV or CLN) selects
// between them, but the selecting condition is not visible in this view.
// NOTE(review): wr_id is a 64-bit field in struct ibv_recv_wr, yet the debug
// printfs below format it with %d and %ld.
static void posts_scatter_desc(sr_descr_t *pend_dscr,int proc,int type)
3527
int cluster = armci_clus_id(proc);
3528
struct ibv_recv_wr *scat_dscr;
3529
struct ibv_recv_wr *bad_wr;
3531
scat_dscr = &pend_dscr->rdescr;
3533
/*armci_vapi_print_dscr_info(NULL,scat_dscr);*/
3534
if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN)){
3535
printf("%d(%d) : inside posts scatter dscr, id is %d\n",
3536
armci_me,type,scat_dscr->wr_id);
3541
rc = ibv_post_recv((CLN_con + proc)->qp, scat_dscr, &bad_wr);
3543
rc = ibv_post_recv((SRV_con+cluster)->qp, scat_dscr, &bad_wr);
3544
dassert1(1,rc==0,rc);
3546
if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ) {
3547
printf("\n%d: list_length is %d, id is %ld\n",
3548
armci_me,scat_dscr->num_sge,scat_dscr->wr_id);
3555
* client calls from request.c
3556
* server calls from ds-shared.c
3558
// Statically allocated scatter descriptors. client_blocking_scatter_dscr is
// used by armci_post_scatter() and armci_wait_for_blocking_scatter(); no use
// of serv_blocking_scatter_dscr is visible in this part of the file.
static sr_descr_t serv_blocking_scatter_dscr;
3559
static sr_descr_t client_blocking_scatter_dscr;
3560
// armci_post_scatter: post receive work requests that scatter incoming data
// into the strided region starting at dest_ptr.
//   dest_ptr, dest_stride_arr, count, stride_levels - strided layout; every
//       scatter entry is count[0] bytes long and count[1] entries are produced
//       per 2D block
//   mhandle - supplies the lkey stored in every scatter entry
//   proc, type - forwarded to posts_scatter_desc()
//   nbtag - passed to armci_vapi_get_next_rdescr() on the path that fetches a
//       descriptor from it
//   srd - when non-NULL, receives the descriptor obtained on that path
// Entries are packed into the sg_entry array of one descriptor, at most
// armci_max_num_sg_ent per work request; each time a batch is full it is
// posted through posts_scatter_desc() and numofrecvs is incremented.
// The posting work runs between THREAD_LOCK and THREAD_UNLOCK on
// armci_user_threads.net_lock.
// NOTE(review): this view is missing lines (declarations of i, j, k, y, src,
// src1 and total_size, several else lines and braces); the comments describe
// only the visible statements.
void armci_post_scatter(void *dest_ptr, int dest_stride_arr[], int count[],
3561
int stride_levels, armci_vapi_memhndl_t *mhandle,
3562
int proc, int nbtag, int type, sr_descr_t **srd)
3566
int total_of_2D = 1;
3567
int index[MAX_STRIDE_LEVEL], unit[MAX_STRIDE_LEVEL];
3570
int num_xmit = 0, num_seg, max_seg, rem_seg,vecind;
3572
sr_descr_t *pend_dscr;
3573
struct ibv_sge *scat_sglist;
3574
struct ibv_recv_wr *scat_dscr;
3576
if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ){
3577
printf("%d(%d) : inside post_scatter %d\n",armci_me,type,nbtag);
3581
max_seg = armci_max_num_sg_ent;
3583
THREAD_LOCK(armci_user_threads.net_lock);
3586
// Two descriptor sources are visible: one fetched through
// armci_vapi_get_next_rdescr() and the static client_blocking_scatter_dscr;
// the condition choosing between them is not visible here.
pend_dscr = armci_vapi_get_next_rdescr(nbtag,1);
3587
if(srd!=NULL)*srd=pend_dscr;
3590
pend_dscr = &client_blocking_scatter_dscr;
3591
pend_dscr->rdescr.wr_id=DSCRID_SCATGAT + MAX_PENDING;
3594
/*pend_dscr->proc = proc;*/
3595
pend_dscr->numofrecvs=0;
3597
scat_dscr = &pend_dscr->rdescr;
3598
scat_sglist = pend_dscr->sg_entry;
3599
/* scat_dscr->opcode = VAPI_RECEIVE; no ->opcode in ibv_recv_wr */
3600
/* scat_dscr->comp_type = VAPI_SIGNALED; no ->comp_type in ibv_recv_wr */
3601
scat_dscr->sg_list = scat_sglist;
3602
scat_dscr->num_sge = 0;
3604
// total_of_2D becomes the product of count[2..stride_levels]; unit[j] is the
// number of 2D blocks after which index[j] advances by one.
index[2] = 0; unit[2] = 1;
3605
if(stride_levels > 1){
3606
total_of_2D = count[2];
3607
for(j=3; j<=stride_levels; j++){
3608
index[j] = 0; unit[j] = unit[j-1]*count[j-1];
3609
total_of_2D*=count[j];
3613
// Number of work requests: total entries divided by max_seg, rounded up, with
// a minimum of 1; rem_seg is the size of the final partial batch (0 if none).
num_xmit = total_of_2D*count[1]/max_seg;
3614
rem_seg = (total_of_2D*count[1])%max_seg;
3615
if(num_xmit == 0) num_xmit = 1;
3616
else if(rem_seg!= 0)num_xmit++;
3619
if ((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ) {
3620
printf("%d(%d):armci_post_scatter num_xmit = %d\t, rem_seg = %d\n",
3621
armci_me,type,num_xmit,rem_seg);
3626
// Only the last work request uses the partial batch size.
if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
3627
else num_seg = max_seg;
3630
// For each 2D block, derive its base address from the higher-dimension
// indices and strides, then advance those indices.
for(i=0;i<total_of_2D;i++){
3631
src = (char *)dest_ptr;
3632
for(j=2;j<=stride_levels;j++){
3633
src+= index[j]*dest_stride_arr[j-1];
3634
if(((i+1)%unit[j]) == 0) index[j]++;
3635
if(index[j] >= count[j]) index[j] =0;
3639
for(j=0; j<count[1]; j++, vecind++){
3640
if(vecind == num_seg) {
3641
posts_scatter_desc(pend_dscr,proc,type);
3642
pend_dscr->numofrecvs++;
3644
/* the previous one has been posted, start off new*/
3645
scat_dscr->num_sge = 0;
3646
y = 0; /* reuse the same scatter descriptor */
3647
vecind=0;total_size=0;k++;
3648
if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
3650
/* fill the scatter descriptor */
3651
scat_sglist[y].addr = (uint64_t)src1;
3652
scat_sglist[y].lkey = mhandle->lkey;
3653
scat_sglist[y].length = count[0];
3654
scat_dscr->num_sge++;
3655
src1 += dest_stride_arr[0];
3660
if(vecind == num_seg){
3661
posts_scatter_desc(pend_dscr,proc,type);
3662
pend_dscr->numofrecvs++;
3664
/* the previous one has been posted, start off new*/
3665
scat_dscr->num_sge = 0;
3667
vecind = 0; total_size=0; k++;
3668
if(rem_seg!=0 && k==(num_xmit-1))num_seg=rem_seg;
3669
else num_seg = max_seg;
3674
THREAD_UNLOCK(armci_user_threads.net_lock);
3676
/* printf("%d(s): num scatters posted=%d\n", armci_me,pend_dscr->numofrecvs); */
3678
/*if blocking call wait_for_blocking_scatter to complete*/
3683
// armci_wait_for_blocking_scatter: call armci_recv_complete() on the static
// client_blocking_scatter_dscr, passing the number of receives recorded in its
// numofrecvs field (presumably this waits for all of them - TODO confirm).
void armci_wait_for_blocking_scatter()
3685
sr_descr_t *pend_dscr=&client_blocking_scatter_dscr;
3687
armci_recv_complete(&pend_dscr->rdescr,"armci_post_scatter",pend_dscr->numofrecvs);
3692
* function used by armci_post_gather to actually post the gather list
3694
// posts_gather_desc: post the send work request embedded in pend_dscr
// (pend_dscr->sdescr) with ibv_post_send() while holding
// armci_user_threads.net_lock, and assert that the call returned 0.
// Two posting calls are visible, one on SRV_con[armci_clus_id(proc)] and one
// on CLN_con[proc]; presumably type selects between them, but the selecting
// condition is not visible in this view.
// NOTE(review): wr_id is a 64-bit field in struct ibv_send_wr, yet the debug
// printf below formats it with %d.
static void posts_gather_desc(sr_descr_t *pend_dscr,int proc,int type)
3697
int cluster = armci_clus_id(proc);
3698
struct ibv_send_wr *gat_dscr;
3699
struct ibv_send_wr *bad_wr;
3701
THREAD_LOCK(armci_user_threads.net_lock);
3703
gat_dscr = &pend_dscr->sdescr;
3704
/*armci_vapi_print_dscr_info(gat_dscr,NULL);*/
3705
if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN)){
3706
printf("%d: type(client=1)=%d inside posts gather dscr, id is %d\n",
3707
armci_me,type,gat_dscr->wr_id);
3713
rc = ibv_post_send((SRV_con+cluster)->qp, gat_dscr, &bad_wr);
3716
rc = ibv_post_send((CLN_con + proc)->qp, gat_dscr, &bad_wr);
3718
dassert1(1,rc==0,rc);
3720
THREAD_UNLOCK(armci_user_threads.net_lock);
3725
* posts a bunch of gather descriptors
3727
// Statically allocated gather descriptors. client_blocking_gather_dscr is
// used by armci_post_gather(); no use of serv_blocking_gather_dscr is visible
// in this part of the file.
static sr_descr_t serv_blocking_gather_dscr;
3728
static sr_descr_t client_blocking_gather_dscr;
3729
// armci_post_gather: post send work requests that gather data from the
// strided region starting at src_ptr.
//   src_ptr, src_stride_arr, count, stride_levels - strided layout; every
//       gather entry is count[0] bytes long and count[1] entries are produced
//       per 2D block
//   mhandle - supplies the lkey stored in every gather entry
//   proc, type - forwarded to posts_gather_desc()
//   nbtag - passed to armci_vapi_get_next_sdescr() on the path that fetches a
//       descriptor from it
//   srd - when non-NULL, receives the descriptor obtained on that path
// The work request uses opcode IBV_WR_SEND with IBV_SEND_SIGNALED. Entries are
// packed into the sg_entry array of one descriptor, at most
// armci_max_num_sg_ent per work request; each full batch is posted through
// posts_gather_desc() and numofsends is incremented. A call to
// armci_send_complete() with numofsends is visible at the end; the condition
// guarding it (if any) is not visible.
// NOTE(review): this view is missing lines (declarations of i, j, k, y, src,
// src1 and total_size, several else lines and braces); the comments describe
// only the visible statements.
void armci_post_gather(void *src_ptr, int src_stride_arr[], int count[],
3730
int stride_levels, armci_vapi_memhndl_t *mhandle,
3731
int proc,int nbtag, int type, sr_descr_t **srd)
3734
int total_of_2D = 1;
3736
int index[MAX_STRIDE_LEVEL], unit[MAX_STRIDE_LEVEL];
3740
int num_xmit = 0, num_seg, max_seg, rem_seg,vecind;
3741
sr_descr_t *pend_dscr;
3743
struct ibv_sge *gat_sglist;
3744
struct ibv_send_wr *gat_dscr;
3746
if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN)){
3747
printf("%d(%d) : inside post_gather\n",armci_me,type);
3751
max_seg = armci_max_num_sg_ent;
3753
// Two descriptor sources are visible: one fetched through
// armci_vapi_get_next_sdescr() and the static client_blocking_gather_dscr;
// the condition choosing between them is not visible here.
pend_dscr = armci_vapi_get_next_sdescr(nbtag,1);
3754
if(srd!=NULL)*srd=pend_dscr;
3757
pend_dscr = &client_blocking_gather_dscr;
3758
pend_dscr->sdescr.wr_id=DSCRID_SCATGAT + MAX_PENDING;
3760
pend_dscr->numofsends=0;
3762
gat_dscr = &pend_dscr->sdescr;
3763
gat_sglist = pend_dscr->sg_entry;
3764
gat_dscr->opcode = IBV_WR_SEND;
3765
gat_dscr->send_flags = IBV_SEND_SIGNALED;
3766
gat_dscr->sg_list = gat_sglist;
3767
gat_dscr->num_sge = 0;
3768
/* gat_dscr->send_flags = 0; */
3770
// total_of_2D becomes the product of count[2..stride_levels]; unit[j] is the
// number of 2D blocks after which index[j] advances by one.
index[2] = 0; unit[2] = 1;
3771
if(stride_levels > 1){
3772
total_of_2D = count[2];
3773
for(j=3; j<=stride_levels; j++){
3774
index[j] = 0; unit[j] = unit[j-1]*count[j-1];
3775
total_of_2D*=count[j];
3779
// Number of work requests: total entries divided by max_seg, rounded up, with
// a minimum of 1; rem_seg is the size of the final partial batch (0 if none).
num_xmit = total_of_2D*count[1]/max_seg;
3780
rem_seg = (total_of_2D*count[1])%max_seg;
3781
if(num_xmit == 0) num_xmit = 1;
3782
else if(rem_seg!= 0)num_xmit++;
3784
if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ){
3785
printf("%d(%d):armci_post_gather total_2D=%d, num_xmit=%d, rem_seg =%d, count[1] = %d\n",armci_me,type,total_of_2D, num_xmit,rem_seg,count[1]);
3790
// Only the last work request uses the partial batch size.
if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
3791
else num_seg = max_seg;
3794
// For each 2D block, derive its base address from the higher-dimension
// indices and strides, then advance those indices.
for(i=0;i<total_of_2D;i++){
3795
src = (char *)src_ptr;
3796
for(j=2;j<=stride_levels;j++){
3797
src+= index[j]*src_stride_arr[j-1];
3798
if(((i+1)%unit[j]) == 0) index[j]++;
3799
if(index[j] >= count[j]) index[j] =0;
3803
for(j=0; j<count[1]; j++, vecind++){
3804
if(vecind == num_seg){
3805
posts_gather_desc(pend_dscr,proc,type);
3806
pend_dscr->numofsends++;
3808
/* the previous one has been posted, start off new*/
3809
gat_dscr->num_sge = 0;
3811
vecind=0;total_size=0;k++;
3812
if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
3815
/* fill the gather descriptor */
3816
gat_sglist[y].addr = (uint64_t)src1;
3817
gat_sglist[y].lkey = mhandle->lkey;
3818
gat_sglist[y].length = count[0];
3819
gat_dscr->num_sge++;
3820
src1 += src_stride_arr[0];
3825
if(vecind == num_seg){
3826
posts_gather_desc(pend_dscr,proc,type);
3827
pend_dscr->numofsends++;
3828
if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ){
3829
printf("%d(%d)posts_gather_desc done\n",armci_me,type);
3833
/* the previous one has been posted, start off new*/
3834
gat_dscr->num_sge = 0;
3836
vecind = 0; total_size=0; k++;
3837
if(rem_seg!=0 && k==(num_xmit-1))num_seg=rem_seg;
3838
else num_seg = max_seg;
3841
/* printf("%d: num gathers posted =%d\n",armci_me,pend_dscr->numofsends); */
3844
armci_send_complete(&pend_dscr->sdescr,"armci_post_gather",pend_dscr->numofsends);
3848
/***********************END SCATTER GATHER STUFF******************************/
3852
/***********************SPECIAL SEND/RECV*************************************/
3853
// armci_server_direct_send: send len bytes from src_buf to address dst_buf on
// client dst, using caller-supplied keys instead of memory handles.
//   lkey - dereferenced to set the local key of the single scatter/gather entry
//   rkey - dereferenced to set the remote key of the work request
// The request is initialised by armci_init_vbuf_srdma() with NULL handles,
// posted on the CLN_con[dst] queue pair, and CLN_nic->scq is then polled for a
// completion whose status is asserted to be IBV_WC_SUCCESS.
// NOTE(review): pdscr is initialised to NULL and no assignment to it is
// visible in this view before it is passed to ibv_poll_cq() and dereferenced;
// confirm against the full file that it is pointed at pdscr1. The handling of
// rc == 0 (no completion yet) and the declaration of rc are also not visible.
void armci_server_direct_send(int dst, char *src_buf, char *dst_buf, int len,
3854
uint32_t *lkey, uint32_t *rkey)
3857
struct ibv_wc *pdscr=NULL;
3858
struct ibv_wc pdscr1;
3859
struct ibv_send_wr sdscr;
3860
struct ibv_sge ssg_entry;
3865
printf("\n%d(s):sending dir data to client %d at %p bytes=%d last=%p\n",
3866
armci_me,dst,dst_buf,len,(dst_buf+len-4));fflush(stdout);
3869
memset(&sdscr,0,sizeof(struct ibv_send_wr));
3870
armci_init_vbuf_srdma(&sdscr,&ssg_entry,src_buf,dst_buf,len,NULL,NULL);
3871
// Keys are filled in by hand because no handles were given to the initialiser.
sdscr.wr.rdma.rkey = *rkey;
3872
ssg_entry.lkey = *lkey;
3874
sdscr.wr_id = dst+armci_nproc;
3875
struct ibv_send_wr *bad_wr;
3876
rc = ibv_post_send((CLN_con+dst)->qp, &sdscr, &bad_wr);
3877
dassert1(1,rc==0,rc);
3880
rc = ibv_poll_cq(CLN_nic->scq, 1, pdscr);
3882
dassertp(1,rc>=0,("%d: rc=%d id=%d status=%d\n",
3883
armci_me,rc,(int)pdscr->wr_id,pdscr->status));
3884
dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);;
3889
// armci_send_contig_bypass: send bytes bytes from src_ptr to rem_ptr on the
// requesting process (msginfo->from) through armci_server_direct_send(), then
// issue SERVER_SEND_ACK if the last int of the source data equals ARMCI_STAMP.
// The request is asserted to be pinned. rkey points into the descriptor area
// that follows the header, 2*sizeof(uint32_t) bytes before its end.
// NOTE(review): lkey is initialised to NULL and no assignment to it is visible
// in this view, yet it is dereferenced in the debug printf and handed to
// armci_server_direct_send(), which dereferences it; confirm against the full
// file. The declarations of last and rkey are also not visible.
void armci_send_contig_bypass(int proc, request_header_t *msginfo,
3890
void *src_ptr, void *rem_ptr, int bytes)
3893
uint32_t *lkey=NULL;
3895
int dscrlen = msginfo->dscrlen;
3897
last = (int*)(((char*)(src_ptr)) + (bytes - sizeof(int)));
3898
dassertp(1,msginfo->pinned,("%d: not pinned proc=%d",armci_me,proc));
3900
rkey = (uint32_t *)((char *)(msginfo+1)+dscrlen-(sizeof(uint32_t)+sizeof(uint32_t)));
3903
printf("%d(server): sending data bypass to %d (%p,%p) %d %d\n", armci_me,
3904
msginfo->from,src_ptr, rem_ptr,*lkey,*rkey);
3907
armci_server_direct_send(msginfo->from,src_ptr,rem_ptr,bytes,lkey,rkey);
3909
if(*last == ARMCI_STAMP){
3910
SERVER_SEND_ACK(msginfo->from);
3914
// armci_rcv_strided_data_bypass_both: client-side receive for the bypass path.
// last points at the final int of the first count[0]-byte segment at ptr and
// ack aliases msginfo->tag as a long; the visible loop repeats while last
// still holds ARMCI_STAMP and ack does not.
// NOTE(review): the loop body, the declarations of last and ack, and the
// conditions around the two later calls are not visible in this view. One
// visible message says the routine should never be called, so part of what
// follows is presumably conditional or dead code - confirm against the full
// file.
void armci_rcv_strided_data_bypass_both(int proc, request_header_t *msginfo,
3915
void *ptr, int *count, int stride_levels)
3917
int datalen = msginfo->datalen;
3922
if(DEBUG_CLN){ printf("%d:rcv_strided_data_both bypass from %d\n",
3923
armci_me, proc); fflush(stdout);
3926
last = (int*)(((char*)(ptr)) + (count[0] -sizeof(int)));
3927
ack = (long *)&msginfo->tag;
3928
while(armci_util_int_getval(last) == ARMCI_STAMP &&
3929
armci_util_long_getval(ack) != ARMCI_STAMP){
3934
printf("%d: client last(%p)=%d ack(%p)=%ld off=%d\n",
3935
armci_me,last,*last,ack,*ack,(int)((char*)last - (char*)ptr));
3942
printf("\n%d:rcv_strided_data called, it should never be called\n",armci_me);
3943
armci_dscrlist_recv_complete(0,"armci_rcv_strided_data_bypass_both",NULL);
3946
if(DEBUG_CLN){printf("%d:rcv_strided_data bypass both: %d bytes from %d\n",
3947
armci_me, datalen, proc); fflush(stdout);
3952
/*************************END OF FILE UNUSED CODE BELOW********************/
3953
// armci_pin_memory: stub; the only visible statement prints a not implemented
// message. NOTE(review): the return statement is not visible in this view.
int armci_pin_memory(void *ptr, int stride_arr[], int count[], int strides)
3955
printf("\n%d:armci_pin_memory not implemented",armci_me);fflush(stdout);
3960
// armci_client_send_ack: stub; the only visible statement prints a not
// implemented message and flushes stdout.
void armci_client_send_ack(int proc, int n)
3962
printf("\n%d:client_send_ack not implemented",armci_me);fflush(stdout);
3966
// armci_rcv_strided_data_bypass: stub; the only visible statement prints a not
// implemented message.
void armci_rcv_strided_data_bypass(int proc, request_header_t* msginfo,
3967
void *ptr, int stride_levels)
3969
printf("\n%d:armci_rcv_strided_data_bypass not implemented",armci_me);
3974
// armci_unpin_memory: stub; the only visible statement prints a not
// implemented message and flushes stdout.
void armci_unpin_memory(void *ptr, int stride_arr[], int count[], int strides)
3976
printf("\n%d:armci_unpin_memory not implemented",armci_me);fflush(stdout);
3980
// armcill_server_wait_ack: stub; the only visible statement prints a not
// implemented message. NOTE(review): the return statement is not visible in
// this view.
int armcill_server_wait_ack(int proc, int n)
3982
printf("\n%d:armcill_server_wait_ack not implemented",armci_me);
3988
// armcill_server_put: stub; the only visible statement prints a not
// implemented message and flushes stdout.
void armcill_server_put(int proc, void* s, void *d, int len)
3990
printf("\n%d:armcill_server_put not implemented",armci_me);fflush(stdout);
3995
* initialising the atomic send descriptor
3997
// armci_init_vapibuf_atomic: fill in send work request sd and scatter/gather
// entry sg for a 64-bit remote atomic operation.
//   op      - ARMCI_FETCH_AND_ADD_LONG selects IBV_WR_ATOMIC_FETCH_AND_ADD with
//             extra as the addend; ARMCI_SWAP_LONG selects
//             IBV_WR_ATOMIC_CMP_AND_SWP with extra as the swap value
//   ploc    - local address stored in sg->addr
//   prem    - remote address the atomic operates on
//   lhandle - supplies the local key, rhandle supplies the remote key
//   id      - not used by any visible statement
// The request is zeroed first, is marked IBV_SEND_SIGNALED and gets the work
// request id DSCRID_RMW + armci_me.
// NOTE(review): on the swap path only wr.atomic.swap is set, so the compare
// operand (compare_add) keeps the zero from memset; with compare-and-swap
// semantics the swap would then only take effect when the remote value is 0.
// Confirm the intended behaviour. Whether the printfs are guarded by a debug
// condition, and where sd->sg_list and sd->num_sge are set, is not visible in
// this view.
void armci_init_vapibuf_atomic(struct ibv_send_wr *sd, struct ibv_sge *sg,
3998
int op, int*ploc,int *prem, int extra,
3999
int id,ARMCI_MEMHDL_T *lhandle,
4000
ARMCI_MEMHDL_T *rhandle)
4003
printf("%d(c) : entered armci_init_vapibuf_atomic\n",armci_me);
4006
memset(sd,0,sizeof(struct ibv_send_wr));
4007
if (op == ARMCI_FETCH_AND_ADD_LONG ) {
4008
printf("%d(c) :setting opcode for snd dscr to FETCH_AND_ADD\n",armci_me);
4009
sd->opcode = IBV_WR_ATOMIC_FETCH_AND_ADD;
4010
sd->wr.atomic.compare_add = (uint64_t)extra;
4011
} else if(op == ARMCI_SWAP_LONG){
4012
sd->opcode = IBV_WR_ATOMIC_CMP_AND_SWP;
4013
sd->wr.atomic.swap = (uint64_t)extra;
4015
sd->send_flags = IBV_SEND_SIGNALED;
4016
sg->length = 8; /* 64 bit atomic*/
4017
printf("--------\n");
4018
sg->addr= (uint64_t)(void *)ploc;
4020
sg->lkey = lhandle->lkey;
4023
sd->wr.atomic.remote_addr = (uint64_t)(void *)prem;
4025
sd->wr.atomic.rkey = rhandle->rkey; /* how do we get the remote key */
4026
sd->wr_id = DSCRID_RMW + armci_me;
4029
printf("%d(c) : finished initialising atomic send desc id is %ld,armci_ime = %d\n",armci_me,sd->wr_id,armci_me);
4034
* using vapi remote atomic operations
4036
// client_rmw_complete: poll the client send completion queue (CLN_nic->scq)
// one entry at a time until a completion carrying the same wr_id as snd_dscr
// is seen; each polled completion is asserted to have status IBV_WC_SUCCESS.
//   from - not used by any visible statement
// NOTE(review): the opening of the do-while loop, the declaration of rc and
// any handling of rc == 0 (nothing polled yet) are not visible in this view.
void client_rmw_complete(struct ibv_send_wr *snd_dscr, char *from)
4039
struct ibv_wc pdscr1;
4040
struct ibv_wc *pdscr=&pdscr1;
4042
printf("%d(c) : inside client_rmw_complete\n",armci_me);
4045
rc = ibv_poll_cq(CLN_nic->scq, 1, pdscr);
4047
dassertp(DBG_POLL|DBG_ALL,rc>=0,
4048
("%d: rc=%d id=%d status=%d\n",
4049
armci_me,rc,pdscr->wr_id,pdscr->status));
4050
dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
4052
} while(pdscr->wr_id != snd_dscr->wr_id);
4056
// armci_direct_rmw: perform a read-modify-write on remote memory with a remote
// atomic work request. The per-process descriptor and scatter/gather entry in
// rmw[armci_me] are initialised by armci_init_vapibuf_atomic(), the request is
// posted with ibv_post_send() on con->qp (return code asserted to be 0), and
// client_rmw_complete() is then called on it.
// NOTE(review): the assignment of con, the declarations of sg and rc, and the
// end of the function are not visible in this view; proc is passed to
// armci_init_vapibuf_atomic() as its id argument.
void armci_direct_rmw(int op, int*ploc, int *prem, int extra, int proc,
4057
ARMCI_MEMHDL_T *lhandle, ARMCI_MEMHDL_T *rhandle)
4060
struct ibv_send_wr *sd;
4063
armci_connect_t *con;
4068
sd = &(rmw[armci_me].rmw_dscr);
4069
sg = &(rmw[armci_me].rmw_entry);
4072
printf("%d(c) : about to call armci_init_vapibuf_atomic\n",armci_me);
4076
armci_init_vapibuf_atomic(sd, sg, op,ploc,prem,extra,proc,lhandle,rhandle);
4079
printf("%d(c) : finished armci_init_vapibuf_atomic\n",armci_me);
4083
struct ibv_send_wr * bad_wr;
4084
rc = ibv_post_send(con->qp, sd, &bad_wr);
4085
dassert1(1,rc==0,rc);
4088
printf("%d(c) : finished posting desc\n",armci_me);
4092
/*armci_send_complete(sd,"send_remote_atomic");*/
4093
client_rmw_complete(sd,"send_remote_atomic");