1
/*****************************************************************************/
2
/* "NetPIPE" -- Network Protocol Independent Performance Evaluator. */
3
/* Copyright 1997, 1998 Iowa State University Research Foundation, Inc. */
5
/* This program is free software; you can redistribute it and/or modify */
6
/* it under the terms of the GNU General Public License as published by */
7
/* the Free Software Foundation. You should have received a copy of the */
8
/* GNU General Public License along with this program; if not, write to the */
9
/* Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
11
/* ib.c ---- Infiniband module for the Mellanox VAPI */
12
/*****************************************************************************/
14
#define USE_VOLATILE_RPTR /* needed for polling on last byte of recv buffer */
19
/* Debugging output macro */
24
/* Debug logging: writes "<calling function>: <formatted message>" to the
 * FILE* named logfile (which must be in scope and open) and flushes it.
 *
 * _format must be a string literal, because it is concatenated with the
 * "%s: " prefix. The function name is passed as a %s argument rather than
 * pasted next to the literal: __FUNCTION__ / __func__ is a variable, not a
 * string literal, so literal concatenation does not compile on current GCC.
 * The do/while(0) wrapper makes the two calls a single statement, so the
 * macro is safe inside an unbraced if/else.
 */
#define LOGPRINTF(_format, _aa...)                              \
  do {                                                          \
    fprintf(logfile, "%s: " _format, __func__, ##_aa);          \
    fflush(logfile);                                            \
  } while (0)
26
#define LOGPRINTF(_format, _aa...)
29
/* Header files needed for Infiniband */
31
#include "vapi.h" /* Mellanox Verbs API */
32
#include "evapi.h" /* Mellanox Verbs API extension */
33
#include "vapi_common.h" /* Mellanox VIP layer of HCA Verbs */
37
// File-scope state shared by the routines in this module.
// Handles are initialised to VAPI_INVAL_HNDL; finalizeIB() compares most of
// them against that value before releasing the matching resource.
static VAPI_hca_hndl_t hca_hndl=VAPI_INVAL_HNDL;
38
// Filled in by VAPI_query_hca_port_prop() in initIB().
static VAPI_hca_port_t hca_port;
41
// LID of the peer, read from the TCP socket in initIB().
static IB_lid_t d_lid;
42
static VAPI_pd_hndl_t pd_hndl=VAPI_INVAL_HNDL;
43
// Requested and actual completion queue sizes; initIB() reuses both
// variables for the send CQ and then for the recv CQ.
static VAPI_cqe_num_t num_cqe;
44
static VAPI_cqe_num_t act_num_cqe;
45
// Send and receive completion queue handles.
static VAPI_cq_hndl_t s_cq_hndl=VAPI_INVAL_HNDL;
46
static VAPI_cq_hndl_t r_cq_hndl=VAPI_INVAL_HNDL;
47
// Handle returned by EVAPI_set_comp_eventh() when event completion is used.
static EVAPI_compl_handler_hndl_t ceh_hndl=VAPI_INVAL_HNDL;
48
// Memory registration request (mr_in) and the results for the send and recv
// buffers; MyMalloc() fills these in.
static VAPI_mrw_t mr_in;
49
static VAPI_mrw_t s_mr_out;
50
static VAPI_mrw_t r_mr_out;
51
static VAPI_mr_hndl_t s_mr_hndl=VAPI_INVAL_HNDL;
52
static VAPI_mr_hndl_t r_mr_hndl=VAPI_INVAL_HNDL;
53
// Queue pair creation attributes, returned properties and handle.
static VAPI_qp_init_attr_t qp_init_attr;
54
static VAPI_qp_prop_t qp_prop;
55
static VAPI_qp_hndl_t qp_hndl=VAPI_INVAL_HNDL;
56
// QP number of the peer, read from the TCP socket in initIB().
static VAPI_qp_num_t d_qp_num;
57
// Scratch attribute structures passed to VAPI_modify_qp() in initIB().
static VAPI_qp_attr_mask_t qp_attr_mask;
58
static VAPI_qp_attr_t qp_attr;
59
static VAPI_qp_cap_t qp_cap;
60
// Work completion descriptor shared by every VAPI_poll_cq() call in the file.
static VAPI_wc_desc_t wc;
61
// Requested maximum outstanding work requests for both the RQ and the SQ.
static int max_wq=50000;
62
// Peer receive buffer address and rkey, read in AfterAlignmentInit() and used
// by SendData() for the RDMA write modes.
static void* remote_address;
63
static VAPI_rkey_t remote_key;
64
// Set to 1 by event_handler() and polled by RecvData() and Reset() when the
// completion type is NP_COMP_EVENT.
static volatile int receive_complete;
66
/* Local prototypes */
68
void event_handler(VAPI_hca_hndl_t, VAPI_cq_hndl_t, void*);
70
/* Function definitions */
72
// Init: store the default protocol settings of this module in *p.
// Defaults written here: 1024-byte IB MTU, send/receive communication,
// completion detected by local polling, and the receiver role (tr = 0,
// rcv = 1).
// pargc and pargv are not referenced by any line visible in this listing.
void Init(ArgStruct *p, int* pargc, char*** pargv)
76
p->prot.ib_mtu = MTU1024; /* 1024 Byte MTU */
77
p->prot.commtype = NP_COMM_SENDRECV; /* Use Send/Receive communications */
78
p->prot.comptype = NP_COMP_LOCALPOLL; /* Use local polling for completion */
79
p->tr = 0; /* I am not the transmitter */
80
p->rcv = 1; /* I am the receiver */
83
// Setup: check the selected modes, open the log file, create the TCP control
// socket, fill in the socket address for this role, then bring up InfiniBand
// through initIB().
// p->tr selects the role: non-zero is the transmitter (client), zero is the
// receiver (server), which also binds the socket to p->port.
// NOTE(review): structural lines (braces, else branches, exits) are missing
// from this listing, so the exact nesting of the statements is not confirmed.
void Setup(ArgStruct *p)
88
struct sockaddr_in *lsin1, *lsin2; /* ptr to sockaddr_in in ArgStruct */
91
struct protoent *proto;
92
int send_size, recv_size, sizeofint = sizeof(int);
93
struct sigaction sigact1;
97
// Plain RDMA write is only accepted together with local polling.
if( p->prot.commtype == NP_COMM_RDMAWRITE &&
98
p->prot.comptype != NP_COMP_LOCALPOLL ) {
99
fprintf(stderr, "Error, RDMA Write may only be used with local polling.\n");
100
fprintf(stderr, "Try using RDMA Write With Immediate Data with vapi polling\n");
101
fprintf(stderr, "or event completion\n");
106
// Log file name is .iblog followed by the value of 1 - p->tr.
// No check of the fopen() result is visible in this listing.
sprintf(logfilename, ".iblog%d", 1 - p->tr);
107
logfile = fopen(logfilename, "w");
109
host = p->host; /* copy ptr to hostname */
111
lsin1 = &(p->prot.sin1);
112
lsin2 = &(p->prot.sin2);
114
bzero((char *) lsin1, sizeof(*lsin1));
115
bzero((char *) lsin2, sizeof(*lsin2));
117
if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
118
printf("NetPIPE: can't open stream socket! errno=%d\n", errno);
122
if(!(proto = getprotobyname("tcp"))){
123
printf("NetPIPE: protocol 'tcp' unknown!\n");
127
if (p->tr){ /* if client i.e., Sender */
130
// A host string that atoi() parses to a positive number is handed to
// inet_addr(). Presumably the gethostbyname() lookup below is the
// alternative branch (the else line is not visible) - TODO confirm.
if (atoi(host) > 0) { /* Numerical IP address */
131
lsin1->sin_family = AF_INET;
132
lsin1->sin_addr.s_addr = inet_addr(host);
136
if ((addr = gethostbyname(host)) == NULL){
137
printf("NetPIPE: invalid hostname '%s'\n", host);
141
lsin1->sin_family = addr->h_addrtype;
142
bcopy(addr->h_addr, (char*) &(lsin1->sin_addr.s_addr), addr->h_length);
145
lsin1->sin_port = htons(p->port);
147
} else { /* we are the receiver (server) */
149
// The receiver listens on every local address (INADDR_ANY) at p->port.
bzero((char *) lsin1, sizeof(*lsin1));
150
lsin1->sin_family = AF_INET;
151
lsin1->sin_addr.s_addr = htonl(INADDR_ANY);
152
lsin1->sin_port = htons(p->port);
154
if (bind(sockfd, (struct sockaddr *) lsin1, sizeof(*lsin1)) < 0){
155
printf("NetPIPE: server: bind on local address failed! errno=%d", errno);
164
p->servicefd = sockfd;
168
/* Establish tcp connections */
172
/* Initialize Mellanox Infiniband */
174
// initIB() signals failure with -1.
if(initIB(p) == -1) {
180
// initIB: bring up the InfiniBand side of the connection.
// Steps visible below, in order:
//   1. open the HCA named InfiniHost0 and obtain a handle to it,
//   2. query the port properties,
//   3. allocate a protection domain,
//   4. create the send CQ (30000 entries requested) and the recv CQ
//      (20000 entries requested),
//   5. create a reliable-connection QP with max_wq outstanding work requests
//      and one scatter/gather entry per queue,
//   6. exchange lid and QP number with the peer over p->commfd,
//   7. move the QP through INIT, RTR and RTS,
//   8. for NP_COMP_EVENT, register event_handler on the recv CQ and request
//      the first notification.
// Setup() compares the result against -1; the return statements themselves
// and the declarations of ret, lid and port_num are not visible in this
// listing.
int initIB(ArgStruct *p)
186
/* open hca just in case it was not opened by system earlier */
187
// In the visible lines the result of VAPI_open_hca() is overwritten by the
// next call before any check.
ret = VAPI_open_hca("InfiniHost0", &hca_hndl);
189
ret = EVAPI_get_hca_hndl("InfiniHost0", &hca_hndl);
191
fprintf(stderr, "Error opening Infiniband HCA: %s\n", VAPI_strerror(ret));
194
LOGPRINTF("Opened Infiniband HCA\n");
197
/* Get HCA properties */
200
ret = VAPI_query_hca_port_prop(hca_hndl, (IB_port_t)port_num,
201
(VAPI_hca_port_t *)&hca_port);
203
fprintf(stderr, "Error querying Infiniband HCA: %s\n", VAPI_strerror(ret));
206
LOGPRINTF("Queried Infiniband HCA\n");
209
LOGPRINTF(" lid = %d\n", lid);
212
/* Allocate Protection Domain */
214
ret = VAPI_alloc_pd(hca_hndl, &pd_hndl);
216
fprintf(stderr, "Error allocating PD: %s\n", VAPI_strerror(ret));
219
LOGPRINTF("Allocated Protection Domain\n");
223
/* Create send completion queue */
225
num_cqe = 30000; /* Requested number of completion q elements */
226
ret = VAPI_create_cq(hca_hndl, num_cqe, &s_cq_hndl, &act_num_cqe);
228
fprintf(stderr, "Error creating send CQ: %s\n", VAPI_strerror(ret));
231
LOGPRINTF("Created Send Completion Queue with %d elements\n", act_num_cqe);
235
/* Create recv completion queue */
237
num_cqe = 20000; /* Requested number of completion q elements */
238
ret = VAPI_create_cq(hca_hndl, num_cqe, &r_cq_hndl, &act_num_cqe);
240
fprintf(stderr, "Error creating recv CQ: %s\n", VAPI_strerror(ret));
243
LOGPRINTF("Created Recv Completion Queue with %d elements\n", act_num_cqe);
247
/* Placeholder for MR */
250
/* Create Queue Pair */
252
qp_init_attr.cap.max_oust_wr_rq = max_wq; /* Max outstanding WR on RQ */
253
qp_init_attr.cap.max_oust_wr_sq = max_wq; /* Max outstanding WR on SQ */
254
qp_init_attr.cap.max_sg_size_rq = 1; /* Max scatter/gather entries on RQ */
255
qp_init_attr.cap.max_sg_size_sq = 1; /* Max scatter/gather entries on SQ */
256
qp_init_attr.pd_hndl = pd_hndl; /* Protection domain handle */
257
qp_init_attr.rdd_hndl = 0; /* Reliable datagram domain handle */
258
qp_init_attr.rq_cq_hndl = r_cq_hndl; /* CQ handle for RQ */
259
qp_init_attr.rq_sig_type = VAPI_SIGNAL_REQ_WR; /* Signalling type */
260
qp_init_attr.sq_cq_hndl = s_cq_hndl; /* CQ handle for SQ */
261
qp_init_attr.sq_sig_type = VAPI_SIGNAL_REQ_WR; /* Signalling type */
262
qp_init_attr.ts_type = IB_TS_RC; /* Transmission type */
264
ret = VAPI_create_qp(hca_hndl, &qp_init_attr, &qp_hndl, &qp_prop);
266
fprintf(stderr, "Error creating Queue Pair: %s\n", VAPI_strerror(ret));
269
LOGPRINTF("Created Queue Pair, max outstanding WR on RQ: %d, on SQ: %d\n",
270
qp_prop.cap.max_oust_wr_rq, qp_prop.cap.max_oust_wr_sq);
274
/* Exchange lid and qp_num with other node */
276
// Both values are written as raw in-memory bytes, local values first and
// peer values read afterwards. Presumably both hosts must agree on byte
// order and type sizes - TODO confirm.
if( write(p->commfd, &lid, sizeof(lid) ) != sizeof(lid) ) {
277
fprintf(stderr, "Failed to send lid over socket\n");
280
if( write(p->commfd, &qp_prop.qp_num, sizeof(qp_prop.qp_num) ) != sizeof(qp_prop.qp_num) ) {
281
fprintf(stderr, "Failed to send qpnum over socket\n");
284
if( read(p->commfd, &d_lid, sizeof(d_lid) ) != sizeof(d_lid) ) {
285
fprintf(stderr, "Failed to read lid from socket\n");
288
if( read(p->commfd, &d_qp_num, sizeof(d_qp_num) ) != sizeof(d_qp_num) ) {
289
fprintf(stderr, "Failed to read qpnum from socket\n");
293
LOGPRINTF("Local: lid=%d qp_num=%d Remote: lid=%d qp_num=%d\n",
294
lid, qp_prop.qp_num, d_lid, d_qp_num);
297
/* Bring up Queue Pair */
299
/******* INIT state ******/
301
// Each transition clears the attribute mask, sets the fields it needs and
// flags them in the mask, then calls VAPI_modify_qp().
QP_ATTR_MASK_CLR_ALL(qp_attr_mask);
303
qp_attr.qp_state = VAPI_INIT;
304
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE);
307
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PKEY_IX);
309
qp_attr.port = port_num;
310
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PORT);
312
qp_attr.remote_atomic_flags = VAPI_EN_REM_WRITE | VAPI_EN_REM_READ;
313
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_REMOTE_ATOMIC_FLAGS);
315
ret = VAPI_modify_qp(hca_hndl, qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap);
317
fprintf(stderr, "Error modifying QP to INIT: %s\n", VAPI_strerror(ret));
321
LOGPRINTF("Modified QP to INIT\n");
323
/******* RTR (Ready-To-Receive) state *******/
325
QP_ATTR_MASK_CLR_ALL(qp_attr_mask);
327
qp_attr.qp_state = VAPI_RTR;
328
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE);
330
qp_attr.qp_ous_rd_atom = 1;
331
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_OUS_RD_ATOM);
333
// The destination QP number and LID are the values received from the peer.
qp_attr.dest_qp_num = d_qp_num;
334
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_DEST_QP_NUM);
337
qp_attr.av.grh_flag = FALSE;
338
qp_attr.av.dlid = d_lid;
339
qp_attr.av.static_rate = 0;
340
qp_attr.av.src_path_bits = 0;
341
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_AV);
343
qp_attr.path_mtu = p->prot.ib_mtu;
344
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PATH_MTU);
347
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RQ_PSN);
350
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PKEY_IX);
352
qp_attr.min_rnr_timer = 5;
353
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_MIN_RNR_TIMER);
355
ret = VAPI_modify_qp(hca_hndl, qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap);
357
fprintf(stderr, "Error modifying QP to RTR: %s\n", VAPI_strerror(ret));
361
LOGPRINTF("Modified QP to RTR\n");
363
/* Sync before going to RTS state */
366
/******* RTS (Ready-to-Send) state *******/
368
QP_ATTR_MASK_CLR_ALL(qp_attr_mask);
370
qp_attr.qp_state = VAPI_RTS;
371
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE);
374
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_SQ_PSN);
376
qp_attr.timeout = 31;
377
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_TIMEOUT);
379
qp_attr.retry_count = 1;
380
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RETRY_COUNT);
382
qp_attr.rnr_retry = 1;
383
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RNR_RETRY);
385
qp_attr.ous_dst_rd_atom = 1;
386
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_OUS_DST_RD_ATOM);
388
ret = VAPI_modify_qp(hca_hndl, qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap);
390
fprintf(stderr, "Error modifying QP to RTS: %s\n", VAPI_strerror(ret));
394
LOGPRINTF("Modified QP to RTS\n");
396
/* If using event completion, register event completion handler and request
397
* the initial notification
399
if( p->prot.comptype == NP_COMP_EVENT ) {
401
EVAPI_set_comp_eventh(hca_hndl, r_cq_hndl, event_handler, p, &ceh_hndl);
402
VAPI_req_comp_notif(hca_hndl, r_cq_hndl, VAPI_NEXT_COMP);
409
// finalizeIB: release the InfiniBand resources held in the file-scope
// handles.
// Order visible below: clear the completion event handler (only for
// NP_COMP_EVENT), destroy the QP, the recv CQ and the send CQ, deregister the
// send and recv memory regions, deallocate the protection domain, then
// release the HCA handle.
// Apart from the event handler, each step runs only when its handle differs
// from VAPI_INVAL_HNDL. No visible line resets a handle afterwards.
// The declaration of ret and the return statements are not visible in this
// listing.
int finalizeIB(ArgStruct *p)
413
LOGPRINTF("Finalizing IB stuff\n");
415
/* Clear completion event handler */
417
if(p->prot.comptype == NP_COMP_EVENT ) {
418
LOGPRINTF("Clearing comp handler\n");
419
ret = EVAPI_clear_comp_eventh(hca_hndl, ceh_hndl);
421
fprintf(stderr, "Error clearing event handler: %s\n",
426
if(qp_hndl != VAPI_INVAL_HNDL) {
427
LOGPRINTF("Destroying QP\n");
428
ret = VAPI_destroy_qp(hca_hndl, qp_hndl);
430
fprintf(stderr, "Error destroying Queue Pair: %s\n", VAPI_strerror(ret));
434
if(r_cq_hndl != VAPI_INVAL_HNDL) {
435
LOGPRINTF("Destroying Recv CQ\n");
436
ret = VAPI_destroy_cq(hca_hndl, r_cq_hndl);
438
fprintf(stderr, "Error destroying recv CQ: %s\n", VAPI_strerror(ret));
442
if(s_cq_hndl != VAPI_INVAL_HNDL) {
443
LOGPRINTF("Destroying Send CQ\n");
444
ret = VAPI_destroy_cq(hca_hndl, s_cq_hndl);
446
fprintf(stderr, "Error destroying send CQ: %s\n", VAPI_strerror(ret));
450
/* Check memory registrations just in case user bailed out */
451
if(s_mr_hndl != VAPI_INVAL_HNDL) {
452
LOGPRINTF("Deregistering send buffer\n");
453
ret = VAPI_deregister_mr(hca_hndl, s_mr_hndl);
455
fprintf(stderr, "Error deregistering send mr: %s\n", VAPI_strerror(ret));
459
if(r_mr_hndl != VAPI_INVAL_HNDL) {
460
LOGPRINTF("Deregistering recv buffer\n");
461
ret = VAPI_deregister_mr(hca_hndl, r_mr_hndl);
463
fprintf(stderr, "Error deregistering recv mr: %s\n", VAPI_strerror(ret));
467
if(pd_hndl != VAPI_INVAL_HNDL) {
468
LOGPRINTF("Deallocating PD\n");
469
ret = VAPI_dealloc_pd(hca_hndl, pd_hndl);
471
fprintf(stderr, "Error deallocating PD: %s\n", VAPI_strerror(ret));
475
/* Application code should not close HCA, just release handle */
477
if(hca_hndl != VAPI_INVAL_HNDL) {
478
LOGPRINTF("Releasing HCA\n");
479
ret = EVAPI_release_hca_hndl(hca_hndl);
481
fprintf(stderr, "Error releasing HCA: %s\n", VAPI_strerror(ret));
488
// event_handler: completion callback registered on the recv CQ by initIB().
// It polls the given cq into the shared wc descriptor and then:
//   - on VAPI_CQ_EMPTY, requests the next notification (using the file-scope
//     hca_hndl and r_cq_hndl rather than the hca and cq arguments),
//   - on any other poll error, or a work completion whose status is not
//     VAPI_SUCCESS, prints an error,
//   - otherwise sets receive_complete to 1. If the flag is already 1, it
//     first yields the CPU until the flag returns to 0.
// data is not referenced by any line visible in this listing.
// NOTE(review): braces and the final else line are missing from this listing,
// so the nesting of the last statements is not confirmed.
void event_handler(VAPI_hca_hndl_t hca, VAPI_cq_hndl_t cq, void* data)
494
ret = VAPI_poll_cq(hca, cq, &wc);
496
if(ret == VAPI_CQ_EMPTY) {
497
LOGPRINTF("Empty completion queue, requesting next notification\n");
498
VAPI_req_comp_notif(hca_hndl, r_cq_hndl, VAPI_NEXT_COMP);
500
} else if(ret != VAPI_OK) {
501
fprintf(stderr, "Error in event_handler, polling cq: %s\n",
504
} else if(wc.status != VAPI_SUCCESS) {
505
fprintf(stderr, "Error in event_handler, on returned work completion "
506
"status: %s\n", VAPI_wc_status_sym(wc.status));
510
LOGPRINTF("Retrieved work completion\n");
512
/* For ping-pong mode at least, this check shouldn't be needed for
513
* normal operation, but it will help catch any bugs with multiple
514
* sends coming through when we're only expecting one.
516
if(receive_complete == 1) {
518
while(receive_complete != 0) sched_yield();
522
receive_complete = 1;
529
// readFully: keep calling read() on fd until the remaining byte count reaches
// zero or read() returns zero or a negative value.
// fd   - descriptor to read from
// obuf - destination buffer
// len  - number of bytes wanted (presumably the initial value of bytesLeft;
//        that declaration is not visible in this listing)
// The return statements are not visible here. Callers in this file treat a
// negative result as an error and compare the result with the requested
// size to detect a partial read.
readFully(int fd, void *obuf, int len)
532
char *buf = (char *) obuf;
535
while (bytesLeft > 0 &&
536
(bytesRead = read(fd, (void *) buf, bytesLeft)) > 0)
538
bytesLeft -= bytesRead;
546
// Sync: handshake with the peer over the TCP control socket.
// Writes the string s to p->commfd, reads strlen(s) bytes back into response
// with readFully(), and reports an error if either call returns a negative
// value or if the received bytes differ from s.
// The declarations of s and response are not visible in this listing.
// A short write (fewer bytes than strlen(s)) is not detected by the test
// below, which only checks for a negative result.
void Sync(ArgStruct *p)
551
if (write(p->commfd, s, strlen(s)) < 0 ||
552
readFully(p->commfd, response, strlen(s)) < 0)
554
perror("NetPIPE: error writing or reading synchronization string");
557
if (strncmp(s, response, strlen(s)))
559
fprintf(stderr, "NetPIPE: Synchronization string incorrect!\n");
564
// PrepareToReceive: post one receive request for the next incoming message.
// The request carries a single scatter/gather entry describing p->bufflen
// bytes at p->r_ptr, using the local key of the registered recv region.
// Completion type is VAPI_UNSIGNALED under local polling and VAPI_SIGNALED
// otherwise (the else line is not visible in this listing).
// For NP_COMP_EVENT the receive_complete flag is reset to 0.
// The first test singles out RDMA write with local polling, for which the
// existing comment says no receive is needed; the statement it guards is not
// visible here.
void PrepareToReceive(ArgStruct *p)
566
VAPI_ret_t ret; /* Return code */
567
VAPI_rr_desc_t rr; /* Receive request */
568
VAPI_sg_lst_entry_t sg_entry; /* Scatter/Gather list - holds buff addr */
570
/* We don't need to post a receive if doing RDMA write with local polling */
572
if( p->prot.commtype == NP_COMM_RDMAWRITE &&
573
p->prot.comptype == NP_COMP_LOCALPOLL )
576
rr.opcode = VAPI_RECEIVE;
578
/* We only need signaled completions if using VAPI
579
* completion methods.
581
if( p->prot.comptype == NP_COMP_LOCALPOLL )
582
rr.comp_type = VAPI_UNSIGNALED;
584
rr.comp_type = VAPI_SIGNALED;
587
rr.sg_lst_p = &sg_entry;
589
sg_entry.lkey = r_mr_out.l_key;
590
sg_entry.len = p->bufflen;
591
sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t)p->r_ptr;
593
ret = VAPI_post_rr(hca_hndl, qp_hndl, &rr);
595
fprintf(stderr, "Error posting recv request: %s\n", VAPI_strerror(ret));
599
LOGPRINTF("Posted recv request\n");
602
/* Set receive flag to zero and request event completion
603
* notification for this receive so the event handler will
604
* be triggered when the receive completes.
606
if( p->prot.comptype == NP_COMP_EVENT ) {
607
receive_complete = 0;
611
// SendData: post one send request for p->bufflen bytes starting at p->s_ptr.
// The opcode follows p->prot.commtype: plain send, send with immediate, RDMA
// write, or RDMA write with immediate. Any other value prints an error.
// For the two RDMA modes the target address is the peer receive buffer base
// (remote_address) plus the offset of s_ptr inside s_buff, and the rkey is
// the one received from the peer.
// The request is posted unsignaled with a single scatter/gather entry that
// uses the local key of the registered send region.
// NOTE(review): remote_address is declared void*, so the addition below is
// arithmetic on void*, which ISO C does not define; it relies on a compiler
// extension that treats it as byte arithmetic.
// NOTE(review): the two RDMA log lines print sr.remote_addr with %p after it
// was cast to VAPI_virt_addr_t; whether that type is a pointer is not visible
// here - TODO confirm the format matches.
void SendData(ArgStruct *p)
613
VAPI_ret_t ret; /* Return code */
614
VAPI_sr_desc_t sr; /* Send request */
615
VAPI_sg_lst_entry_t sg_entry; /* Scatter/Gather list - holds buff addr */
617
/* Fill in send request struct */
619
if(p->prot.commtype == NP_COMM_SENDRECV) {
620
sr.opcode = VAPI_SEND;
621
LOGPRINTF("Doing regular send\n");
622
} else if(p->prot.commtype == NP_COMM_SENDRECV_WITH_IMM) {
623
sr.opcode = VAPI_SEND_WITH_IMM;
624
LOGPRINTF("Doing regular send with imm\n");
625
} else if(p->prot.commtype == NP_COMM_RDMAWRITE) {
626
sr.opcode = VAPI_RDMA_WRITE;
627
sr.remote_addr = (VAPI_virt_addr_t)(MT_virt_addr_t)(remote_address + (p->s_ptr - p->s_buff));
628
sr.r_key = remote_key;
629
LOGPRINTF("Doing RDMA write (raddr=%p)\n", sr.remote_addr);
630
} else if(p->prot.commtype == NP_COMM_RDMAWRITE_WITH_IMM) {
631
sr.opcode = VAPI_RDMA_WRITE_WITH_IMM;
632
sr.remote_addr = (VAPI_virt_addr_t)(MT_virt_addr_t)(remote_address + (p->s_ptr - p->s_buff));
633
sr.r_key = remote_key;
634
LOGPRINTF("Doing RDMA write with imm (raddr=%p)\n", sr.remote_addr);
636
fprintf(stderr, "Error, invalid communication type in SendData\n");
640
sr.comp_type = VAPI_UNSIGNALED;
641
sr.set_se = FALSE; /* This needed due to a bug in Mellanox HW rel a-0 */
644
sr.sg_lst_p = &sg_entry;
646
sg_entry.lkey = s_mr_out.l_key; /* Local memory region key */
647
sg_entry.len = p->bufflen;
648
sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t)p->s_ptr;
650
ret = VAPI_post_sr(hca_hndl, qp_hndl, &sr);
652
fprintf(stderr, "Error posting send request: %s\n", VAPI_strerror(ret));
654
LOGPRINTF("Posted send request\n");
659
// RecvData: wait until the current message has arrived. The method depends
// on p->prot.comptype:
//   NP_COMP_LOCALPOLL - spin on the last byte of the receive buffer until it
//     holds the expected marker value (derived from p->cache and p->tr), then
//     overwrite that byte with the reset value.
//   NP_COMP_VAPIPOLL  - call VAPI_poll_cq() on the recv CQ while it reports
//     VAPI_CQ_EMPTY, then check the poll result and the completion status.
//   NP_COMP_EVENT     - spin on receive_complete, which event_handler() sets;
//     when p->preburst is set the flag is cleared again here.
// The declaration and initial value of ret are not visible in this listing.
// The local-polling spin relies on r_ptr being volatile; the file defines
// USE_VOLATILE_RPTR near the top for that purpose.
void RecvData(ArgStruct *p)
663
/* Busy wait for incoming data */
665
LOGPRINTF("Receiving at buffer address %p\n", p->r_ptr);
667
if( p->prot.comptype == NP_COMP_LOCALPOLL ) {
669
/* Poll for receive completion locally on the receive data */
671
LOGPRINTF("Waiting for last byte of data to arrive\n");
673
while(p->r_ptr[p->bufflen-1] != 'a' + (p->cache ? 1 - p->tr : 1) )
675
/* BUSY WAIT -- this should be fine since we
676
* declared r_ptr with volatile qualifier */
679
/* Reset last byte */
680
p->r_ptr[p->bufflen-1] = 'a' + (p->cache ? p->tr : 0);
682
LOGPRINTF("Received all of data\n");
684
} else if( p->prot.comptype == NP_COMP_VAPIPOLL ) {
686
/* Poll for receive completion using VAPI poll function */
688
LOGPRINTF("Polling completion queue for VAPI work completion\n");
691
while(ret == VAPI_CQ_EMPTY)
692
ret = VAPI_poll_cq(hca_hndl, r_cq_hndl, &wc);
695
fprintf(stderr, "Error in RecvData, polling for completion: %s\n",
700
if(wc.status != VAPI_SUCCESS) {
701
fprintf(stderr, "Error in status of returned completion: %s\n",
702
VAPI_wc_status_sym(wc.status));
706
LOGPRINTF("Retrieved successful completion\n");
708
} else if( p->prot.comptype == NP_COMP_EVENT ) {
710
/* Instead of polling directly on data or VAPI completion queue,
711
* let the VAPI event completion handler set a flag when the receive
712
* completes, and poll on that instead. Could try using semaphore here
713
* as well to eliminate busy polling
716
LOGPRINTF("Polling receive flag\n");
718
while( receive_complete == 0 )
723
/* If in prepost-burst mode, we won't be calling PrepareToReceive
724
* between ping-pongs, so we need to reset the receive_complete
727
if( p->preburst ) receive_complete = 0;
729
LOGPRINTF("Receive completed\n");
733
/* Reset is used after a trial to empty the work request queues so we
734
have enough room for the next trial to run */
735
// Reset: exchange one signaled send and receive with the peer and wait for
// both completions.
// Sequence visible below: clear receive_complete for NP_COMP_EVENT, post a
// signaled receive, post a signaled send, poll the send CQ until it is no
// longer empty and check the completion status, then wait for the receive,
// either by spinning on receive_complete (NP_COMP_EVENT) or by polling the
// recv CQ.
// The lines that fill in the scatter/gather fields of rr and sr, the
// initial values of ret before each polling loop, and the else line between
// the two receive strategies are not visible in this listing.
void Reset(ArgStruct *p)
738
VAPI_ret_t ret; /* Return code */
739
VAPI_sr_desc_t sr; /* Send request */
740
VAPI_rr_desc_t rr; /* Recv request */
742
/* If comptype is event, then we'll use event handler to detect receive,
743
* so initialize receive_complete flag
745
if(p->prot.comptype == NP_COMP_EVENT) receive_complete = 0;
747
/* Prepost receive */
748
rr.opcode = VAPI_RECEIVE;
749
rr.comp_type = VAPI_SIGNALED;
752
LOGPRINTF("Posting recv request in Reset\n");
753
ret = VAPI_post_rr(hca_hndl, qp_hndl, &rr);
755
fprintf(stderr, " Error posting recv request: %s\n", VAPI_strerror(ret));
760
/* Make sure both nodes have preposted receives */
764
sr.opcode = VAPI_SEND;
765
sr.comp_type = VAPI_SIGNALED;
766
sr.set_se = FALSE; /* This needed due to a bug in Mellanox HW rel a-0 */
769
LOGPRINTF("Posting send request \n");
770
ret = VAPI_post_sr(hca_hndl, qp_hndl, &sr);
772
fprintf(stderr, " Error posting send request in Reset: %s\n",
776
if(wc.status != VAPI_SUCCESS) {
777
fprintf(stderr, " Error in completion status: %s\n",
778
VAPI_wc_status_sym(wc.status));
782
LOGPRINTF("Polling for completion of send request\n");
784
while(ret == VAPI_CQ_EMPTY)
785
ret = VAPI_poll_cq(hca_hndl, s_cq_hndl, &wc);
788
fprintf(stderr, "Error polling CQ for send in Reset: %s\n",
792
if(wc.status != VAPI_SUCCESS) {
793
fprintf(stderr, " Error in completion status: %s\n",
794
VAPI_wc_status_sym(wc.status));
798
LOGPRINTF("Status of send completion: %s\n", VAPI_wc_status_sym(wc.status));
800
if(p->prot.comptype == NP_COMP_EVENT) {
801
/* If using event completion, the event handler will set receive_complete
802
* when it gets the completion event.
804
LOGPRINTF("Waiting for receive_complete flag\n");
805
while(receive_complete == 0) { /* BUSY WAIT */ }
807
LOGPRINTF("Polling for completion of recv request\n");
809
while(ret == VAPI_CQ_EMPTY)
810
ret = VAPI_poll_cq(hca_hndl, r_cq_hndl, &wc);
813
fprintf(stderr, "Error polling CQ for recv in Reset: %s\n",
817
if(wc.status != VAPI_SUCCESS) {
818
fprintf(stderr, " Error in completion status: %s\n",
819
VAPI_wc_status_sym(wc.status));
823
LOGPRINTF("Status of recv completion: %s\n", VAPI_wc_status_sym(wc.status));
825
LOGPRINTF("Done with reset\n");
828
// SendTime: send a time value to the peer over the TCP control socket.
// p - connection state; the value is written to p->commfd
// t - points to the time in seconds
// The value is scaled to microseconds, truncated to uint32_t, converted to
// network byte order and written as sizeof(uint32_t) bytes.
// A time of 2^32 microseconds (about 4295 seconds) or more does not fit in
// the 32-bit field.
// Only a negative write() result is reported; a short write is not detected.
// NOTE(review): the two prose lines below have lost their comment delimiters
// in this listing.
void SendTime(ArgStruct *p, double *t)
830
uint32_t ltime, ntime;
833
Multiply the number of seconds by 1e6 to get time in microseconds
834
and convert value to an unsigned 32-bit integer.
836
ltime = (uint32_t)(*t * 1.e6);
838
/* Send time in network order */
839
ntime = htonl(ltime);
840
if (write(p->commfd, (char *)&ntime, sizeof(uint32_t)) < 0)
842
printf("NetPIPE: write failed in SendTime: errno=%d\n", errno);
847
// RecvTime: receive a time value sent by SendTime() on the peer.
// p - connection state; the value is read from p->commfd
// t - receives the time in seconds
// Reads sizeof(uint32_t) bytes with readFully(), converts them from network
// byte order, and divides the microsecond count by 1.0e6.
// A failed read and a partial read are reported separately. The condition
// line for the failed read and the declaration of bytesRead are not visible
// in this listing.
void RecvTime(ArgStruct *p, double *t)
849
uint32_t ltime, ntime;
852
bytesRead = readFully(p->commfd, (void *)&ntime, sizeof(uint32_t));
855
printf("NetPIPE: read failed in RecvTime: errno=%d\n", errno);
858
else if (bytesRead != sizeof(uint32_t))
860
fprintf(stderr, "NetPIPE: partial read in RecvTime of %d bytes\n",
864
ltime = ntohl(ntime);
866
/* Result is ltime (in microseconds) divided by 1.0e6 to get seconds */
867
*t = (double)ltime / 1.0e6;
870
// SendRepeat: send the repeat count to the peer over the TCP control socket.
// p   - connection state; the value is written to p->commfd
// rpt - repeat count to send
// Writes sizeof(uint32_t) bytes taken from nrpt. The declaration of nrpt and
// the conversion from rpt are not visible in this listing.
// Only a negative write() result is reported; a short write is not detected.
void SendRepeat(ArgStruct *p, int rpt)
875
/* Send repeat count as a 32-bit value in network order */
877
if (write(p->commfd, (void *) &nrpt, sizeof(uint32_t)) < 0)
879
printf("NetPIPE: write failed in SendRepeat: errno=%d\n", errno);
884
// RecvRepeat: receive the repeat count sent by SendRepeat() on the peer.
// p   - connection state; the value is read from p->commfd
// rpt - presumably receives the decoded count; the line that stores into it
//       is not visible in this listing
// Reads sizeof(uint32_t) bytes into nrpt with readFully() and reports a
// failed read and a partial read separately.
void RecvRepeat(ArgStruct *p, int *rpt)
889
bytesRead = readFully(p->commfd, (void *)&nrpt, sizeof(uint32_t));
892
printf("NetPIPE: read failed in RecvRepeat: errno=%d\n", errno);
895
else if (bytesRead != sizeof(uint32_t))
897
fprintf(stderr, "NetPIPE: partial read in RecvRepeat of %d bytes\n",
906
// establish: set up the TCP control connection between the two nodes.
// Two paths are visible below:
//   - connect p->commfd to the address stored in p->prot.sin1,
//   - listen on p->servicefd with a backlog of 5 and accept a connection
//     into p->commfd, storing the peer address in p->prot.sin2.
// The test that chooses between them is not visible in this listing;
// presumably it is the transmitter/receiver role - TODO confirm.
// The result of listen() is not checked.
void establish(ArgStruct *p)
912
clen = sizeof(p->prot.sin2);
914
if(connect(p->commfd, (struct sockaddr *) &(p->prot.sin1),
915
sizeof(p->prot.sin1)) < 0){
916
printf("Client: Cannot Connect! errno=%d\n",errno);
922
listen(p->servicefd, 5);
923
p->commfd = accept(p->servicefd, (struct sockaddr *) &(p->prot.sin2),
927
printf("Server: Accept Failed! errno=%d\n",errno);
933
// CleanUp: final handshake on the TCP control socket.
// Two orderings are visible: write 5 bytes of quit then read 5 bytes back,
// and read 5 bytes then write 5 bytes. Presumably each node runs one of them
// depending on its role; the selecting test is not visible in this listing.
// The return values of read() and write() are ignored, so a failed or short
// transfer goes unnoticed.
void CleanUp(ArgStruct *p)
938
write(p->commfd,quit, 5);
939
read(p->commfd, quit, 5);
944
read(p->commfd,quit, 5);
945
write(p->commfd,quit,5);
954
// AfterAlignmentInit: for the two RDMA write modes, swap receive buffer
// addresses and remote keys with the peer over p->commfd.
// Sent: the value of p->r_buff (sizeof(void*) bytes) and r_mr_out.r_key.
// Received: remote_address and remote_key, later used by SendData().
// Both values travel as raw in-memory bytes, so both hosts presumably need
// the same pointer size and byte order - TODO confirm.
// Writes are checked only for a negative result. Reads go through
// readFully(), with failed and partial reads reported separately.
// The declaration of bytesRead and several condition lines are not visible
// in this listing.
void AfterAlignmentInit(ArgStruct *p)
958
/* Exchange buffer pointers and remote infiniband keys if doing rdma. Do
959
* the exchange in this function because this will happen after any
960
* memory alignment is done, which is important for getting the
961
* correct remote address.
963
if( p->prot.commtype == NP_COMM_RDMAWRITE ||
964
p->prot.commtype == NP_COMM_RDMAWRITE_WITH_IMM ) {
966
/* Send my receive buffer address
968
if(write(p->commfd, (void *)&p->r_buff, sizeof(void*)) < 0) {
969
perror("NetPIPE: write of buffer address failed in AfterAlignmentInit");
973
LOGPRINTF("Sent buffer address: %p\n", p->r_buff);
975
/* Send my remote key for accessing
976
* my remote buffer via IB RDMA
978
if(write(p->commfd, (void *)&r_mr_out.r_key, sizeof(VAPI_rkey_t)) < 0) {
979
perror("NetPIPE: write of remote key failed in AfterAlignmentInit");
983
LOGPRINTF("Sent remote key: %d\n", r_mr_out.r_key);
985
/* Read the sent data
987
bytesRead = readFully(p->commfd, (void *)&remote_address, sizeof(void*));
989
perror("NetPIPE: read of buffer address failed in AfterAlignmentInit");
991
} else if (bytesRead != sizeof(void*)) {
992
perror("NetPIPE: partial read of buffer address in AfterAlignmentInit");
996
LOGPRINTF("Received remote address from other node: %p\n", remote_address);
998
bytesRead = readFully(p->commfd, (void *)&remote_key, sizeof(VAPI_rkey_t));
1000
perror("NetPIPE: read of remote key failed in AfterAlignmentInit");
1002
} else if (bytesRead != sizeof(VAPI_rkey_t)) {
1003
perror("NetPIPE: partial read of remote key in AfterAlignmentInit");
1007
LOGPRINTF("Received remote key from other node: %d\n", remote_key);
1013
// MyMalloc: allocate the receive and send buffers and register both with the
// HCA.
// p       - receives the buffers in p->r_buff and p->s_buff
// bufflen - payload size in bytes
// soffset - extra bytes reserved for the send buffer offset
// roffset - extra bytes reserved for the recv buffer offset
// The recv buffer is bufflen + MAX(soffset, roffset) bytes. The send buffer
// either aliases the recv buffer or is a separate bufflen + soffset byte
// allocation; the test that chooses between the two is not visible in this
// listing.
// The recv region is registered with local and remote write access, the send
// region with local write access only. Results land in r_mr_hndl/r_mr_out
// and s_mr_hndl/s_mr_out.
// The declaration of ret and the failure exits are not visible here.
void MyMalloc(ArgStruct *p, int bufflen, int soffset, int roffset)
1017
/* Allocate buffers */
1019
p->r_buff = malloc(bufflen+MAX(soffset,roffset));
1020
if(p->r_buff == NULL) {
1021
fprintf(stderr, "Error malloc'ing buffer\n");
1027
/* Infiniband spec says we can register same memory region
1028
* more than once, so just copy buffer address. We will register
1029
* the same buffer twice with Infiniband.
1031
p->s_buff = p->r_buff;
1035
p->s_buff = malloc(bufflen+soffset);
1036
if(p->s_buff == NULL) {
1037
fprintf(stderr, "Error malloc'ing buffer\n");
1043
/* Register buffers with Infiniband */
1045
mr_in.acl = VAPI_EN_LOCAL_WRITE | VAPI_EN_REMOTE_WRITE;
1047
mr_in.pd_hndl = pd_hndl;
1049
mr_in.size = bufflen+MAX(soffset,roffset);
1050
mr_in.start = (VAPI_virt_addr_t)(MT_virt_addr_t)p->r_buff;
1051
mr_in.type = VAPI_MR;
1053
ret = VAPI_register_mr(hca_hndl, &mr_in, &r_mr_hndl, &r_mr_out);
1056
fprintf(stderr, "Error registering recv buffer: %s\n", VAPI_strerror(ret));
1061
LOGPRINTF("Registered Recv Buffer\n");
1064
mr_in.acl = VAPI_EN_LOCAL_WRITE;
1066
mr_in.pd_hndl = pd_hndl;
1068
mr_in.size = bufflen+soffset;
1069
mr_in.start = (VAPI_virt_addr_t)(MT_virt_addr_t)p->s_buff;
1070
mr_in.type = VAPI_MR;
1072
ret = VAPI_register_mr(hca_hndl, &mr_in, &s_mr_hndl, &s_mr_out);
1073
if(ret != VAPI_OK) {
1074
fprintf(stderr, "Error registering send buffer: %s\n", VAPI_strerror(ret));
1077
LOGPRINTF("Registered Send Buffer\n");
1081
void FreeBuff(char *buff1, char *buff2)
1085
if(s_mr_hndl != VAPI_INVAL_HNDL) {
1086
LOGPRINTF("Deregistering send buffer\n");
1087
ret = VAPI_deregister_mr(hca_hndl, s_mr_hndl);
1088
if(ret != VAPI_OK) {
1089
fprintf(stderr, "Error deregistering send mr: %s\n", VAPI_strerror(ret));
1091
s_mr_hndl = VAPI_INVAL_HNDL;
1095
if(r_mr_hndl != VAPI_INVAL_HNDL) {
1096
LOGPRINTF("Deregistering recv buffer\n");
1097
ret = VAPI_deregister_mr(hca_hndl, r_mr_hndl);
1098
if(ret != VAPI_OK) {
1099
fprintf(stderr, "Error deregistering recv mr: %s\n", VAPI_strerror(ret));
1101
r_mr_hndl = VAPI_INVAL_HNDL;