~ubuntu-branches/ubuntu/saucy/nwchem/saucy

« back to all changes in this revision

Viewing changes to src/tools/ga-4-3/armci/src/openib.c

  • Committer: Package Import Robot
  • Author(s): Michael Banck, Michael Banck, Daniel Leidert
  • Date: 2012-02-09 20:02:41 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20120209200241-jgk03qfsphal4ug2
Tags: 6.1-1
* New upstream release.

[ Michael Banck ]
* debian/patches/02_makefile_flags.patch: Updated.
* debian/patches/02_makefile_flags.patch: Use internal blas and lapack code.
* debian/patches/02_makefile_flags.patch: Define GCC4 for LINUX and LINUX64
  (Closes: #632611 and LP: #791308).
* debian/control (Build-Depends): Added openssh-client.
* debian/rules (USE_SCALAPACK, SCALAPACK): Removed variables (Closes:
  #654658).
* debian/rules (LIBDIR, USE_MPIF4, ARMCI_NETWORK): New variables.
* debian/TODO: New file.
* debian/control (Build-Depends): Removed libblas-dev, liblapack-dev and
  libscalapack-mpi-dev.
* debian/patches/04_show_testsuite_diff_output.patch: New patch, shows the
  diff output for failed tests.
* debian/patches/series: Adjusted.
* debian/testsuite: Optionally run all tests if "all" is passed as option.
* debian/rules: Run debian/testsuite with "all" if DEB_BUILD_OPTIONS
  contains "checkall".

[ Daniel Leidert ]
* debian/control: Used wrap-and-sort. Added Vcs-Svn and Vcs-Browser fields.
  (Priority): Moved to extra according to policy section 2.5.
  (Standards-Version): Bumped to 3.9.2.
  (Description): Fixed a typo.
* debian/watch: Added.
* debian/patches/03_hurd-i386_define_path_max.patch: Added.
  - Define MAX_PATH if not defines to fix FTBFS on hurd.
* debian/patches/series: Adjusted.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* $Id: openib.c,v 1.4.2.9 2007-10-18 06:08:03 d3h325 Exp $
 
2
 *
 
3
 * File organized as follows
 
4
 */
 
5
#define _GNU_SOURCE
 
6
#include <stdio.h>
 
7
#include <strings.h>
 
8
#include <assert.h>
 
9
#include <unistd.h>
 
10
#include <mpi.h>
 
11
#include <string.h>
 
12
 
 
13
#include "armcip.h"
 
14
#include "copy.h"
 
15
#include "request.h"
 
16
#include "armci-vapi.h"
 
17
#include "iterator.h"
 
18
#define DEBUG_INIT 0
 
19
#define DEBUG_FINALIZE 0
 
20
#define DEBUG_SERVER 0
 
21
#define DEBUG_CLN 0
 
22
#define TIME_INIT 0
 
23
#  define VAPIDEV_NAME "InfiniHost0"
 
24
#  define INVAL_HNDL 0xFFFFFFFF
 
25
#define RNR_TIMER 12
 
26
 
 
27
/*Debug macros used to tune what is being tested -- mostly openib calls*/
 
28
#define DBG_INIT  1
 
29
#define DBG_POLL  1
 
30
#define DBG_ALL   1
 
31
 
 
32
u_int32_t armci_max_num_sg_ent;
 
33
u_int32_t armci_max_qp_ous_swr;
 
34
u_int32_t armci_max_qp_ous_rwr;
 
35
 
 
36
typedef struct {
 
37
   struct ibv_qp *qp;
 
38
   uint32_t sqpnum;                /*we need to exchng qp nums,arr for that*/
 
39
#if 0
 
40
   uint32_t *rqpnum;               /*we need rqp nums,arr for that*/
 
41
#endif
 
42
   uint16_t lid;
 
43
} armci_connect_t;
 
44
armci_connect_t *CLN_con, *SRV_con;
 
45
static uint32_t *SRV_rqpnums, *CLN_rqpnums; /*relevant rqp num arrs, to connect to svr and client*/
 
46
static uint32_t *CLN_rqpnumtmpbuf=NULL; /*temporary buf used during connection setup*/
 
47
/*\
 
48
 * datastrucure for infinihost NIC
 
49
\*/
 
50
typedef struct {
 
51
  uint16_t *lid_arr;                /*we need to exchange lids, arr for that*/
 
52
  struct ibv_context *handle;       /*device context/handle*/
 
53
  int maxtransfersize;
 
54
  struct ibv_device_attr attr;      /*device properties*/
 
55
  struct ibv_port_attr hca_port;    /*mostly for getting lid*/
 
56
  uint8_t active_port;
 
57
  struct ibv_pd *ptag;              /*protection tag*/
 
58
  const char *vendor;
 
59
  struct ibv_cq *scq;               /*send completion queue*/
 
60
  struct ibv_cq *rcq;               /*recv completion queue*/
 
61
  struct ibv_comp_channel *sch;     /*send completion channel*/
 
62
  struct ibv_comp_channel *rch;     /*recv completion channel*/
 
63
  void *scq_cntx;                   /*send context for completion queue*/
 
64
  void *rcq_cntx;                   /*recv context for completion queue*/
 
65
  int scv;                          /*send completion vector*/
 
66
  int rcv;                          /*recv completion vector*/
 
67
} vapi_nic_t;
 
68
 
 
69
typedef struct {
 
70
  armci_vapi_memhndl_t *prem_handle; /*address server to store memory handle*/
 
71
  armci_vapi_memhndl_t handle;
 
72
}ack_t;
 
73
 
 
74
armci_vapi_memhndl_t *CLN_handle;
 
75
armci_vapi_memhndl_t serv_memhandle, client_memhandle;
 
76
armci_vapi_memhndl_t *handle_array;
 
77
armci_vapi_memhndl_t *pinned_handle;
 
78
 
 
79
static vapi_nic_t nic_arr[3];
 
80
static vapi_nic_t *SRV_nic= nic_arr;
 
81
static vapi_nic_t *CLN_nic= nic_arr+1;
 
82
static int armci_server_terminating;
 
83
 
 
84
#define NONE -1
 
85
static int armci_ack_proc=NONE;
 
86
 
 
87
static int armci_vapi_server_ready;
 
88
static int armci_vapi_server_stage1=0;
 
89
static int armci_vapi_client_stage1=0;
 
90
static int armci_vapi_server_stage2=0;
 
91
static int armci_vapi_client_ready;
 
92
int _s=-1,_c=-1;
 
93
static int server_can_poll=0;
 
94
static int armci_vapi_max_inline_size=-1;
 
95
#define CLIENT_STAMP 101
 
96
#define SERV_STAMP 99
 
97
 
 
98
static char * client_tail;
 
99
static char * serv_tail;
 
100
static ack_t *SRV_ack;
 
101
 
 
102
#if defined(PEND_BUFS)
 
103
typedef immbuf_t vapibuf_t;
 
104
typedef pendbuf_t vapibuf_pend_t;
 
105
#else
 
106
typedef struct {
 
107
    struct ibv_recv_wr  dscr;
 
108
    struct ibv_sge      sg_entry;
 
109
    char buf[VBUF_DLEN];
 
110
} vapibuf_t;
 
111
#endif
 
112
 
 
113
typedef struct {
 
114
    struct ibv_send_wr  snd_dscr;
 
115
    struct ibv_sge      ssg_entry;
 
116
    struct ibv_recv_wr  rcv_dscr;
 
117
    struct ibv_sge      rsg_entry;
 
118
  char buf[VBUF_DLEN];
 
119
} vapibuf_ext_t;
 
120
 
 
121
typedef struct {
 
122
    struct ibv_send_wr  rmw_dscr;
 
123
    struct ibv_sge      rmw_entry;
 
124
} vapirmw_t;
 
125
 
 
126
 
 
127
static vapibuf_t **serv_buf_arr;
 
128
#if !defined(PEND_BUFS)
 
129
/*These are typically used as spare buffers for communication. Since
 
130
  we do not wait on completion anymore, we need to ensure things work
 
131
  fine when these have in-flight messages. Disabled for now.*/
 
132
static vapibuf_t *spare_serv_buf, *spare_serv_bufptr;
 
133
static vapibuf_ext_t *serv_buf;
 
134
#endif
 
135
 
 
136
static vapirmw_t rmw[64];
 
137
 
 
138
static int *flag_arr; /* flag indicates its receiving scatter data */
 
139
#define SERV 2
 
140
#define CLN 1
 
141
 
 
142
#define MAX_DESCR 2
 
143
typedef struct {
 
144
    int avail;
 
145
    struct ibv_qp *qp;
 
146
    struct ibv_recv_wr *descr;
 
147
} descr_pool_t;
 
148
 
 
149
static int* _gtmparr;
 
150
static void* test_ptr;
 
151
static int test_stride_arr[1];
 
152
static int test_count[2];
 
153
static int test_stride_levels;
 
154
char *MessageRcvBuffer;
 
155
 
 
156
extern void armci_util_wait_int(volatile int *,int,int);
 
157
void armci_send_data_to_client(int proc, void *buf,int bytes,void *dbuf);
 
158
void armci_server_register_region(void *,long,ARMCI_MEMHDL_T *);
 
159
static descr_pool_t serv_descr_pool = {MAX_DESCR,NULL,NULL};
 
160
static descr_pool_t client_descr_pool = {MAX_DESCR,NULL,NULL};
 
161
 
 
162
/**Buffer (long[1] used to set msginfo->tag.ack_ptr in
 
163
   client-side. See usage in SERVER_SEND_ACK macro*/
 
164
static long *ack_buf;
 
165
 
 
166
#define GET_DATA_PTR(buf) (sizeof(request_header_t) + (char*)buf)
 
167
 
 
168
#define BUF_TO_SDESCR(buf) ((struct ibv_send_wr *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->sdscr))
 
169
 
 
170
#define BUF_TO_RDESCR(buf) ((struct ibv_recv_wr *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->rdscr))
 
171
 
 
172
#define BUF_TO_SSGLST(buf) ((struct ibv_sge *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->ssg_entry))
 
173
 
 
174
#define BUF_TO_RSGLST(buf) ((struct ibv_sge *)(&((armci_vapi_field_t *)((char *)(buf) - sizeof(armci_vapi_field_t)))->rsg_entry))
 
175
 
 
176
#define BUF_TO_EVBUF(buf) (vapibuf_ext_t*)(((char*)buf) - (sizeof(struct ibv_send_wr)+sizeof(struct ibv_recv_wr)+2*sizeof(struct ibv_sge)))
 
177
 
 
178
#define SERVER_SEND_ACK(p) do {            \
 
179
    assert(*ack_buf == ARMCI_STAMP);       \
 
180
    assert((p)>=0);                        \
 
181
    armci_send_data_to_client((p),ack_buf, \
 
182
      sizeof(long),msginfo->tag.ack_ptr);  \
 
183
  } while(0)
 
184
/* #define SERVER_SEND_ACK(p) {assert(serv_buf!=NULL);assert(msginfo->from==(p));*((long *)serv_buf->buf)=ARMCI_STAMP;armci_send_data_to_client((p),serv_buf->buf,sizeof(long),msginfo->tag.ack_ptr);} */
 
185
 
 
186
#define SERVER_SEND_DATA(_SS_proc,_SS_src,_SS_dst,_SS_size) {armci_send_data_to_client(_SS_proc,_SS_src,_SS_size,_SS_dst);}
 
187
#define SERVER_GET_DATA(_SG_proc,_SG_src,_SG_dst,_SG_size) {armci_get_data_from_client(_SG_proc,_SG_src,_SG_size,_SG_dst);}
 
188
 
 
189
 
 
190
/*\ descriptors will have unique ID's for the wait on descriptor routine to
 
191
 * complete a descriptor and know where it came from
 
192
\*/
 
193
 
 
194
#define NUMOFBUFFERS (MAX_BUFS+MAX_SMALL_BUFS)
 
195
#define DSCRID_FROMBUFS 1
 
196
#define DSCRID_FROMBUFS_END (DSCRID_FROMBUFS+NUMOFBUFFERS)
 
197
 
 
198
#define DSCRID_NBDSCR 10000
 
199
#define DSCRID_NBDSCR_END (10000+MAX_PENDING)
 
200
 
 
201
#define DSCRID_SCATGAT 20000
 
202
#define DSCRID_SCATGAT_END 20000+MAX_PENDING
 
203
 
 
204
#define DSCRID_RMW 30000
 
205
#define DSCRID_RMW_END 30000+9999
 
206
 
 
207
#if defined(PEND_BUFS)
 
208
#define DSCRID_PENDBUF (40000)
 
209
#define DSCRID_PENDBUF_END (DSCRID_PENDBUF + 2*PENDING_BUF_NUM+1)
 
210
 
 
211
#define DSCRID_IMMBUF_RECV     (200000)
 
212
#define DSCRID_IMMBUF_RECV_END (600000)
 
213
 
 
214
#define DSCRID_IMMBUF_RESP     (600000)
 
215
#define DSCRID_IMMBUF_RESP_END (1000000)
 
216
#endif
 
217
 
 
218
extern double MPI_Wtime();
 
219
static double inittime0=0,inittime1=0,inittime2=0,inittime3=0,inittime4=0;
 
220
 
 
221
static int mark_buf_send_complete[NUMOFBUFFERS+1];
 
222
static sr_descr_t armci_vapi_client_nbsdscr_array[MAX_PENDING];
 
223
static sr_descr_t armci_vapi_client_nbrdscr_array[MAX_PENDING];
 
224
static sr_descr_t armci_vapi_serv_nbsdscr_array[MAX_PENDING];
 
225
static sr_descr_t armci_vapi_serv_nbrdscr_array[MAX_PENDING];
 
226
 
 
227
void armci_server_transport_cleanup();
 
228
/********************FUNCTIONS TO CHECK OPENIB RETURN STATUS*******************/
 
229
void armci_check_status(int debug, int rc,char *msg)
 
230
{
 
231
  dassertp(debug,rc==0,("%d: %s, rc=%d\n",armci_me,msg,rc));
 
232
/*     if(debug)printf("%d:%s, rc = %d\n", armci_me,msg, rc); */
 
233
/*     if(rc!=0)armci_die(msg,rc); */
 
234
}
 
235
 
 
236
void armci_vapi_check_return(int debug, int ret, const char *ss)
 
237
{
 
238
#if 0
 
239
    if(ret!=VAPI_OK){
 
240
       printf("\n%d:from %s ret=%d str=%s str_sym=%s\n",armci_me,ss,ret,
 
241
                       VAPI_strerror(ret),VAPI_strerror_sym(ret));
 
242
       fflush(stdout);
 
243
    }
 
244
    if(debug){
 
245
       printf("\n%d:from %s ret=%d str=%s str_sym=%s\n",armci_me,ss,ret,
 
246
                       VAPI_strerror(ret),VAPI_strerror_sym(ret));
 
247
    }
 
248
#endif
 
249
}
 
250
 
 
251
void armci_vapi_print_dscr_info(struct ibv_send_wr *sr, struct ibv_recv_wr *rr)
 
252
{
 
253
int i;
 
254
    if(rr){
 
255
       printf("\n%d:print_dscr rr id=%ld sg_lst_len=%d",
 
256
              armci_me, rr->wr_id, rr->num_sge);
 
257
       for (i = 0; i < rr->num_sge; i++) {
 
258
         printf("\n\t:sg_entry=%d addr=%p len=%d",
 
259
                i, rr->sg_list[i].addr, rr->sg_list[i].length);
 
260
       }
 
261
       fflush(stdout);
 
262
    }
 
263
    if(sr){
 
264
       printf("\n%d:print_dscr sr id=%d opcode=%d sg_lst_len=%d",
 
265
              armci_me, sr->wr_id, sr->opcode, sr->num_sge);
 
266
       for (i = 0; i < sr->num_sge; i++) {
 
267
         printf("\n\t:sg_entry=%d addr=%p len=%d",
 
268
                i, sr->sg_list[i].addr, sr->sg_list[i].length);
 
269
       }
 
270
       fflush(stdout);
 
271
    }
 
272
}
 
273
 
 
274
/*****************END FUNCTIONS TO CHECK VAPI RETURN STATUS********************/
 
275
 
 
276
void armci_recv_complete(struct ibv_recv_wr *rcv_dscr, char *from, int numofrecvs) /*needs work*/
 
277
{
 
278
int rc=0;
 
279
struct ibv_wc pdscr1;
 
280
struct ibv_wc *pdscr = &pdscr1;
 
281
sr_descr_t *rdscr_arr;
 
282
vapi_nic_t *nic;
 
283
int debug,i,done=0;
 
284
 
 
285
    if(SERVER_CONTEXT){
 
286
       rdscr_arr = armci_vapi_serv_nbrdscr_array;
 
287
       nic=CLN_nic;
 
288
       debug = DEBUG_SERVER;
 
289
    }
 
290
    else{
 
291
       rdscr_arr = armci_vapi_client_nbrdscr_array;
 
292
       nic=SRV_nic;
 
293
       debug = DEBUG_CLN;
 
294
    }
 
295
    if(debug){
 
296
       printf("\n%d%s:recv_complete called from %s id=%ld\n",armci_me,
 
297
               ((SERVER_CONTEXT)?"(s)":" "),from,rcv_dscr->wr_id);fflush(stdout);
 
298
    }
 
299
    for(i=0;i<numofrecvs;i++){
 
300
    do{
 
301
      while(rc == 0) {
 
302
         rc = ibv_poll_cq(nic->rcq, 1, pdscr);
 
303
      }
 
304
      dassertp(DBG_POLL|DBG_ALL,rc>=0,
 
305
               ("%d: rc=%d id=%d status=%d (%d/%d)\n",
 
306
                armci_me,rc,pdscr->wr_id,pdscr->status,i,numofrecvs));
 
307
      dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
 
308
       if(debug){
 
309
         if(pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END)
 
310
           printf("\n%d:recv from %s complete id=%d num=%d",armci_me,
 
311
             from,pdscr->wr_id,rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs);
 
312
       }
 
313
       if(pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
 
314
         rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs--;
 
315
         if(rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs==0)
 
316
           rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
 
317
       }
 
318
       else if(pdscr->wr_id == (DSCRID_SCATGAT + MAX_PENDING)){
 
319
               /*this was from a blocking call, do nothing*/
 
320
         continue;
 
321
       }
 
322
       else {
 
323
         armci_die("\nclient should be posting only one kind of recv",armci_me);
 
324
       }
 
325
       rc = 0;
 
326
   }while(pdscr->wr_id!=rcv_dscr->wr_id);
 
327
   rc = 0;
 
328
   }
 
329
 
 
330
}
 
331
 
 
332
void armci_vapi_set_mark_buf_send_complete(int id)
 
333
{
 
334
    mark_buf_send_complete[id]=0;
 
335
}
 
336
 
 
337
void armci_send_complete(struct ibv_send_wr *snd_dscr, char *from,int numoftimes)
 
338
{
 
339
int rc=0;
 
340
struct ibv_wc pdscr1;
 
341
struct ibv_wc *pdscr = &pdscr1;
 
342
sr_descr_t *sdscr_arr;
 
343
vapi_nic_t *nic;
 
344
int debug,i;
 
345
 
 
346
 pdscr1.status = IBV_WC_SUCCESS;
 
347
/*  bzero(&pdscr1, sizeof(pdscr1)); */
 
348
/* printf("%d: Waiting for send with wr_id=%d to complete\n", armci_me, snd_dscr->wr_id); */
 
349
/* fflush(stdout); */
 
350
 
 
351
    if(SERVER_CONTEXT){
 
352
       sdscr_arr = armci_vapi_serv_nbsdscr_array;
 
353
       nic=CLN_nic;
 
354
       debug = DEBUG_SERVER;
 
355
    }
 
356
    else{
 
357
       sdscr_arr = armci_vapi_client_nbsdscr_array;
 
358
       nic=SRV_nic;
 
359
       debug = DEBUG_CLN;
 
360
    }
 
361
 
 
362
    if(debug) {
 
363
       printf("\n%d%s:send_complete called from %s id=%ld nt=%d\n",armci_me,
 
364
               ((SERVER_CONTEXT)?"(s)":" "),from,snd_dscr->wr_id,numoftimes);
 
365
       fflush(stdout);
 
366
    }
 
367
    for(i=0;i<numoftimes;i++){
 
368
    do{
 
369
       while(rc == 0){  
 
370
#if defined(PEND_BUFS) 
 
371
         if(SERVER_CONTEXT)
 
372
           rc = ibv_poll_cq(nic->rcq,1,pdscr);
 
373
         else
 
374
#endif
 
375
           rc = ibv_poll_cq(nic->scq,1, pdscr);
 
376
       }  
 
377
       dassertp(DBG_POLL|DBG_ALL,rc>=0,
 
378
                ("%d:rc=%d status=%d id=%d (%d/%d)",armci_me,
 
379
                 rc,pdscr->status,(int)pdscr->wr_id,i,numoftimes));
 
380
       dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
 
381
/*       printf("%d: Obtained completion of wr_id=%d\n", armci_me, pdscr->wr_id); */
 
382
/*       fflush(stdout); */
 
383
       if(SERVER_CONTEXT){
 
384
         if(debug)printf("%d:completed id %d i=%d\n",armci_me,pdscr->wr_id,i);
 
385
         if(pdscr->wr_id >=DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
 
386
           sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
 
387
           if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
 
388
             sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
 
389
         }
 
390
         else if(pdscr->wr_id >=armci_nproc && pdscr->wr_id < 2*armci_nproc){
 
391
                 /*its coming from send_data_to_client just return*/
 
392
         }
 
393
#if defined(PEND_BUFS)
 
394
         else if(pdscr->wr_id >= DSCRID_IMMBUF_RESP && pdscr->wr_id>DSCRID_IMMBUF_RESP_END) {
 
395
           /*send from server to client completed*/
 
396
         }
 
397
#endif
 
398
         else armci_die("server send complete got weird id",pdscr->wr_id);
 
399
       }
 
400
       else{
 
401
         if(debug)printf("%d:completed id %d i=%d\n",armci_me,pdscr->wr_id,i);
 
402
         if(pdscr->wr_id >=DSCRID_FROMBUFS && pdscr->wr_id < DSCRID_FROMBUFS_END) {
 
403
/*         printf("%d: marking send buffer %d as complete\n", armci_me, pdscr->wr_id);*/
 
404
           mark_buf_send_complete[pdscr->wr_id]=1;
 
405
         }
 
406
         else if(pdscr->wr_id >=DSCRID_NBDSCR && pdscr->wr_id < DSCRID_NBDSCR_END){
 
407
           sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].numofsends--;
 
408
           if(sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].numofsends==0)
 
409
             sdscr_arr[pdscr->wr_id-DSCRID_NBDSCR].tag=0;
 
410
         }
 
411
         else if(pdscr->wr_id >=DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END){
 
412
           sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
 
413
           if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
 
414
             sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
 
415
         }
 
416
         else if(pdscr->wr_id == (DSCRID_SCATGAT + MAX_PENDING)){
 
417
/*         printf("%d: completed a blocking scatgat descriptor\n", armci_me); */
 
418
           /*this was from a blocking call, do nothing*/
 
419
           continue;
 
420
         }
 
421
         else armci_die("client send complete got weird id",pdscr->wr_id);
 
422
       }
 
423
       rc = 0;
 
424
    }while(pdscr->wr_id!=snd_dscr->wr_id);
 
425
       rc = 0;
 
426
    }
 
427
}
 
428
 
 
429
 
 
430
void armci_dscrlist_recv_complete(int tag, char* from,sr_descr_t *dscr)
 
431
{
 
432
int i,nr,j;
 
433
sr_descr_t *retdscr,*rdscr_arr;
 
434
    if(dscr==NULL){
 
435
       if(SERVER_CONTEXT)
 
436
         rdscr_arr = armci_vapi_serv_nbrdscr_array;
 
437
       else
 
438
         rdscr_arr = armci_vapi_client_nbrdscr_array;
 
439
 
 
440
       for(i=0;i<MAX_PENDING;i++){
 
441
         if(rdscr_arr[i].tag==tag)
 
442
           break;
 
443
       }
 
444
 
 
445
       if(i==MAX_PENDING)return;
 
446
       retdscr = &rdscr_arr[i];
 
447
    }
 
448
    else
 
449
       retdscr=dscr;
 
450
 
 
451
    nr = retdscr->numofrecvs;
 
452
    armci_recv_complete(&(retdscr->rdescr),"(s)list_send_complete",nr);
 
453
}
 
454
 
 
455
 
 
456
void armci_dscrlist_send_complete(int tag,char *from, sr_descr_t *dscr)
 
457
{
 
458
int i,ns,j;
 
459
sr_descr_t *retdscr,*sdscr_arr;
 
460
    if(dscr==NULL){
 
461
       if(SERVER_CONTEXT)
 
462
         sdscr_arr = armci_vapi_serv_nbsdscr_array;
 
463
       else
 
464
         sdscr_arr = armci_vapi_client_nbsdscr_array;
 
465
 
 
466
       for(i=0;i<MAX_PENDING;i++){
 
467
         if(sdscr_arr[i].tag==tag)
 
468
           break;
 
469
       }
 
470
       if(i==MAX_PENDING)return;
 
471
       retdscr=&sdscr_arr[i];
 
472
    }
 
473
    else
 
474
       retdscr=dscr;
 
475
 
 
476
    ns = retdscr->numofsends;
 
477
 
 
478
    armci_send_complete(&(retdscr->sdescr),"dscrlist_send_complete",ns);
 
479
 
 
480
}
 
481
 
 
482
void armci_client_nbcall_complete(sr_descr_t *dscr, int tag, int op)
 
483
{
 
484
    if(tag != dscr->tag)return;
 
485
 
 
486
        THREAD_LOCK(armci_user_threads.net_lock);
 
487
 
 
488
    if(op==GET){
 
489
       if(dscr->issg){
 
490
         if(dscr->numofrecvs>0)
 
491
           armci_dscrlist_recv_complete(tag,"armci_client_nbcall_complete recv",
 
492
                           dscr);
 
493
       }
 
494
       else{
 
495
         if(dscr->numofsends>0)
 
496
           armci_dscrlist_send_complete(tag,"armci_client_nbcall_complete send",
 
497
                           dscr);
 
498
       }
 
499
    }
 
500
    if(op==PUT){
 
501
       if(dscr->numofsends>0)
 
502
         armci_dscrlist_send_complete(tag,"armci_client_nbcall_complete send",
 
503
                         dscr);
 
504
    }
 
505
 
 
506
        THREAD_UNLOCK(armci_user_threads.net_lock);
 
507
}
 
508
 
 
509
 
 
510
static int cur_serv_pend_descr;
 
511
static int cur_client_pend_descr;
 
512
 
 
513
sr_descr_t *armci_vapi_get_next_rdescr(int nbtag,int sg)
 
514
{
 
515
static int serverthreadavail=-1; /*client thread can't touch this*/
 
516
static int clientthreadavail=-1; /*server thread can't touch this*/
 
517
int avail,newavail;
 
518
sr_descr_t *retdscr,*rdscr_arr;
 
519
 
 
520
    if(SERVER_CONTEXT){
 
521
       rdscr_arr = armci_vapi_serv_nbrdscr_array;
 
522
       avail = serverthreadavail;
 
523
       /*printf("\n%d:serv thread avail=%d",armci_me,serverthreadavail);*/
 
524
    }
 
525
    else{
 
526
       rdscr_arr = armci_vapi_client_nbrdscr_array;
 
527
       avail = clientthreadavail;
 
528
    }
 
529
    if(avail==-1){
 
530
       int i;
 
531
       for(i=0;i<MAX_PENDING;i++){
 
532
         rdscr_arr[i].tag=0;
 
533
         bzero(&rdscr_arr[i].rdescr,sizeof(struct ibv_recv_wr)); 
 
534
         if(sg)
 
535
           rdscr_arr[i].rdescr.wr_id = DSCRID_SCATGAT + i;
 
536
         else
 
537
           rdscr_arr[i].rdescr.wr_id = DSCRID_NBDSCR + i; 
 
538
       }
 
539
       avail=0;
 
540
    }
 
541
 
 
542
    if(rdscr_arr[avail].tag!=0){
 
543
       armci_dscrlist_recv_complete(rdscr_arr[avail].tag,
 
544
                         "armci_vapi_get_next_rdescr",&rdscr_arr[avail]);
 
545
    }
 
546
 
 
547
    rdscr_arr[avail].tag=nbtag;
 
548
    rdscr_arr[avail].issg=sg;
 
549
    retdscr= (rdscr_arr+avail);
 
550
 
 
551
    memset(&retdscr->rdescr,0,sizeof(struct ibv_recv_wr));
 
552
 
 
553
    if(sg)
 
554
       retdscr->rdescr.wr_id = DSCRID_SCATGAT + avail;
 
555
    else{
 
556
       retdscr->rdescr.wr_id = DSCRID_NBDSCR + avail; 
 
557
       retdscr->numofrecvs=1;
 
558
    }
 
559
 
 
560
    newavail = (avail+1)%MAX_PENDING;
 
561
 
 
562
    if(SERVER_CONTEXT){
 
563
      cur_serv_pend_descr = avail;
 
564
      serverthreadavail=newavail;
 
565
    }
 
566
    else{
 
567
      cur_client_pend_descr = avail;
 
568
      clientthreadavail=newavail;
 
569
    }
 
570
 
 
571
    return(retdscr);
 
572
 
 
573
}
 
574
 
 
575
sr_descr_t *armci_vapi_get_next_sdescr(int nbtag,int sg)
 
576
{
 
577
static int serverthreadavail=-1; /*client thread can't touch this*/
 
578
static int clientthreadavail=-1; /*server thread can't touch this*/
 
579
int avail,newavail;
 
580
sr_descr_t *retdscr,*sdscr_arr;
 
581
 
 
582
    if(SERVER_CONTEXT){
 
583
       sdscr_arr = armci_vapi_serv_nbsdscr_array;
 
584
       avail = serverthreadavail;
 
585
    }
 
586
    else{
 
587
       sdscr_arr = armci_vapi_client_nbsdscr_array;
 
588
       avail = clientthreadavail;
 
589
    }
 
590
 
 
591
    if(avail==-1){ /*first call*/
 
592
       int i;
 
593
       for(i=0;i<MAX_PENDING;i++){
 
594
         sdscr_arr[i].tag=0;
 
595
         bzero(&sdscr_arr[i].sdescr,sizeof(struct ibv_send_wr));
 
596
         if(sg)
 
597
           sdscr_arr[i].sdescr.wr_id = DSCRID_SCATGAT+i;
 
598
         else
 
599
           sdscr_arr[i].sdescr.wr_id = DSCRID_NBDSCR + i;
 
600
       }
 
601
       avail=0;
 
602
    }
 
603
 
 
604
    if(sdscr_arr[avail].tag!=0){
 
605
       armci_dscrlist_send_complete(sdscr_arr[avail].tag,
 
606
                       "armci_vapi_get_next_sdescr",&sdscr_arr[avail]);
 
607
    }
 
608
 
 
609
    sdscr_arr[avail].tag=nbtag;
 
610
    sdscr_arr[avail].issg=sg;
 
611
    retdscr= (sdscr_arr+avail);
 
612
 
 
613
    memset(&retdscr->sdescr,0,sizeof(struct ibv_recv_wr));
 
614
 
 
615
    if(sg)
 
616
       retdscr->sdescr.wr_id = DSCRID_SCATGAT + avail;
 
617
    else{
 
618
       retdscr->sdescr.wr_id = DSCRID_NBDSCR + avail;
 
619
       retdscr->numofsends=1;
 
620
    }
 
621
 
 
622
    newavail = (avail+1)%MAX_PENDING;
 
623
 
 
624
    if(SERVER_CONTEXT){
 
625
      cur_serv_pend_descr = avail;
 
626
      serverthreadavail=newavail;
 
627
    }
 
628
    else{
 
629
      cur_client_pend_descr = avail;
 
630
      clientthreadavail=newavail;
 
631
    }
 
632
    /*
 
633
    printf("\n%d:avail=%d newavail=%d cln=%d serv=%d",armci_me,avail,
 
634
      newavail,clientthreadavail,serverthreadavail); 
 
635
    */
 
636
    return(retdscr);
 
637
}
 
638
 
 
639
void armci_wait_for_server()
 
640
{
 
641
    armci_server_terminating = 1;
 
642
}
 
643
 
 
644
 
 
645
/* ibv_create_qp does not use separate structure to return properties,
 
646
   seems it is all inside ibv_qp */
 
647
static void armci_create_qp(vapi_nic_t *nic, struct ibv_qp **qp)
 
648
{
 
649
    struct ibv_qp_init_attr initattr;
 
650
 
 
651
    bzero(&initattr, sizeof(struct ibv_qp_init_attr));
 
652
 
 
653
    *qp = NULL;
 
654
 
 
655
    initattr.cap.max_send_wr = armci_max_qp_ous_swr;
 
656
    initattr.cap.max_recv_wr = armci_max_qp_ous_rwr;
 
657
    initattr.cap.max_recv_sge = armci_max_num_sg_ent;
 
658
    initattr.cap.max_send_sge = armci_max_num_sg_ent;
 
659
#if defined(PEND_BUFS)
 
660
    if(nic==CLN_nic) {
 
661
        initattr.send_cq = nic->rcq;
 
662
        initattr.recv_cq = nic->rcq;
 
663
    }
 
664
    else 
 
665
#endif
 
666
    {
 
667
        initattr.send_cq = nic->scq;
 
668
        initattr.recv_cq = nic->rcq;      
 
669
    }
 
670
    initattr.qp_type = IBV_QPT_RC;
 
671
 
 
672
    *qp = ibv_create_qp(nic->ptag, &initattr);
 
673
    dassert(1,*qp!=NULL);
 
674
 
 
675
    if(!armci_vapi_max_inline_size){
 
676
        armci_vapi_max_inline_size = initattr.cap.max_inline_data;
 
677
    }
 
678
}
 
679
 
 
680
int armci_openib_sl;
 
681
 
 
682
void armci_openib_env_init()
 
683
{
 
684
    char *value;
 
685
 
 
686
    if ((value = getenv("ARMCI_OPENIB_SL")) != NULL){
 
687
        armci_openib_sl = atoi(value);
 
688
    } 
 
689
    else {
 
690
        /* AV: default based on Chinook */
 
691
        armci_openib_sl = 0;
 
692
    }
 
693
        
 
694
    /* Similarly other constants can be changes to runtime */
 
695
}
 
696
 
 
697
static void armci_init_nic(vapi_nic_t *nic, int scq_entries, int
 
698
        rcq_entries)
 
699
{
 
700
    int rc, ndevs, i;
 
701
    struct ibv_device **devs=NULL;
 
702
    struct ibv_context *cxt;
 
703
 
 
704
    if (nic == SRV_nic) {
 
705
        /* Initialize OpenIB runtime variables only once*/
 
706
        armci_openib_env_init();
 
707
    }
 
708
 
 
709
    bzero(nic,sizeof(vapi_nic_t));
 
710
    nic->lid_arr = (uint16_t *)calloc(armci_nproc,sizeof(uint16_t));
 
711
    dassert(1,nic->lid_arr!=NULL);
 
712
 
 
713
    devs = ibv_get_device_list(&ndevs);
 
714
 
 
715
    nic->handle = ibv_open_device(*devs); 
 
716
 
 
717
    nic->maxtransfersize = MAX_RDMA_SIZE;
 
718
 
 
719
    nic->vendor = ibv_get_device_name(*devs);
 
720
 
 
721
    rc = ibv_query_device(nic->handle, &nic->attr);
 
722
 
 
723
    int down_port_count_check = 0;
 
724
    for (i = 1; i <= 2; i++) {
 
725
        rc = ibv_query_port(nic->handle, (uint8_t)i, &nic->hca_port);
 
726
        if (IBV_PORT_ACTIVE == nic->hca_port.state) {
 
727
            nic->active_port = i;
 
728
            break;
 
729
        } 
 
730
        else {
 
731
            down_port_count_check++;
 
732
        }
 
733
    }
 
734
 
 
735
    /* Assert that the number of inactive ports is not equal to the number
 
736
     * of down ports on any adapter */
 
737
    assert(down_port_count_check != 2);
 
738
        
 
739
    /*save the lid for doing a global exchange later */
 
740
    nic->lid_arr[armci_me] = nic->hca_port.lid;
 
741
 
 
742
    /*allocate tag (protection domain) */
 
743
    nic->ptag = ibv_alloc_pd(nic->handle);
 
744
 
 
745
    /* properties of scq and rcq required for the cq number, this also needs
 
746
     * to be globally exchanged
 
747
     */
 
748
    nic->scv = 1;
 
749
    nic->rcv = 2;
 
750
    nic->scq = nic->rcq = NULL; 
 
751
    
 
752
    if(scq_entries) {
 
753
        nic->sch = ibv_create_comp_channel(nic->handle);
 
754
        nic->scq = ibv_create_cq(nic->handle, 16000,
 
755
                nic->scq_cntx,nic->sch, 0);
 
756
    }
 
757
    
 
758
    if(rcq_entries) {
 
759
        nic->rch = ibv_create_comp_channel(nic->handle);
 
760
        nic->rcq = ibv_create_cq(nic->handle, 32768,
 
761
                nic->rcq_cntx,nic->rch, 0);
 
762
    }
 
763
    
 
764
    ibv_free_device_list(devs);
 
765
 
 
766
    armci_max_num_sg_ent = 29; 
 
767
    armci_max_qp_ous_swr = 100;
 
768
    armci_max_qp_ous_rwr = 50;
 
769
 
 
770
    if(armci_max_qp_ous_rwr + armci_max_qp_ous_swr>nic->attr.max_qp_wr){
 
771
        armci_max_qp_ous_swr = nic->attr.max_qp_wr/16;
 
772
        armci_max_qp_ous_rwr = nic->attr.max_qp_wr - armci_max_qp_ous_swr;
 
773
    }
 
774
    if(armci_max_num_sg_ent >= nic->attr.max_sge){
 
775
        armci_max_num_sg_ent = nic->attr.max_sge - 1;
 
776
    }
 
777
    
 
778
}
 
779
 
 
780
/****************MEMORY ALLOCATION REGISTRATION DEREGISTRATION****************/
 
781
static char * serv_malloc_buf_base;
 
782
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
 
783
/* extern gpc_buf_t *gpc_req; */
 
784
/* #endif */
 
785
void armci_server_alloc_bufs()
 
786
{
 
787
    int rc;
 
788
    int mod, bytes, total, extra =sizeof(struct ibv_recv_wr)*MAX_DESCR+SIXTYFOUR;
 
789
    int mhsize = armci_nproc*sizeof(armci_vapi_memhndl_t); /* ack */
 
790
    char *tmp, *tmp0;
 
791
    int i, j=0;
 
792
#if defined(PEND_BUFS)
 
793
    int clients = (IMM_BUF_NUM+1)*armci_nproc;
 
794
#else
 
795
    int clients = armci_nproc;
 
796
#endif
 
797
 
 
798
    /* allocate memory for the recv buffers-must be alligned on 64byte bnd */
 
799
    /* note we add extra one to repost it for the client we are received req */
 
800
    bytes = (clients+1)*sizeof(vapibuf_t)+sizeof(vapibuf_ext_t) + extra+ mhsize
 
801
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
 
802
/*       + MAX_GPC_REQ * sizeof(gpc_buf_t) */
 
803
/* #endif */
 
804
#if defined(PEND_BUFS)
 
805
      + (clients+1)*IMM_BUF_LEN
 
806
      + PENDING_BUF_NUM*(sizeof(vapibuf_pend_t)+PENDING_BUF_LEN)
 
807
#endif
 
808
      + sizeof(long)
 
809
      + 7*SIXTYFOUR;
 
810
    total = bytes + SIXTYFOUR;
 
811
    if(total%4096!=0)
 
812
       total = total - (total%4096) + 4096;
 
813
    tmp0=tmp = malloc(total);
 
814
    serv_malloc_buf_base = tmp0;
 
815
 
 
816
    dassert1(1,tmp!=NULL,(int)total);
 
817
    /* stamp the last byte */
 
818
    serv_tail= tmp + bytes+SIXTYFOUR-1;
 
819
    *serv_tail=SERV_STAMP;
 
820
    /* allocate memory for client memory handle to support put response
 
821
     *         in dynamic memory registration protocols */
 
822
    CLN_handle = (armci_vapi_memhndl_t*)tmp;
 
823
    memset(CLN_handle,0,mhsize); /* set it to zero */
 
824
    tmp += mhsize;
 
825
 
 
826
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
 
827
/*     /\* gpc_req memory*\/ */
 
828
/*     tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR); */
 
829
/*     gpc_req = (gpc_buf_t *)tmp; */
 
830
/*     tmp += MAX_GPC_REQ * sizeof(gpc_buf_t); */
 
831
/* #endif */
 
832
 
 
833
    /* setup descriptor memory */
 
834
    tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
 
835
    serv_descr_pool.descr= (struct ibv_recv_wr *)(tmp);
 
836
    tmp += extra;
 
837
 
 
838
    /* setup ack buffer*/
 
839
    tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
 
840
    ack_buf = (long *)(tmp);
 
841
    *ack_buf=ARMCI_STAMP;
 
842
    tmp += sizeof(long);
 
843
 
 
844
    /* setup buffer pointers */
 
845
    tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
 
846
    serv_buf_arr = (vapibuf_t **)malloc(sizeof(vapibuf_t*)*clients);
 
847
    for(i=0;i<clients;i++){
 
848
      serv_buf_arr[i] = (vapibuf_t*)(tmp) + i;
 
849
    }
 
850
    tmp = (char *)(serv_buf_arr[0]+clients);
 
851
 
 
852
#if defined(PEND_BUFS)
 
853
    /*setup buffers in immediate buffers*/
 
854
    tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
 
855
    for(i=0; i<clients; i++) {
 
856
      serv_buf_arr[i]->buf = tmp + i*IMM_BUF_LEN;
 
857
    }
 
858
    tmp += clients*IMM_BUF_LEN;
 
859
 
 
860
    /*setup pending buffers*/
 
861
    tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
 
862
    serv_pendbuf_arr = (vapibuf_pend_t *)(tmp);
 
863
    tmp=(char *)(serv_pendbuf_arr+PENDING_BUF_NUM);
 
864
    tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
 
865
    for(i=0; i<PENDING_BUF_NUM; i++) {
 
866
      serv_pendbuf_arr[i].buf = tmp+i*PENDING_BUF_LEN;
 
867
      assert(serv_pendbuf_arr[i].buf != NULL);
 
868
    }
 
869
    tmp += PENDING_BUF_NUM*PENDING_BUF_LEN;
 
870
    MessageRcvBuffer = NULL;    
 
871
#else
 
872
    tmp += SIXTYFOUR - ((ssize_t)tmp % SIXTYFOUR);
 
873
    spare_serv_buf = (vapibuf_t *)tmp; /* spare buffer is at the end */
 
874
    spare_serv_bufptr = spare_serv_buf;    /* save the pointer for later */
 
875
    serv_buf =(vapibuf_ext_t*)(spare_serv_buf+1);
 
876
    tmp = (char *)(serv_buf+1);
 
877
 
 
878
    MessageRcvBuffer = serv_buf->buf;
 
879
#endif
 
880
 
 
881
   flag_arr = (int *)malloc(sizeof(int)*armci_nproc);
 
882
   for (i =0; i<armci_nproc; i++) flag_arr[i] = 9999;
 
883
 
 
884
    if(DEBUG_SERVER){
 
885
      printf("\n%d(s):registering mem %p %dbytes ptag=%ld handle=%d\n",
 
886
             armci_me, tmp0,total,CLN_nic->ptag,CLN_nic->handle);fflush(stdout);
 
887
    }
 
888
 
 
889
    serv_memhandle.memhndl = ibv_reg_mr(CLN_nic->ptag, tmp0, total,
 
890
                                        IBV_ACCESS_LOCAL_WRITE |
 
891
                                        IBV_ACCESS_REMOTE_WRITE |
 
892
                                        IBV_ACCESS_REMOTE_READ);
 
893
    dassert1(1,serv_memhandle.memhndl!=NULL,total);
 
894
    serv_memhandle.lkey=serv_memhandle.memhndl->lkey;
 
895
    serv_memhandle.rkey=serv_memhandle.memhndl->rkey;
 
896
 
 
897
    /* exchange address of ack/memhandle flag on servers */
 
898
    if(DEBUG_SERVER){
 
899
       printf("%d(s):registered mem %p %dbytes mhandle=%d mharr starts%p\n",
 
900
              armci_me, tmp0, total, serv_memhandle.memhndl,CLN_handle);
 
901
       fflush(stdout);
 
902
    }
 
903
}
 
904
 
 
905
static char * client_malloc_buf_base;
 
906
char * armci_vapi_client_mem_alloc(int size)
 
907
{
 
908
    int rc;
 
909
    int mod, total;
 
910
    int extra = MAX_DESCR*sizeof(struct ibv_recv_wr)+SIXTYFOUR;
 
911
    char *tmp,*tmp0;
 
912
 
 
913
    /*we use the size passed by the armci_init_bufs routine instead of bytes*/
 
914
 
 
915
    total = size + extra + 2*SIXTYFOUR;
 
916
 
 
917
    if(total%4096!=0)
 
918
       total = total - (total%4096) + 4096;
 
919
    tmp0  = tmp = malloc(total);
 
920
    dassert1(1,tmp!=NULL,total);
 
921
    client_malloc_buf_base = tmp;
 
922
#if 0
 
923
    /*SK: could this lead to a problem at ibv_reg_mr() because of unfixed 'total'?*/
 
924
    if(ALIGN64ADD(tmp0))tmp0+=ALIGN64ADD(tmp0);
 
925
#endif
 
926
    /* stamp the last byte */
 
927
    client_tail= tmp + extra+ size +2*SIXTYFOUR-1;
 
928
    *client_tail=CLIENT_STAMP;
 
929
 
 
930
    /* we also have a place to store memhandle for zero-copy get */
 
931
    pinned_handle =(armci_vapi_memhndl_t *) (tmp + extra+ size +SIXTYFOUR-16);
 
932
 
 
933
    mod = ((ssize_t)tmp)%SIXTYFOUR;
 
934
    client_descr_pool.descr= (struct ibv_recv_wr*)(tmp+SIXTYFOUR-mod);
 
935
    tmp += extra;
 
936
 
 
937
    client_memhandle.memhndl = ibv_reg_mr(SRV_nic->ptag, tmp0, total,
 
938
                                          IBV_ACCESS_LOCAL_WRITE |
 
939
                                          IBV_ACCESS_REMOTE_WRITE |
 
940
                                          IBV_ACCESS_REMOTE_READ);
 
941
    dassert(1,client_memhandle.memhndl!=NULL);
 
942
    
 
943
    client_memhandle.lkey = client_memhandle.memhndl->lkey;
 
944
    client_memhandle.rkey = client_memhandle.memhndl->rkey;
 
945
    handle_array[armci_me].lkey = client_memhandle.lkey;
 
946
    handle_array[armci_me].rkey = client_memhandle.rkey;
 
947
  
 
948
    handle_array[armci_me].memhndl = client_memhandle.memhndl;
 
949
 
 
950
    if(DEBUG_INIT){
 
951
       printf("%d: registered client memory %p %dsize tmp=%p \n",
 
952
               armci_me,tmp0, total, tmp);
 
953
       fflush(stdout);
 
954
    }
 
955
    /*now that we have the handle array, we get every body elses RDMA handle*/
 
956
    total = (sizeof(armci_vapi_memhndl_t)*armci_nproc)/sizeof(int);
 
957
    armci_msg_gop_scope(SCOPE_ALL,handle_array,total,"+",ARMCI_INT);
 
958
 
 
959
    return(tmp);
 
960
}
 
961
 
 
962
 
 
963
void armci_server_register_region(void *ptr,long bytes, ARMCI_MEMHDL_T *memhdl)
 
964
{
 
965
    bzero(memhdl,sizeof(ARMCI_MEMHDL_T));
 
966
 
 
967
    memhdl->memhndl = ibv_reg_mr(CLN_nic->ptag, ptr, bytes,
 
968
               IBV_ACCESS_LOCAL_WRITE |
 
969
               IBV_ACCESS_REMOTE_WRITE |
 
970
               IBV_ACCESS_REMOTE_READ);
 
971
    dassert(1,memhdl->memhndl!=NULL);
 
972
 
 
973
    memhdl->lkey=memhdl->memhndl->lkey;
 
974
    memhdl->rkey=memhdl->memhndl->rkey;
 
975
 
 
976
    if(DEBUG_SERVER){
 
977
       printf("\n%d(s):registered lkey=%d rkey=%d ptr=%p end=%p %p\n",armci_me,
 
978
               memhdl->lkey,memhdl->rkey,ptr,(char *)ptr+bytes,memhdl);
 
979
       fflush(stdout);
 
980
    }
 
981
}
 
982
 
 
983
int armci_pin_contig_hndl(void *ptr, size_t bytes, ARMCI_MEMHDL_T *memhdl)
 
984
{
 
985
    memhdl->memhndl = ibv_reg_mr(SRV_nic->ptag, ptr, bytes,
 
986
               IBV_ACCESS_LOCAL_WRITE |
 
987
               IBV_ACCESS_REMOTE_WRITE |
 
988
               IBV_ACCESS_REMOTE_READ);
 
989
    dassert(1,memhdl->memhndl!=NULL);
 
990
    memhdl->lkey=memhdl->memhndl->lkey;
 
991
    memhdl->rkey=memhdl->memhndl->rkey;
 
992
    if(0){
 
993
       printf("\n%d:registered lkey=%d rkey=%d ptr=%p end=%p\n",armci_me,
 
994
               memhdl->lkey,memhdl->rkey,ptr,(char *)ptr+bytes);fflush(stdout);
 
995
    }
 
996
    return 1;
 
997
}
 
998
 
 
999
#if 1
 
1000
void armci_network_client_deregister_memory(ARMCI_MEMHDL_T *mh)
 
1001
{
 
1002
    int rc;
 
1003
    rc = ibv_dereg_mr(mh->memhndl);
 
1004
    dassert1(1,rc==0,rc);
 
1005
    armci_vapi_check_return(DEBUG_FINALIZE,rc,
 
1006
                        "armci_network_client_deregister_memory:deregister_mr");
 
1007
}
 
1008
void armci_network_server_deregister_memory(ARMCI_MEMHDL_T *mh)
 
1009
{
 
1010
    int rc;
 
1011
return; /* ??? why ??? */
 
1012
    printf("\n%d:deregister ptr=%p",armci_me,mh);fflush(stdout);
 
1013
    rc = ibv_dereg_mr(mh->memhndl);
 
1014
    dassert1(1,rc==0,rc);
 
1015
    armci_vapi_check_return(DEBUG_FINALIZE,rc,
 
1016
                        "armci_network_server_deregister_memory:deregister_mr");
 
1017
}
 
1018
#else
 
1019
#   define armci_network_client_deregister_memory(mh)           \
 
1020
           armci_vapi_check_return(DEBUG_FINALIZE,              \
 
1021
                                   ibv_dereg_mr(mh->memhndl),   \
 
1022
                                   "armci_network_client_deregister_memory:deregister_mr")
 
1023
#   define armci_network_server_deregister_memory(mh)           \
 
1024
           armci_vapi_check_return(DEBUG_FINALIZE,              \
 
1025
                                   ibv_dereg_mr(mh->memhndl),   \
 
1026
                                   "armci_network_server_deregister_memory:deregister_mr")
 
1027
#endif
 
1028
 
 
1029
void armci_set_serv_mh()
 
1030
{
 
1031
int s, ratio = sizeof(ack_t)/sizeof(int);
 
1032
    /* first collect addrresses on all masters */
 
1033
    if(armci_me == armci_master){
 
1034
       SRV_ack[armci_clus_me].prem_handle=CLN_handle;
 
1035
       SRV_ack[armci_clus_me].handle =serv_memhandle;
 
1036
       armci_msg_gop_scope(SCOPE_MASTERS,SRV_ack,ratio*armci_nclus,"+",
 
1037
                           ARMCI_INT);
 
1038
    }
 
1039
    /* next master broadcasts the addresses within its node */
 
1040
    armci_msg_bcast_scope(SCOPE_NODE,SRV_ack,armci_nclus*sizeof(ack_t),
 
1041
                          armci_master);
 
1042
 
 
1043
    /* Finally save address corresponding to my id on each server */
 
1044
    for(s=0; s< armci_nclus; s++){
 
1045
       SRV_ack[s].prem_handle += armci_me;
 
1046
    }
 
1047
 
 
1048
}
 
1049
/**********END MEMORY ALLOCATION REGISTRATION AND DEREGISTRATION**************/
 
1050
 
 
1051
/*\
 
1052
 * init_connections, client_connect_to_servers -- client code
 
1053
 * server_initial_connection, all_data_server -- server code 
 
1054
\*/ 
 
1055
void armci_init_connections()
 
1056
{
 
1057
int c,s;
 
1058
int sz;
 
1059
 uint32_t *tmpbuf;
 
1060
int *tmparr;
 
1061
    if(TIME_INIT)inittime0 = MPI_Wtime(); 
 
1062
    
 
1063
#if defined(PEND_BUFS)
 
1064
    armci_pbuf_init_buffer_env();
 
1065
#endif
 
1066
    /* initialize nic connection for qp numbers and lid's */
 
1067
    armci_init_nic(SRV_nic,1,1);
 
1068
    for(c=0; c<NUMOFBUFFERS+1; c++) {
 
1069
      mark_buf_send_complete[c]=1;
 
1070
    }
 
1071
    _gtmparr = (int *)calloc(armci_nproc,sizeof(int)); 
 
1072
 
 
1073
    /*qp_numbers and lids need to be exchanged globally*/
 
1074
    tmparr = (int *)calloc(armci_nproc,sizeof(int));
 
1075
    tmparr[armci_me] = SRV_nic->lid_arr[armci_me];
 
1076
    sz = armci_nproc;
 
1077
    armci_msg_gop_scope(SCOPE_ALL,tmparr,sz,"+",ARMCI_INT);
 
1078
    for(c=0;c<armci_nproc;c++){
 
1079
       SRV_nic->lid_arr[c]=tmparr[c];
 
1080
       tmparr[c]=0;
 
1081
    }
 
1082
    /*SRV_con is for client to connect to servers */
 
1083
    SRV_con=(armci_connect_t *)malloc(sizeof(armci_connect_t)*armci_nclus);
 
1084
    dassert1(1,SRV_con!=NULL,sizeof(armci_connect_t)*armci_nclus);
 
1085
    bzero(SRV_con,sizeof(armci_connect_t)*armci_nclus);
 
1086
 
 
1087
    CLN_con=(armci_connect_t*)malloc(sizeof(armci_connect_t)*armci_nproc);
 
1088
    dassert1(1,CLN_con!=NULL,sizeof(armci_connect_t)*armci_nproc);
 
1089
    bzero(CLN_con,sizeof(armci_connect_t)*armci_nproc);
 
1090
 
 
1091
    /*every client creates a qp with every server other than the one on itself*/
 
1092
    SRV_rqpnums = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
 
1093
    dassert(1,SRV_rqpnums);
 
1094
    tmpbuf = (uint32_t*)calloc(armci_nproc,sizeof(uint32_t));
 
1095
    dassert(1,tmpbuf);
 
1096
 
 
1097
    sz = armci_nproc*(sizeof(uint32_t)/sizeof(int));
 
1098
    armci_vapi_max_inline_size = 0;
 
1099
    for(s = 0; s < armci_nclus; s++){
 
1100
        armci_connect_t *con = SRV_con + s;
 
1101
        armci_create_qp(SRV_nic,&con->qp);
 
1102
        con->sqpnum  = con->qp->qp_num;
 
1103
        tmpbuf[armci_clus_info[s].master] = con->qp->qp_num;
 
1104
        con->lid = SRV_nic->lid_arr[s];
 
1105
    }
 
1106
    MPI_Alltoall(tmpbuf,sizeof(uint32_t),MPI_CHAR,SRV_rqpnums,
 
1107
                 sizeof(uint32_t),MPI_CHAR,MPI_COMM_WORLD);
 
1108
    free(tmpbuf);
 
1109
    if(armci_me != armci_master) {
 
1110
      free(SRV_rqpnums);
 
1111
      SRV_rqpnums=NULL;
 
1112
    }
 
1113
 
 
1114
 
 
1115
    SRV_ack = (ack_t*)calloc(armci_nclus,sizeof(ack_t));
 
1116
    dassert1(1,SRV_ack!=NULL,armci_nclus*sizeof(ack_t));
 
1117
 
 
1118
    handle_array = (armci_vapi_memhndl_t *)calloc(sizeof(armci_vapi_memhndl_t),
 
1119
            armci_nproc);
 
1120
    dassert1(1,handle_array!=NULL,sizeof(armci_vapi_memhndl_t)*armci_nproc);
 
1121
}
 
1122
 
 
1123
static void vapi_connect_client()
 
1124
{
 
1125
    int i, start, sz=0, c, rc;
 
1126
    struct ibv_qp_attr qp_attr;
 
1127
    struct ibv_qp_cap qp_cap;
 
1128
    enum ibv_qp_attr_mask qp_attr_mask;
 
1129
 
 
1130
    if (TIME_INIT) inittime0 = MPI_Wtime();
 
1131
    if (armci_me == armci_master)
 
1132
        armci_util_wait_int(&armci_vapi_server_stage1, 1, 10);
 
1133
    if (TIME_INIT) printf("\n%d:wait for server to get to stage 1 time for "
 
1134
                          "vapi_connect_client is %f",
 
1135
                          armci_me, (inittime1 = MPI_Wtime()) - inittime0);
 
1136
    sz = armci_nproc;
 
1137
    if (armci_me == armci_master) {
 
1138
       armci_msg_gop_scope(SCOPE_MASTERS, _gtmparr, sz, "+", ARMCI_INT);
 
1139
       for (c=0; c<armci_nproc; c++) {
 
1140
         CLN_nic->lid_arr[c] = _gtmparr[c];
 
1141
         _gtmparr[c] = 0;
 
1142
       }
 
1143
       if (DEBUG_CLN) {
 
1144
         printf("\n%d(svc): mylid = %d",armci_me,CLN_nic->lid_arr[armci_me]);
 
1145
         fflush(stdout);
 
1146
       }
 
1147
    }
 
1148
 
 
1149
    armci_vapi_client_stage1 = 1;
 
1150
 
 
1151
    /* allocate and initialize connection structs */
 
1152
    sz = armci_nproc*sizeof(uint32_t)/sizeof(int);
 
1153
 
 
1154
    if (armci_me == armci_master)
 
1155
       armci_util_wait_int(&armci_vapi_server_stage2, 1, 10);
 
1156
#if 0
 
1157
    for (c = 0; c < armci_nproc; c++){
 
1158
       armci_connect_t *con = CLN_con + c;
 
1159
       if (armci_me != armci_master) {
 
1160
         char *ptrr;
 
1161
         int extra;
 
1162
         ptrr = malloc(8 + sizeof(uint32_t) * armci_nproc);
 
1163
         extra = ALIGNLONGADD(ptrr);
 
1164
         ptrr = ptrr + extra;
 
1165
         con->rqpnum = (uint32_t *)ptrr;
 
1166
         bzero(con->rqpnum, sizeof(uint32_t) * armci_nproc);
 
1167
       }
 
1168
       armci_msg_gop_scope(SCOPE_ALL, con->rqpnum, sz, "+", ARMCI_INT);
 
1169
    }
 
1170
#else
 
1171
    CLN_rqpnums = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
 
1172
    if(armci_me != armci_master) {
 
1173
      /*just has junk*/
 
1174
      CLN_rqpnumtmpbuf = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
 
1175
    }
 
1176
    dassert(1, CLN_rqpnumtmpbuf);
 
1177
    MPI_Alltoall(CLN_rqpnumtmpbuf, sizeof(uint32_t), MPI_CHAR,
 
1178
                 CLN_rqpnums, sizeof(uint32_t), MPI_CHAR, MPI_COMM_WORLD);
 
1179
    free(CLN_rqpnumtmpbuf);
 
1180
    CLN_rqpnumtmpbuf=NULL;
 
1181
#endif
 
1182
 
 
1183
   if (TIME_INIT) printf("\n%d:wait for server tog et to stage 2 time for "
 
1184
                         "vapi_connect_client is %f",
 
1185
                         armci_me, (inittime2 = MPI_Wtime()) - inittime1);
 
1186
    /*armci_set_serv_mh();*/
 
1187
 
 
1188
    if (DEBUG_CLN) {
 
1189
        printf("%d:all connections ready\n", armci_me);
 
1190
        fflush(stdout);
 
1191
    }
 
1192
 
 
1193
    /* For sanity */
 
1194
    memset(&qp_attr, 0, sizeof qp_attr);
 
1195
    /* Modifying  QP to INIT */
 
1196
    qp_attr_mask = IBV_QP_STATE
 
1197
                 | IBV_QP_PKEY_INDEX
 
1198
                 | IBV_QP_PORT
 
1199
                 | IBV_QP_ACCESS_FLAGS;
 
1200
 
 
1201
    qp_attr.qp_state = IBV_QPS_INIT;
 
1202
    qp_attr.pkey_index = DEFAULT_PKEY_IX;
 
1203
    qp_attr.port_num = SRV_nic->active_port;
 
1204
    qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
 
1205
 
 
1206
    /* start from from server on my_node -1 */
 
1207
    start = (armci_clus_me == 0) ? armci_nclus - 1 : armci_clus_me - 1;
 
1208
    for (i = 0; i < armci_nclus; i++) {
 
1209
       armci_connect_t *con;
 
1210
       con = SRV_con + i;
 
1211
       rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
 
1212
       dassertp(1,!rc,("%d: client RST->INIT i=%d rc=%d\n",armci_me,i,rc));
 
1213
    }
 
1214
 
 
1215
    if (TIME_INIT) printf("\n%d:to init time for vapi_connect_client is %f",
 
1216
                          armci_me, (inittime1 = MPI_Wtime()) - inittime2);
 
1217
    qp_attr_mask = IBV_QP_STATE
 
1218
                 | IBV_QP_MAX_DEST_RD_ATOMIC
 
1219
                 | IBV_QP_PATH_MTU
 
1220
                 | IBV_QP_RQ_PSN
 
1221
                 | IBV_QP_MIN_RNR_TIMER;
 
1222
    memset(&qp_attr, 0, sizeof qp_attr);
 
1223
 
 
1224
    qp_attr.qp_state        = IBV_QPS_RTR;
 
1225
    qp_attr.max_dest_rd_atomic   = 4;
 
1226
    qp_attr.path_mtu        = IBV_MTU_1024;
 
1227
    qp_attr.rq_psn          = 0;
 
1228
    qp_attr.min_rnr_timer   = RNR_TIMER;
 
1229
 
 
1230
    /* AV: Adding the service level parameter */
 
1231
    qp_attr.ah_attr.sl      = armci_openib_sl;
 
1232
 
 
1233
    start = (armci_clus_me == 0) ? armci_nclus - 1 : armci_clus_me - 1;
 
1234
    for (i = 0; i < armci_nclus; i++) {
 
1235
        armci_connect_t *con;
 
1236
        armci_connect_t *conS;
 
1237
        con = SRV_con + i;
 
1238
#if 0
 
1239
        conS = CLN_con + armci_me;
 
1240
#endif
 
1241
        qp_attr_mask |= IBV_QP_AV | IBV_QP_DEST_QPN;
 
1242
#if 0
 
1243
        qp_attr.dest_qp_num = conS->rqpnum[armci_clus_info[i].master];
 
1244
#else
 
1245
        qp_attr.dest_qp_num = CLN_rqpnums[armci_clus_info[i].master];
 
1246
#endif
 
1247
        qp_attr.ah_attr.dlid = SRV_nic->lid_arr[armci_clus_info[i].master];
 
1248
        qp_attr.ah_attr.port_num = SRV_nic->active_port;
 
1249
 
 
1250
        qp_attr.ah_attr.sl = armci_openib_sl;
 
1251
            
 
1252
        rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
 
1253
        dassertp(1,!rc,("%d: INIT->RTR client i=%d rc=%d\n",armci_me,i,rc));
 
1254
    }
 
1255
 
 
1256
    /*to to to RTS, other side must be in RTR*/
 
1257
    armci_msg_barrier();
 
1258
    if (TIME_INIT) printf("\n%d:init to rtr time for vapi_connect_client is %f",
 
1259
                          armci_me, (inittime2 = MPI_Wtime()) - inittime1);
 
1260
    armci_vapi_client_ready=1;
 
1261
 
 
1262
    qp_attr_mask = IBV_QP_STATE
 
1263
                 | IBV_QP_SQ_PSN
 
1264
                 | IBV_QP_TIMEOUT
 
1265
                 | IBV_QP_RETRY_CNT
 
1266
                 | IBV_QP_RNR_RETRY
 
1267
                 | IBV_QP_MAX_QP_RD_ATOMIC;
 
1268
 
 
1269
    memset(&qp_attr, 0, sizeof qp_attr);
 
1270
 
 
1271
    qp_attr.qp_state            = IBV_QPS_RTS;
 
1272
    qp_attr.sq_psn              = 0;
 
1273
    qp_attr.timeout             = 18;
 
1274
    qp_attr.retry_cnt           = 20;
 
1275
    qp_attr.rnr_retry           = 7;
 
1276
    qp_attr.max_rd_atomic  = 4;
 
1277
 
 
1278
    start = (armci_clus_me == 0) ? armci_nclus - 1 : armci_clus_me - 1;
 
1279
    for (i = 0; i < armci_nclus; i++){
 
1280
       armci_connect_t *con;
 
1281
       con = SRV_con + i;
 
1282
       rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
 
1283
       dassertp(1,!rc,("%d: client RTR->RTS i=%d rc=%d\n",armci_me,i,rc));
 
1284
    }
 
1285
    if (TIME_INIT) printf("\n%d:rtr to rts time for vapi_connect_client is %f",
 
1286
                          armci_me, (inittime1 = MPI_Wtime()) - inittime2);
 
1287
    free(CLN_rqpnums);
 
1288
    CLN_rqpnums=NULL;
 
1289
}
 
1290
 
 
1291
 
 
1292
void armci_client_connect_to_servers()
 
1293
{
 
1294
    extern void armci_util_wait_int(volatile int *,int,int);
 
1295
    if (TIME_INIT) inittime0 = MPI_Wtime();
 
1296
    _armci_buf_init();
 
1297
 
 
1298
    vapi_connect_client();
 
1299
    if (armci_me == armci_master) 
 
1300
       armci_util_wait_int(&armci_vapi_server_ready,1,10);
 
1301
    armci_msg_barrier();
 
1302
    if (DEBUG_CLN && armci_me == armci_master) {
 
1303
       printf("\n%d:server_ready=%d\n",armci_me,armci_vapi_server_ready);
 
1304
       fflush(stdout);
 
1305
    }
 
1306
    if (TIME_INIT) printf("\n%d:time for client_connect_to_s is %f",
 
1307
                          armci_me,MPI_Wtime()-inittime0);
 
1308
}
 
1309
 
 
1310
 
 
1311
void armci_init_vapibuf_recv(struct ibv_recv_wr *rd, struct ibv_sge *sg_entry,
 
1312
                             char *buf, int len, armci_vapi_memhndl_t *mhandle)
 
1313
{
 
1314
     memset(rd,0,sizeof(struct ibv_recv_wr));
 
1315
     rd->next = NULL;
 
1316
     rd->num_sge    = 1;
 
1317
     rd->sg_list    = sg_entry;
 
1318
     rd->wr_id      = 0;
 
1319
 
 
1320
     sg_entry->lkey     = mhandle->lkey;
 
1321
     sg_entry->addr     = (uint64_t)buf;
 
1322
     sg_entry->length   = len;
 
1323
}
 
1324
 
 
1325
 
 
1326
void armci_init_vapibuf_send(struct ibv_send_wr *sd, struct ibv_sge *sg_entry,
 
1327
                             char *buf, int len, armci_vapi_memhndl_t *mhandle)
 
1328
{
 
1329
     sd->opcode = IBV_WR_SEND;
 
1330
     sd->next = NULL;
 
1331
     sd->send_flags = IBV_SEND_SIGNALED;
 
1332
     sd->num_sge            = 1;
 
1333
     sd->sg_list            = sg_entry;
 
1334
 
 
1335
     sg_entry->lkey     = mhandle->lkey;
 
1336
     sg_entry->addr     = (uint64_t)buf;
 
1337
     sg_entry->length   = len;
 
1338
}
 
1339
 
 
1340
 
 
1341
static void armci_init_vbuf_srdma(struct ibv_send_wr *sd, struct ibv_sge *sg_entry,
 
1342
                                  char *lbuf, char *rbuf, int len,
 
1343
                                  armci_vapi_memhndl_t *lhandle,
 
1344
                                  armci_vapi_memhndl_t *rhandle)
 
1345
{
 
1346
     /* NOTE: sd->wr is a union, sr->wr.ud might conflict with sr->wr.rdma */
 
1347
     sd->opcode = IBV_WR_RDMA_WRITE;
 
1348
     sd->send_flags = IBV_SEND_SIGNALED;
 
1349
     sd->next = NULL;
 
1350
     sd->num_sge                    = 1;
 
1351
     sd->sg_list                    = sg_entry;
 
1352
     if (rhandle) sd->wr.rdma.rkey  = rhandle->rkey;
 
1353
     sd->wr.rdma.remote_addr        = (uint64_t)rbuf;
 
1354
 
 
1355
     if (lhandle) sg_entry->lkey    = lhandle->lkey;
 
1356
     sg_entry->addr                 = (uint64_t)lbuf;
 
1357
     sg_entry->length               = len;
 
1358
}
 
1359
 
 
1360
 
 
1361
static void armci_init_vbuf_rrdma(struct ibv_send_wr *sd, struct ibv_sge
 
1362
        *sg_entry, char *lbuf, char *rbuf, int len, armci_vapi_memhndl_t
 
1363
        *lhandle, armci_vapi_memhndl_t *rhandle)
 
1364
{
 
1365
     sd->opcode = IBV_WR_RDMA_READ;
 
1366
     sd->next = NULL;
 
1367
     sd->send_flags = IBV_SEND_SIGNALED;
 
1368
     sd->num_sge                    = 1;
 
1369
     sd->sg_list                    = sg_entry;
 
1370
     sd->wr.ud.remote_qkey          = 0;
 
1371
     if (rhandle) sd->wr.rdma.rkey  = rhandle->rkey;
 
1372
     sd->wr.rdma.remote_addr        = (uint64_t)rbuf;
 
1373
 
 
1374
     if (lhandle) sg_entry->lkey    = lhandle->lkey;
 
1375
     sg_entry->addr                 = (uint64_t)lbuf;
 
1376
     sg_entry->length               = len;
 
1377
     /* sd->wr is a union, sr->wr.ud might conflict with sr->wr.rdma */
 
1378
}
 
1379
 
 
1380
 
 
1381
void armci_server_initial_connection()
 
1382
{
 
1383
  int c, rc, i, j;
 
1384
    struct ibv_qp_attr qp_attr;
 
1385
    struct ibv_qp_init_attr qp_init_attr;
 
1386
    struct ibv_qp_cap qp_cap;
 
1387
    enum ibv_qp_attr_mask qp_attr_mask;
 
1388
    char *enval;
 
1389
    struct ibv_recv_wr *bad_wr;
 
1390
 
 
1391
    if (TIME_INIT) 
 
1392
        inittime0 = MPI_Wtime();
 
1393
 
 
1394
    if (DEBUG_SERVER) {
 
1395
        printf("in server after fork %d (%d)\n",armci_me,getpid());
 
1396
        fflush(stdout);
 
1397
    }
 
1398
 
 
1399
#if defined(PEND_BUFS) && !defined(SERVER_THREAD)
 
1400
    armci_pbuf_init_buffer_env();
 
1401
#endif
 
1402
    armci_init_nic(CLN_nic,1,1);
 
1403
 
 
1404
    _gtmparr[armci_me] = CLN_nic->lid_arr[armci_me];
 
1405
    armci_vapi_server_stage1 = 1;
 
1406
    armci_util_wait_int(&armci_vapi_client_stage1, 1, 10);
 
1407
 
 
1408
    CLN_rqpnumtmpbuf = (uint32_t*)malloc(sizeof(uint32_t)*armci_nproc);
 
1409
    dassert(1, CLN_rqpnumtmpbuf);
 
1410
    for (c = 0; c < armci_nproc; c++) {
 
1411
       char *ptrr;
 
1412
       int extra;
 
1413
       armci_connect_t *con = CLN_con + c;
 
1414
       armci_create_qp(CLN_nic, &con->qp);
 
1415
       con->sqpnum = con->qp->qp_num;
 
1416
       con->lid    = CLN_nic->lid_arr[c];
 
1417
       CLN_rqpnumtmpbuf[c] = con->qp->qp_num;
 
1418
    }
 
1419
 
 
1420
    armci_vapi_server_stage2 = 1;
 
1421
 
 
1422
    qp_attr_mask = IBV_QP_STATE
 
1423
                 | IBV_QP_PKEY_INDEX
 
1424
                 | IBV_QP_PORT
 
1425
                 | IBV_QP_ACCESS_FLAGS;
 
1426
 
 
1427
    memset(&qp_attr, 0, sizeof qp_attr);
 
1428
    qp_attr.qp_state        = IBV_QPS_INIT;
 
1429
    qp_attr.pkey_index      = DEFAULT_PKEY_IX;
 
1430
    qp_attr.port_num        = CLN_nic->active_port;
 
1431
    qp_attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE |
 
1432
        IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
 
1433
 
 
1434
    for (c = 0; c < armci_nproc; c++) {
 
1435
       armci_connect_t *con = CLN_con + c;
 
1436
       rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask);
 
1437
       dassertp(1,!rc,("%d: RTS->INIT server c=%d rc=%d\n",armci_me,c,rc));
 
1438
    }
 
1439
    
 
1440
    memset(&qp_attr, 0, sizeof qp_attr);
 
1441
    qp_attr_mask = IBV_QP_STATE
 
1442
                 | IBV_QP_MAX_DEST_RD_ATOMIC
 
1443
                 | IBV_QP_PATH_MTU
 
1444
                 | IBV_QP_RQ_PSN
 
1445
                 | IBV_QP_MIN_RNR_TIMER;
 
1446
    qp_attr.qp_state           = IBV_QPS_RTR;
 
1447
    qp_attr.path_mtu           = IBV_MTU_1024;          
 
1448
    qp_attr.max_dest_rd_atomic = 4;
 
1449
    qp_attr.min_rnr_timer      = RNR_TIMER;
 
1450
    qp_attr.rq_psn             = 0;
 
1451
 
 
1452
    for(c = 0; c < armci_nproc; c++) {
 
1453
       armci_connect_t *con = CLN_con + c;
 
1454
       qp_attr_mask |= IBV_QP_DEST_QPN | IBV_QP_AV;
 
1455
       qp_attr.dest_qp_num  = SRV_rqpnums[c];
 
1456
       qp_attr.ah_attr.dlid = SRV_nic->lid_arr[c];
 
1457
       qp_attr.ah_attr.port_num = CLN_nic->active_port;
 
1458
 
 
1459
       rc = ibv_modify_qp(con->qp, &qp_attr, qp_attr_mask); 
 
1460
       dassertp(1,!rc,("%d: INIT->RTR server cln=%d rc=%d\n",armci_me,c,rc));
 
1461
    }
 
1462
 
 
1463
    armci_util_wait_int(&armci_vapi_client_ready,1,10);
 
1464
    memset(&qp_attr, 0, sizeof qp_attr);
 
1465
 
 
1466
    qp_attr_mask = IBV_QP_STATE
 
1467
                 | IBV_QP_SQ_PSN
 
1468
                 | IBV_QP_TIMEOUT
 
1469
                 | IBV_QP_RETRY_CNT
 
1470
                 | IBV_QP_RNR_RETRY
 
1471
                 | IBV_QP_MAX_QP_RD_ATOMIC;
 
1472
 
 
1473
    qp_attr.qp_state            = IBV_QPS_RTS;
 
1474
    qp_attr.sq_psn              = 0;
 
1475
    qp_attr.timeout             = 18;
 
1476
    qp_attr.retry_cnt           = 20;
 
1477
    qp_attr.rnr_retry           = 7;
 
1478
    qp_attr.max_rd_atomic  = 4;
 
1479
 
 
1480
    for (c = 0; c < armci_nproc; c++) {
 
1481
       armci_connect_t *con = CLN_con + c;
 
1482
       rc = ibv_modify_qp(con->qp, &qp_attr,qp_attr_mask);
 
1483
       dassertp(1,!rc,("%d: server RTR->RTS cln=%d rc=%d\n",armci_me,c,rc));
 
1484
    }
 
1485
 
 
1486
 
 
1487
    free(SRV_rqpnums);
 
1488
    SRV_rqpnums = NULL;
 
1489
    armci_server_alloc_bufs();
 
1490
 
 
1491
    /* setup descriptors and post nonblocking receives */
 
1492
#if defined(PEND_BUFS)
 
1493
    assert(armci_nproc*(IMM_BUF_NUM+1)<DSCRID_IMMBUF_RECV_END-DSCRID_IMMBUF_RECV);
 
1494
    for(i =  0; i < armci_nproc; i++) {
 
1495
      for(j=0; j<IMM_BUF_NUM+1; j++) {
 
1496
        vapibuf_t *vbuf;
 
1497
        vbuf = serv_buf_arr[i*(IMM_BUF_NUM+1)+j];
 
1498
        armci_init_vapibuf_recv(&vbuf->dscr, &vbuf->sg_entry, vbuf->buf,
 
1499
                                IMM_BUF_LEN, &serv_memhandle);
 
1500
        /* we use index of the buffer to identify the buffer, this index is
 
1501
         * returned with a call to ibv_poll_cq inside the ibv_wr */
 
1502
        vbuf->dscr.wr_id = i*(IMM_BUF_NUM+1)+j + DSCRID_IMMBUF_RECV;
 
1503
        if (DEBUG_SERVER) {
 
1504
          printf("\n%d(s):posted rr with lkey=%d",armci_me,vbuf->sg_entry.lkey);
 
1505
          fflush(stdout);
 
1506
        }
 
1507
        rc = ibv_post_recv((CLN_con+i)->qp, &vbuf->dscr, &bad_wr);
 
1508
        dassert1(1,rc==0,rc);
 
1509
      }
 
1510
    }
 
1511
#else
 
1512
    for(i =  0; i < armci_nproc; i++) {
 
1513
      vapibuf_t *vbuf;
 
1514
      vbuf = serv_buf_arr[i];
 
1515
      armci_init_vapibuf_recv(&vbuf->dscr, &vbuf->sg_entry, vbuf->buf,
 
1516
                              VBUF_DLEN, &serv_memhandle);
 
1517
      /* we use index of the buffer to identify the buffer, this index is
 
1518
       * returned with a call to ibv_poll_cq inside the ibv_wr */
 
1519
      vbuf->dscr.wr_id = i+armci_nproc;
 
1520
      if (DEBUG_SERVER) {
 
1521
        printf("\n%d(s):posted rr with lkey=%d",armci_me,vbuf->sg_entry.lkey);
 
1522
        fflush(stdout);
 
1523
      }
 
1524
      rc = ibv_post_recv((CLN_con+i)->qp, &vbuf->dscr, &bad_wr);
 
1525
      dassert1(1,rc==0,rc);
 
1526
    }
 
1527
#endif
 
1528
 
 
1529
    if (TIME_INIT) printf("\n%d:post time for server_initial_conn is %f",
 
1530
                          armci_me, MPI_Wtime() - inittime4);
 
1531
 
 
1532
    armci_vapi_server_ready=1;
 
1533
    /* check if we can poll in the server thread */
 
1534
    enval = getenv("ARMCI_SERVER_CAN_POLL");
 
1535
    if (enval != NULL){
 
1536
       if((enval[0] != 'N') && (enval[0]!='n')) server_can_poll=1;
 
1537
    } else{
 
1538
      if(armci_clus_info[armci_clus_me].nslave < armci_getnumcpus())
 
1539
        server_can_poll=1;
 
1540
    }
 
1541
    /* server_can_poll=0; */
 
1542
 
 
1543
    if (DEBUG_SERVER) {
 
1544
       printf("%d: server connected to all clients\n",armci_me); fflush(stdout);
 
1545
    }
 
1546
    if (TIME_INIT) printf("\n%d:time for server_initial_conn is %f",
 
1547
                          armci_me, MPI_Wtime() - inittime0);
 
1548
}
 
1549
 
 
1550
static void armci_finalize_nic(vapi_nic_t *nic)
 
1551
{
 
1552
    int ret;
 
1553
 
 
1554
    ret = ibv_destroy_cq(nic->scq);
 
1555
    dassert1(1,ret==0,ret);
 
1556
    armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_scq");
 
1557
 
 
1558
    ret = ibv_destroy_comp_channel(nic->sch);
 
1559
    dassert1(1,ret==0,ret);
 
1560
    armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_sch");
 
1561
 
 
1562
    ret = ibv_destroy_cq(nic->rcq);
 
1563
    dassert1(1,ret==0,ret);
 
1564
    armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_rcq");
 
1565
 
 
1566
    ret = ibv_destroy_comp_channel(nic->rch);
 
1567
    dassert1(1,ret==0,ret);
 
1568
    armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:destroy_rch");
 
1569
 
 
1570
    ret = ibv_close_device(nic->handle);
 
1571
    dassert1(1,ret==0,ret);
 
1572
 
 
1573
    armci_vapi_check_return(DEBUG_FINALIZE,ret,"armci_finalize_nic:release_hca");
 
1574
 
 
1575
}
 
1576
 
 
1577
 
 
1578
void armci_server_transport_cleanup()
 
1579
{
 
1580
    int s;
 
1581
    int rc;
 
1582
 
 
1583
    /*first we have empty send/recv queues TBD*/
 
1584
    if(serv_malloc_buf_base){
 
1585
        rc = ibv_dereg_mr(serv_memhandle.memhndl);
 
1586
        dassert1(1,rc==0,rc);
 
1587
        armci_vapi_check_return(DEBUG_FINALIZE,rc,
 
1588
                                "armci_server_transport_cleanup:deregister_mr");
 
1589
       /*now free it*/
 
1590
       free(serv_malloc_buf_base);
 
1591
    }
 
1592
    /*now deregister all my regions from regionskk.c*/
 
1593
    armci_server_region_destroy();
 
1594
    if (CLN_con) {
 
1595
        for (s = 0; s < armci_nproc; s++) {
 
1596
            armci_connect_t *con = CLN_con + s;
 
1597
            if (con->qp) {
 
1598
                rc = ibv_destroy_qp(con->qp);
 
1599
                armci_vapi_check_return(DEBUG_FINALIZE,rc,
 
1600
                                        "armci_server_transport_cleanup:destroy_qp");
 
1601
            }
 
1602
#if 0
 
1603
            free(con->rqpnum);
 
1604
#endif
 
1605
        }
 
1606
        free(CLN_con);
 
1607
    }
 
1608
    armci_finalize_nic(CLN_nic);
 
1609
}
 
1610
 
 
1611
void armci_transport_cleanup()
 
1612
{
 
1613
    int s;
 
1614
    int rc;
 
1615
 
 
1616
    /*first deregister buffers memory */
 
1617
    if (client_malloc_buf_base) {
 
1618
        rc = ibv_dereg_mr(client_memhandle.memhndl);
 
1619
        dassert1(1,rc==0,rc);
 
1620
        armci_vapi_check_return(DEBUG_FINALIZE,rc,"armci_client_transport_cleanup:deregister_mr");
 
1621
        /*now free it*/
 
1622
        free(client_malloc_buf_base);
 
1623
    }
 
1624
    /*now deregister all my regions from regions.c*/
 
1625
    armci_region_destroy();
 
1626
    if (SRV_con) {
 
1627
        for (s = 0; s < armci_nclus; s++) {
 
1628
            armci_connect_t *con = SRV_con + s;
 
1629
            if (con->qp) {
 
1630
                rc = ibv_destroy_qp(con->qp);
 
1631
                dassert1(1,rc==0,rc);
 
1632
                armci_vapi_check_return(DEBUG_FINALIZE,rc,"armci_client_transport_cleanup:destroy_qp");
 
1633
            }
 
1634
#if 0
 
1635
            free(con->rqpnum);
 
1636
#endif
 
1637
        }
 
1638
        free(SRV_con);
 
1639
    }
 
1640
    armci_finalize_nic(SRV_nic);
 
1641
}
 
1642
 
 
1643
/** Post an immediate buffer back for the client to send.
 
1644
 */
 
1645
static void _armci_pendbuf_post_immbuf(vapibuf_t *vbuf, int to) {
 
1646
  int rc; 
 
1647
  struct ibv_recv_wr *bad_wr;
 
1648
#if defined(PEND_BUFS)
 
1649
  assert(vbuf->dscr.wr_id == vbuf-serv_buf_arr[0]+DSCRID_IMMBUF_RECV);
 
1650
#endif
 
1651
  rc = ibv_post_recv((CLN_con+to)->qp, &(vbuf->dscr), &bad_wr);
 
1652
  dassert1(1,rc==0,rc);
 
1653
}
 
1654
 
 
1655
#if defined(PEND_BUFS)
 
1656
#define DSCRID_TO_IMMBUFID(x) (x-DSCRID_IMMBUF_RECV)
 
1657
#else
 
1658
#define DSCRID_TO_IMMBUFID(x) ((x)-armci_nproc)
 
1659
#endif
 
1660
 
 
1661
#if defined(PEND_BUFS)
 
1662
 
 
1663
/**Obtain a message receive buffer to receive a message. Used in place
 
1664
 *  of MessageRcvBuffer. Should not be used.
 
1665
 */
 
1666
char *armci_openib_get_msg_rcv_buf(int proc)
 
1667
{
 
1668
  armci_die("PEND_BUFS in OPENIB: MessageRcvBuffer not available. Should use the in-place buffers to receive data", proc);
 
1669
  return NULL;
 
1670
}
 
1671
 
 
1672
/** Check that the data is in a server allocated buffer. This is
 
1673
 *   guaranteed to be pinned. Ideally, this should always be true. Any
 
1674
 *   operation that request alternative support will have to fix this
 
1675
 *   function and possibly @armci_openib_get_msg_rcv_buf().
 
1676
 * @param br IN Buffer pointer being checked
 
1677
 * @return 1 if it is a server-allocated buffer. 0 otherwise.
 
1678
 */
 
1679
int armci_data_in_serv_buf(void *br)
 
1680
{
 
1681
  if(br>=(void *)serv_malloc_buf_base && br<(void *)serv_tail)
 
1682
    return 1;
 
1683
  if(DEBUG_SERVER) {
 
1684
    printf("%d:: serv_bufs=%p<->%p. br=%p out of range\n",
 
1685
           armci_me, serv_malloc_buf_base, serv_tail, br); 
 
1686
    fflush(stdout);
 
1687
  }
 
1688
  return 0;
 
1689
}
 
1690
 
 
1691
#define PBUF_BUFID_TO_PUT_WRID(_pbufid) (DSCRID_PENDBUF+(_pbufid)*2)
 
1692
#define PBUF_BUFID_TO_GET_WRID(_pbufid) (DSCRID_PENDBUF+(_pbufid)*2+1)
 
1693
#define PBUF_WRID_TO_PBUFID(_id) (((_id)-DSCRID_PENDBUF)/2)
 
1694
#define PBUF_IS_GET_WRID(_id) (((_id)-DSCRID_PENDBUF)&1)
 
1695
#define PBUF_IS_PUT_WRID(_id) (!(((_id)-DSCRID_PENDBUF)&1))
 
1696
 
 
1697
/**Complete processing this immediate buffer. Parameters is void *,
 
1698
 * since vapibuf_t*|immbuf_t* is not available in armci-vapi.h
 
1699
 */
 
1700
void armci_complete_immbuf(void *buf) {
 
1701
  vapibuf_t *vbuf = (vapibuf_t*)buf;
 
1702
  request_header_t *msginfo=(request_header_t*)vbuf->buf;
 
1703
  
 
1704
#if SRI_CORRECT
 
1705
  vbuf->send_pending = 0;
 
1706
#else
 
1707
    _armci_pendbuf_post_immbuf(vbuf,msginfo->from);
 
1708
#endif
 
1709
  armci_data_server(vbuf);
 
1710
  if(msginfo->operation==PUT || ARMCI_ACC(msginfo->operation)) {
 
1711
    SERVER_SEND_ACK(msginfo->from);
 
1712
  }  
 
1713
#if SRI_CORRECT
 
1714
  if(!vbuf->send_pending) {
 
1715
    _armci_pendbuf_post_immbuf(vbuf,msginfo->from);
 
1716
  }
 
1717
#endif
 
1718
}
 
1719
 
 
1720
/**Complete processing this pending buffer. Parameters is void *,
 
1721
 * since vapibuf_t*|immbuf_t* is not available in armci-vapi.h. Note
 
1722
 * that the pending buffer may not yet be available for reuse. This
 
1723
 * will depend on the state of the pending buffer (which might have to
 
1724
 * wait for a communication innitiated by armci_data_server() to
 
1725
 * complete.
 
1726
 */
 
1727
void armci_complete_pendbuf(void *buf) {
 
1728
  vapibuf_pend_t *pbuf = (vapibuf_pend_t *)buf;
 
1729
  request_header_t *msginfo=(request_header_t*)pbuf->buf;
 
1730
 
 
1731
  assert(pbuf->vbuf);
 
1732
#if SRI_CORRECT
 
1733
  pbuf->vbuf->send_pending=0;
 
1734
#else
 
1735
  _armci_pendbuf_post_immbuf(pbuf->vbuf,msginfo->from);
 
1736
#endif
 
1737
  armci_data_server(pbuf);
 
1738
  if(msginfo->operation==PUT || ARMCI_ACC(msginfo->operation)) {
 
1739
    SERVER_SEND_ACK(msginfo->from);
 
1740
  }
 
1741
#if SRI_CORRECT
 
1742
#error  
 
1743
 assert(!pbuf->vbuf->send_pending);
 
1744
  _armci_pendbuf_post_immbuf(pbuf->vbuf,msginfo->from);
 
1745
#endif
 
1746
}
 
1747
 
 
1748
void _armci_get_data_from_client(int proc, struct ibv_send_wr *sdscr, 
 
1749
                                 int dscrid, struct ibv_sge *ssg_entry, 
 
1750
                                 void *rbuf, void *lbuf, int bytes) ;
 
1751
void _armci_send_data_to_client_pbuf(int proc, struct ibv_send_wr *sdscr, 
 
1752
                                     int dscrid, struct ibv_sge *ssg_entry, 
 
1753
                                     void *rbuf, void *lbuf, int bytes);
 
1754
 
 
1755
int no_srv_copy_nsegs_ulimit() {
 
1756
  return armci_max_qp_ous_swr*armci_max_num_sg_ent/10;
 
1757
}
 
1758
 
 
1759
/** Initiate a get operation to progress a pending buffer.
 
1760
 * @param msginfo Request header for any additional processing
 
1761
 * @param src Pointer to src of data (remote for GET)
 
1762
 * @param dst Pointer to dst
 
1763
 * @param bytes #bytes to transfer
 
1764
 * @param proc proc to transfer from(for get)/to(for put)
 
1765
 * @param pbufid Index of pending buffer
 
1766
 */
 
1767
void armci_pbuf_start_get(void *msg_info, void *src, void *dst, 
 
1768
                          int bytes, int proc, int pbufid) {
 
1769
  struct ibv_send_wr sdscr;
 
1770
  struct ibv_sge sg_entry;
 
1771
  int wrid = PBUF_BUFID_TO_GET_WRID(pbufid);
 
1772
  request_header_t *msginfo=(request_header_t *)msg_info;
 
1773
  void armci_server_rdma_contig_to_strided(char *src_ptr, int proc,
 
1774
                                           char *dst_ptr, 
 
1775
                                           int dst_stride_arr[],
 
1776
                                           int seg_count[],
 
1777
                                           int stride_levels,
 
1778
                                           request_header_t *msginfo);
 
1779
 
 
1780
 
 
1781
#if defined(PUT_NO_SRV_COPY)
 
1782
  if(msginfo->operation==PUT && msginfo->format==STRIDED 
 
1783
     && !msginfo->pinned && src==msginfo->tag.data_ptr)   {
 
1784
    char *loc_ptr, *rem_ptr;
 
1785
    int stride_levels, *count;
 
1786
    int *loc_stride_arr;
 
1787
    char *dscr = (char *)(msginfo+1);
 
1788
    ARMCI_MEMHDL_T *mhloc=NULL;
 
1789
    int nsegs, i;
 
1790
 
 
1791
    /* unpack descriptor record */
 
1792
    loc_ptr = *(void**)dscr;           dscr += sizeof(void*);
 
1793
    stride_levels = *(int*)dscr;       dscr += sizeof(int);
 
1794
    loc_stride_arr = (int*)dscr;       dscr += stride_levels*sizeof(int);
 
1795
    count = (int*)dscr;
 
1796
 
 
1797
    rem_ptr = msginfo->tag.data_ptr;
 
1798
 
 
1799
    nsegs = 1;
 
1800
    for(i=0; i<stride_levels; i++) 
 
1801
      nsegs *= count[i+1];    
 
1802
 
 
1803
    dassert(1,proc==msginfo->from);
 
1804
    if(nsegs<no_srv_copy_nsegs_ulimit() &&
 
1805
       get_armci_region_local_hndl(loc_ptr,armci_clus_id(armci_me),&mhloc)) {
 
1806
/*       printf("%d(s): direct rdma from client buffers to server-side memory\n",armci_me); */
 
1807
/*       fflush(stdout); */
 
1808
   
 
1809
      armci_server_rdma_contig_to_strided(rem_ptr, proc,
 
1810
                                          loc_ptr,loc_stride_arr,
 
1811
                                          count, stride_levels,
 
1812
                                          msginfo);
 
1813
    return;
 
1814
   }
 
1815
  }
 
1816
#endif
 
1817
/*   printf("%d(s): rdma from client buffers to pending buffers\n",armci_me); */
 
1818
/*   fflush(stdout);   */
 
1819
  _armci_get_data_from_client(proc,&sdscr,wrid,&sg_entry,src,dst,bytes);
 
1820
}
 
1821
 
 
1822
/** Initiate a put operation to progress a pending buffer.
 
1823
 * @param src Pointer to src of data (local for PUT)
 
1824
 * @param dst Pointer to dst
 
1825
 * @param bytes #bytes to transfer
 
1826
 * @param proc proc to transfer from(for get)/to(for put)
 
1827
 * @param pbufid Index of pending buffer
 
1828
 */
 
1829
void armci_pbuf_start_put(void *src, void *dst, int bytes, int proc, 
 
1830
                          int pbufid) {
 
1831
  struct ibv_send_wr sdscr;
 
1832
  struct ibv_sge sg_entry;
 
1833
  int wrid = PBUF_BUFID_TO_PUT_WRID(pbufid);
 
1834
 
 
1835
  _armci_send_data_to_client_pbuf(proc,&sdscr,wrid,&sg_entry,src,dst,bytes);
 
1836
}
 
1837
 
 
1838
/**
 
1839
  * function to get data from remote client called by data
 
1840
  * server. Note that this is only called for pending buffers.
 
1841
  * @param proc IN the id of remote client
 
1842
  * @param sdscr IN/OUT Descriptor to be used to post the get
 
1843
  * @param dscrid IN ID to be used for the descriptor
 
1844
  * @param ssg_entry IN Scatter/gather list
 
1845
  * @param rbuf IN the remote buffer to get from
 
1846
  * @param lbuf IN local buf to get the data into, this is the queue buffer for SERVER_QUEUE path
 
1847
  * @param bytes IN the size of get
 
1848
  * @see SERVER_QUEUE
 
1849
  * @see armci_send_data_to_client
 
1850
  */
 
1851
/*static*/ void _armci_get_data_from_client(int proc, struct ibv_send_wr *sdscr, 
 
1852
                                int dscrid, struct ibv_sge *ssg_entry, 
 
1853
                                void *rbuf, void *lbuf, int bytes) 
 
1854
{
 
1855
    int rc = 0;
 
1856
 
 
1857
    if(DEBUG_SERVER){
 
1858
       printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
 
1859
               armci_me,
 
1860
               proc,lbuf,(char *)lbuf+bytes-sizeof(int),bytes);fflush(stdout);
 
1861
    }
 
1862
 
 
1863
    memset(sdscr,0,sizeof(struct ibv_send_wr));
 
1864
    armci_init_vbuf_rrdma(sdscr,ssg_entry,lbuf,rbuf,bytes,
 
1865
                          &serv_memhandle,(handle_array+proc));
 
1866
 
 
1867
    if(DEBUG_SERVER){
 
1868
       printf("\n%d(s):handle_array[%d]=%p lbuf=%p flag=%p bytes=%d\n",armci_me,
 
1869
              proc,&handle_array[proc],(char *)lbuf,
 
1870
              (char *)lbuf+bytes-sizeof(int),bytes);
 
1871
       fflush(stdout);
 
1872
    }
 
1873
 
 
1874
    assert(sizeof(request_header_t)+bytes<PENDING_BUF_LEN);
 
1875
 
 
1876
    sdscr->wr_id = dscrid;
 
1877
    struct ibv_send_wr *bad_wr;
 
1878
    rc = ibv_post_send((CLN_con+proc)->qp, sdscr, &bad_wr);
 
1879
    dassert1(1,rc==0,rc);
 
1880
}
 
1881
 
 
1882
void _armci_send_data_to_client_pbuf(int proc, struct ibv_send_wr *sdscr, 
 
1883
                                     int dscrid, struct ibv_sge *ssg_entry, 
 
1884
                                     void *rbuf, void *lbuf, int bytes)  {
 
1885
    int rc = 0;
 
1886
 
 
1887
    if(DEBUG_SERVER) {
 
1888
       printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
 
1889
               armci_me,
 
1890
               proc,rbuf,(char *)rbuf+bytes-sizeof(int),bytes);fflush(stdout);
 
1891
    }
 
1892
    memset(sdscr,0,sizeof(struct ibv_send_wr));
 
1893
    armci_init_vbuf_srdma(sdscr,ssg_entry,lbuf,rbuf,bytes,
 
1894
                          &serv_memhandle,(handle_array+proc));
 
1895
    if(DEBUG_SERVER){
 
1896
       printf("\n%d(s):handle_array[%d]=%p dbuf=%p flag=%p bytes=%d\n",armci_me,
 
1897
              proc,&handle_array[proc],(char *)rbuf,
 
1898
              (char *)rbuf+bytes-sizeof(int),bytes);
 
1899
       fflush(stdout);
 
1900
    }
 
1901
    sdscr->wr_id = dscrid;
 
1902
    struct ibv_send_wr *bad_wr;
 
1903
    rc = ibv_post_send((CLN_con+proc)->qp, sdscr, &bad_wr);
 
1904
    dassert1(1,rc==0,rc);
 
1905
}
 
1906
#endif
 
1907
 
 
1908
#define DATA_SERVER_YIELD_CPU
 
1909
void armci_call_data_server()
 
1910
{
 
1911
int rc = 0;
 
1912
int rc1 = 0;
 
1913
vapibuf_t *vbuf,*vbufs;
 
1914
request_header_t *msginfo,*msg;
 
1915
int c,i,need_ack,pollcount;
 
1916
static int mytag=1;
 
1917
static int doineednotify=1;
 
1918
int rrr,serverwcount=0;
 
1919
 
 
1920
#ifdef CHANGE_SERVER_AFFINITY
 
1921
cpu_set_t mycpuid,new_mask;
 
1922
char str[CPU_SETSIZE];
 
1923
char cid[8];
 
1924
extern char * cpuset_to_cstr(cpu_set_t *mask, char *str);
 
1925
int nslave=armci_clus_info[armci_clus_me].nslave;
 
1926
    rrr=sched_getaffinity(0, sizeof(mycpuid), &mycpuid);
 
1927
#endif
 
1928
 
 
1929
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
 
1930
/*     unblock_thread_signal(GPC_COMPLETION_SIGNAL); */
 
1931
/* #endif */
 
1932
#if defined(PEND_BUFS)
 
1933
    armci_pendbuf_init(); 
 
1934
#endif
 
1935
 
 
1936
    for (;;) {
 
1937
      struct ibv_wc *pdscr=NULL;
 
1938
      struct ibv_wc pdscr1;
 
1939
      pdscr = &pdscr1;
 
1940
      pdscr->status = IBV_WC_SUCCESS;
 
1941
      rc = 0;
 
1942
#ifdef CHANGE_SERVER_AFFINITY
 
1943
      static int ccc;
 
1944
      serverwcount++;
 
1945
      if(serverwcount==100){
 
1946
        serverwcount=0;
 
1947
        ccc=(ccc+1)%nslave;
 
1948
        sprintf (cid, "%d", ccc);
 
1949
        rrr = cstr_to_cpuset(&new_mask,cid);
 
1950
        if (sched_setaffinity(0, sizeof (new_mask), &new_mask)) {
 
1951
          perror("sched_setaffinity");
 
1952
          printf("failed to set pid %d's affinity.\n", getpid());
 
1953
        }
 
1954
        rrr=sched_getaffinity(0, sizeof(mycpuid), &mycpuid);
 
1955
        if(rrr)perror("sched_getaffinity");
 
1956
      }
 
1957
#else
 
1958
#ifdef DATA_SERVER_YIELD_CPU_
 
1959
      serverwcount++;
 
1960
      if(serverwcount==50){
 
1961
        serverwcount=0;usleep(1);
 
1962
      }
 
1963
#endif
 
1964
#endif
 
1965
 
 
1966
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
 
1967
/*       block_thread_signal(GPC_COMPLETION_SIGNAL); */
 
1968
/* #endif */
 
1969
      bzero(pdscr, sizeof(*pdscr));
 
1970
      if (server_can_poll) {
 
1971
        rc = ibv_poll_cq(CLN_nic->rcq, 1, pdscr);
 
1972
        while (rc == 0) {
 
1973
          rc = ibv_poll_cq(CLN_nic->rcq, 1, pdscr);
 
1974
          if (armci_server_terminating) {
 
1975
            /* server got interrupted when clients terminate connections */
 
1976
            armci_server_transport_cleanup();
 
1977
            sleep(1);
 
1978
            _exit(0);
 
1979
          }
 
1980
        }
 
1981
      } else {
 
1982
          rc = ibv_poll_cq(CLN_nic->rcq, 1, pdscr);
 
1983
          if(rc==0){doineednotify=1;/*continue;*/}
 
1984
          if(doineednotify){
 
1985
            rc1 = ibv_req_notify_cq(CLN_nic->rcq, 0);
 
1986
            dassert1(1,rc1==0,rc1);
 
1987
            rc1=ibv_get_cq_event(CLN_nic->rch,&CLN_nic->rcq,&CLN_nic->rcq_cntx);
 
1988
            dassert1(1,rc1==0,rc1);
 
1989
            doineednotify=0;
 
1990
            ibv_ack_cq_events(CLN_nic->rcq, 1);
 
1991
            rc = ibv_poll_cq(CLN_nic->rcq, 1, pdscr);
 
1992
          }
 
1993
 
 
1994
          if (armci_server_terminating) {
 
1995
            /* server got interrupted when clients terminate connections */
 
1996
            armci_server_transport_cleanup();
 
1997
            sleep(1);
 
1998
            _exit(0);
 
1999
          }
 
2000
      }
 
2001
 
 
2002
      if(DEBUG_SERVER) {
 
2003
        printf("\n%d:pdscr=%p %p %d %d %d %d\n",armci_me,pdscr,&pdscr1,
 
2004
                           pdscr->status,pdscr->opcode,pdscr->vendor_err,
 
2005
                           pdscr->src_qp);
 
2006
        fflush(stdout);
 
2007
      }
 
2008
      dassertp(1,rc>=0,("%d: rc=%d id=%d status=%d",
 
2009
                        armci_me,rc,(int)pdscr->wr_id,pdscr->status));
 
2010
      dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
 
2011
                            
 
2012
       if (DEBUG_SERVER) {
 
2013
         printf("%d(s) : NEW MESSAGE bytelen %d \n",armci_me,pdscr->byte_len);
 
2014
         printf("%d(s) : NEW MESSAGE id is %ld \n",armci_me,pdscr->wr_id);
 
2015
         fflush(stdout);
 
2016
       }
 
2017
#if defined(PEND_BUFS)
 
2018
      if(pdscr->wr_id>=DSCRID_IMMBUF_RESP && pdscr->wr_id<DSCRID_IMMBUF_RESP_END) {
 
2019
/*      fprintf(stderr, "%d(s) : Got server response msg completion\n", armci_me); */
 
2020
#if SRI_CORRECT
 
2021
        int id = pdscr->wr_id - DSCRID_IMMBUF_RESP;
 
2022
        if(id>=0 && id<armci_nproc*(IMM_BUF_NUM+1)) {
 
2023
          int dest = id/(IMM_BUF_NUM+1);
 
2024
          dassert(1,serv_buf_arr[id]->send_pending==1);
 
2025
          serv_buf_arr[id]->send_pending = 0;
 
2026
          _armci_pendbuf_post_immbuf(serv_buf_arr[id],dest);
 
2027
        }
 
2028
#endif
 
2029
        continue;
 
2030
      }
 
2031
       if (pdscr->wr_id>=DSCRID_PENDBUF && pdscr->wr_id<DSCRID_PENDBUF_END) {
 
2032
         int pbufid = PBUF_WRID_TO_PBUFID(pdscr->wr_id);
 
2033
/*       printf("%d(s) : Progressing pending msg (something completed) pbufid=%d id=%ld byte_len=%d status=%d\n", armci_me, pbufid,pdscr->wr_id,pdscr->byte_len,done_status); */
 
2034
/*       fflush(stdout); */
 
2035
         if(PBUF_IS_GET_WRID(pdscr->wr_id))
 
2036
           armci_pendbuf_done_get(pbufid);
 
2037
         else if(PBUF_IS_PUT_WRID(pdscr->wr_id))
 
2038
           armci_pendbuf_done_put(pbufid);
 
2039
         else
 
2040
           armci_die("Pending buffer op completed. But not PUT or GET!",pdscr->wr_id);
 
2041
         continue;
 
2042
       }
 
2043
#endif
 
2044
       if (pdscr->wr_id >= DSCRID_SCATGAT && pdscr->wr_id < DSCRID_SCATGAT_END) {
 
2045
         sr_descr_t *sdscr_arr, *rdscr_arr;
 
2046
         if (DEBUG_SERVER) {
 
2047
           printf("%d(s) : received SCATGAT DATA id = %ld, length = %d\n",
 
2048
                  armci_me,pdscr->wr_id, pdscr->byte_len);
 
2049
           fflush(stdout);
 
2050
         }
 
2051
#if defined(PEND_BUFS)
 
2052
         sdscr_arr = armci_vapi_serv_nbsdscr_array;
 
2053
         assert(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends>0);
 
2054
         sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends--;
 
2055
         if(sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofsends==0)
 
2056
             sdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
 
2057
#else
 
2058
         rdscr_arr;
 
2059
         rdscr_arr = armci_vapi_serv_nbrdscr_array;
 
2060
         rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs--;
 
2061
         if(rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].numofrecvs==0)
 
2062
           rdscr_arr[pdscr->wr_id-DSCRID_SCATGAT].tag=0;
 
2063
#endif
 
2064
         continue;
 
2065
       }
 
2066
 
 
2067
#if defined(PEND_BUFS)
 
2068
       assert(pdscr->wr_id>=DSCRID_IMMBUF_RECV && pdscr->wr_id<DSCRID_IMMBUF_RECV_END);
 
2069
#endif
 
2070
       vbuf = serv_buf_arr[DSCRID_TO_IMMBUFID(pdscr->wr_id)];
 
2071
       assert(vbuf->dscr.wr_id == pdscr->wr_id);
 
2072
 
 
2073
       msginfo = (request_header_t*)vbuf->buf;
 
2074
       armci_ack_proc = c = msginfo->from;
 
2075
 
 
2076
       if (DEBUG_SERVER) {
 
2077
         printf("%d(s) : request id is %ld operation is %d, length is %d from=%d vbuf->dscr.wr_id=%d\n",
 
2078
                armci_me,pdscr->wr_id,msginfo->operation,pdscr->byte_len,msginfo->from, (int)vbuf->dscr.wr_id);
 
2079
         fflush(stdout);
 
2080
       }
 
2081
 
 
2082
#if defined(PEND_BUFS)
 
2083
       vbufs = vbuf;
 
2084
       armci_init_vapibuf_recv(&vbufs->dscr, &vbufs->sg_entry,vbufs->buf,
 
2085
                               IMM_BUF_LEN, &serv_memhandle);
 
2086
       vbufs->dscr.wr_id = pdscr->wr_id;
 
2087
#else
 
2088
       vbufs = serv_buf_arr[pdscr->wr_id - armci_nproc] = spare_serv_buf;
 
2089
       armci_init_vapibuf_recv(&vbufs->dscr, &vbufs->sg_entry,vbufs->buf,
 
2090
                               VBUF_DLEN, &serv_memhandle);
 
2091
       vbufs->dscr.wr_id = c + armci_nproc;
 
2092
 
 
2093
       spare_serv_buf = vbuf;
 
2094
#endif
 
2095
 
 
2096
       if(DEBUG_SERVER) {
 
2097
         printf("%d(s):Came out of poll id=%ld\n",armci_me,pdscr->wr_id);
 
2098
         fflush(stdout);
 
2099
       }
 
2100
 
 
2101
       if(msginfo->operation == PUT &&msginfo->pinned == 1){
 
2102
         int found, num;
 
2103
         int stride_arr[MAX_STRIDE_LEVEL]; /*should be MAX_STRIDE_LEVELS*/
 
2104
         int count[MAX_STRIDE_LEVEL];
 
2105
         void *dest_ptr;
 
2106
         int stride_levels;
 
2107
         ARMCI_MEMHDL_T *loc_memhandle;
 
2108
         void armci_post_scatter(void *,int *,int *,int, armci_vapi_memhndl_t *,int,int,int,sr_descr_t **);
 
2109
 
 
2110
         /*unpack decsriptor_record : should call a function instead */
 
2111
         msg = msginfo + 1;
 
2112
         test_ptr = dest_ptr = *(void**)msg;
 
2113
         msg = (request_header_t *) ((char*)msg + sizeof(void*));
 
2114
         test_stride_levels=stride_levels = *(int*)msg;
 
2115
         msg = (request_header_t *) ((char*)msg + sizeof(int));
 
2116
         for(i =0; i<stride_levels; i++){
 
2117
           test_stride_arr[i] = stride_arr[i] = *(int*)msg;
 
2118
           msg = (request_header_t*) ((int*)msg + 1);
 
2119
         }
 
2120
         for(i=0; i<stride_levels+1; i++){
 
2121
           test_count[i] = count[i] = *(int*)msg;
 
2122
           msg = (request_header_t*) ((int*)msg + 1);
 
2123
         }
 
2124
 
 
2125
         if (DEBUG_SERVER) {
 
2126
           printf(" server:the dest_ptr is %p\n", dest_ptr);
 
2127
           for(i =0; i<stride_levels; i++)
 
2128
             printf("stride_arr[i] is %d,value of count[i] is %d\n",
 
2129
                    stride_arr[i], count[i]);
 
2130
           printf("the value of stride_levels is %d\n", stride_levels);
 
2131
           fflush(stdout);
 
2132
         }
 
2133
 
 
2134
         found =get_armci_region_local_hndl(dest_ptr,armci_me, &loc_memhandle);
 
2135
         dassertp(1,found!=0,("%d:SERVER : local region not found id=%d",
 
2136
                              armci_me,pdscr->wr_id));
 
2137
 
 
2138
         if(DEBUG_SERVER) {
 
2139
           printf("%d(s) : about to call armci_post_scatter\n",armci_me);
 
2140
           fflush(stdout);
 
2141
         }
 
2142
 
 
2143
         armci_post_scatter(dest_ptr, stride_arr, count, stride_levels,
 
2144
                            loc_memhandle,msginfo->from, mytag, SERV,NULL );
 
2145
 
 
2146
         mytag = (mytag+1)%(MAX_PENDING);
 
2147
         if(mytag==0)mytag=1;
 
2148
 
 
2149
         if(DEBUG_SERVER) {
 
2150
           printf("%d(s) : finished posting %d scatter\n",armci_me,num);
 
2151
           fflush(stdout);
 
2152
         }
 
2153
         _armci_pendbuf_post_immbuf(vbufs, msginfo->from);
 
2154
         SERVER_SEND_ACK(msginfo->from);
 
2155
         need_ack = 0;
 
2156
       }
 
2157
       else if(msginfo->operation == REGISTER){
 
2158
         if (DEBUG_SERVER) {
 
2159
            printf("%d(s) : Register_op id is %d, comp_dscr_id is  %ld\n",
 
2160
                     armci_me,msginfo->operation,pdscr->wr_id);
 
2161
            fflush(stdout);
 
2162
          }
 
2163
 
 
2164
          armci_server_register_region(*((void **)(msginfo+1)),
 
2165
                           *((long *)((char *)(msginfo+1)+sizeof(void *))),
 
2166
                           (ARMCI_MEMHDL_T *)(msginfo->tag.data_ptr));
 
2167
          _armci_pendbuf_post_immbuf(vbufs, msginfo->from);
 
2168
          *(long *)(msginfo->tag.ack_ptr) = ARMCI_STAMP;
 
2169
          continue;
 
2170
       }
 
2171
       else {
 
2172
         if(DEBUG_SERVER) {
 
2173
           printf("%d(s) : request is %ld about to call armci_data_server\n",
 
2174
                  armci_me, pdscr->wr_id);
 
2175
           fflush(stdout);
 
2176
         }
 
2177
#if defined(PEND_BUFS)
 
2178
         armci_pendbuf_service_req(vbuf);
 
2179
#else
 
2180
         _armci_pendbuf_post_immbuf(vbufs, msginfo->from);
 
2181
         armci_data_server(vbuf);
 
2182
         
 
2183
         if((msginfo->operation == PUT) || ARMCI_ACC(msginfo->operation)) { 
 
2184
           /* for operations that do not send data back we can send ACK now */
 
2185
           SERVER_SEND_ACK(msginfo->from);
 
2186
           need_ack=0;
 
2187
           if(DEBUG_SERVER){
 
2188
             printf("%d(s) : posted ack\n\n",armci_me);
 
2189
             fflush(stdout);
 
2190
           }
 
2191
         } else need_ack=1;
 
2192
#endif
 
2193
       }
 
2194
       if (0) {
 
2195
         printf("%d(s):Done processed request\n\n",armci_me);
 
2196
         fflush(stdout);
 
2197
       }
 
2198
       
 
2199
/* #ifdef ARMCI_ENABLE_GPC_CALLS */
 
2200
/*        unblock_thread_signal(GPC_COMPLETION_SIGNAL); */
 
2201
/* #endif */
 
2202
   }/* end of for */
 
2203
}
 
2204
 
 
2205
 
 
2206
void armci_vapi_complete_buf(armci_vapi_field_t *field,int snd,int rcv,int to,int op) {
 
2207
  struct ibv_send_wr *snd_dscr;
 
2208
 
 
2209
  BUF_INFO_T *info;
 
2210
  info = (BUF_INFO_T *)((char *)field-sizeof(BUF_INFO_T));
 
2211
 
 
2212
  if(info->tag && op==GET)return;
 
2213
 
 
2214
  if(snd){
 
2215
    request_header_t *msginfo = (request_header_t *)(field+1);
 
2216
    snd_dscr=&(field->sdscr);
 
2217
    if(mark_buf_send_complete[snd_dscr->wr_id]==0)
 
2218
      armci_send_complete(snd_dscr,"armci_vapi_complete_buf",1);
 
2219
  }
 
2220
 
 
2221
  if(rcv){
 
2222
    int *last;
 
2223
    long *flag;
 
2224
    int loop = 0;
 
2225
    request_header_t *msginfo = (request_header_t *)(field+1);
 
2226
    flag = (long *)&msginfo->tag.ack;
 
2227
 
 
2228
  
 
2229
    if(op==PUT || ARMCI_ACC(op)){
 
2230
      if(msginfo->bypass && msginfo->pinned && msginfo->format == STRIDED &&
 
2231
         op == PUT);
 
2232
      else{
 
2233
        while(armci_util_long_getval(flag) != ARMCI_STAMP) {
 
2234
          loop++;
 
2235
          loop %=100000;
 
2236
          if(loop==0){
 
2237
          }
 
2238
        }
 
2239
      }
 
2240
      /*         printf("%d: client complete_buf. op=%d loop=%d till *flag=ARMCI_STAMP\n", armci_me,op,loop); */
 
2241
      /*         fflush(stdout); */
 
2242
      *flag = 0L;
 
2243
    }
 
2244
    else{
 
2245
      /*SK: I think we get here only for GET with result directly
 
2246
        going to client's pinned memory. (info.tag==0 && op==GET)*/
 
2247
      last = (int *)((char *)msginfo+msginfo->datalen-sizeof(int));
 
2248
      while(armci_util_int_getval(last) == ARMCI_STAMP &&
 
2249
            armci_util_long_getval(flag)  != ARMCI_STAMP){
 
2250
        loop++;
 
2251
        loop %=100000;
 
2252
        if(loop==0){
 
2253
          if(DEBUG_CLN){
 
2254
            printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
 
2255
                   armci_me,last,*last,flag,*flag,msginfo->datalen);
 
2256
            fflush(stdout);
 
2257
          }
 
2258
        }
 
2259
      }
 
2260
    }
 
2261
  }
 
2262
}
 
2263
 
 
2264
void armci_vapi_test_buf(armci_vapi_field_t *field,int snd,int rcv,int to,int op, int *retval) {
 
2265
  struct ibv_send_wr *snd_dscr;
 
2266
 
 
2267
  BUF_INFO_T *info;
 
2268
  info = (BUF_INFO_T *)((char *)field-sizeof(BUF_INFO_T));
 
2269
 
 
2270
  *retval = 0;
 
2271
 
 
2272
  if(info->tag && op==GET)return;
 
2273
 
 
2274
  if(snd){
 
2275
    request_header_t *msginfo = (request_header_t *)(field+1);
 
2276
    snd_dscr=&(field->sdscr);
 
2277
    if(mark_buf_send_complete[snd_dscr->wr_id]==0) {
 
2278
/*       printf("%d: test buf. send not complete\n",armci_me); */
 
2279
/*       fflush(stdout); */
 
2280
      return;
 
2281
    }
 
2282
  }
 
2283
 
 
2284
  if(rcv){
 
2285
    int *last;
 
2286
    long *flag;
 
2287
    int loop = 0;
 
2288
    request_header_t *msginfo = (request_header_t *)(field+1);
 
2289
    flag = (long *)&msginfo->tag.ack;
 
2290
  
 
2291
    if(op==PUT || ARMCI_ACC(op)){
 
2292
      if(msginfo->bypass && msginfo->pinned && msginfo->format == STRIDED &&
 
2293
         op == PUT) 
 
2294
        *retval=1;
 
2295
      else{
 
2296
        if(armci_util_long_getval(flag) == ARMCI_STAMP) {
 
2297
          *retval = 1;
 
2298
        }
 
2299
      }
 
2300
      return;
 
2301
    }
 
2302
    else{
 
2303
      /*SK: I think we get here only for GET with result directly
 
2304
        going to client's pinned memory. (info.tag==0 && op==GET)*/
 
2305
      last = (int *)((char *)msginfo+msginfo->datalen-sizeof(int));
 
2306
      if(armci_util_int_getval(last) != ARMCI_STAMP ||
 
2307
            armci_util_long_getval(flag)  == ARMCI_STAMP){
 
2308
        *retval=1;
 
2309
      }
 
2310
      return;
 
2311
    }
 
2312
  }
 
2313
}
 
2314
 
 
2315
 
 
2316
static inline void armci_vapi_post_send(int isclient,int con_offset,
 
2317
                                        struct ibv_send_wr *snd_dscr,char *from)
 
2318
{
 
2319
    int rc = 0;
 
2320
    vapi_nic_t *nic;
 
2321
    armci_connect_t *con;
 
2322
    int total = 0;
 
2323
 
 
2324
    if(!isclient){
 
2325
       nic = CLN_nic;
 
2326
       con = CLN_con+con_offset;
 
2327
    }
 
2328
    else{
 
2329
       nic = SRV_nic;
 
2330
       con = SRV_con+con_offset;
 
2331
    }
 
2332
 
 
2333
    if(DEBUG_CLN){
 
2334
       printf("vapi_post_send: snd_dscr->num_sge=%d, snd_dscr->sg_list->length=%d\n",
 
2335
              snd_dscr->num_sge, snd_dscr->sg_list->length);
 
2336
       fflush(stdout);
 
2337
    }
 
2338
 
 
2339
 
 
2340
    /* find the total length of all the segments */
 
2341
    total = snd_dscr->sg_list->length * snd_dscr->num_sge;
 
2342
    if(DEBUG_CLN){
 
2343
       printf("%d(c) : total is %d\t, max_size is %d\n",armci_me,total,
 
2344
                    armci_vapi_max_inline_size);
 
2345
    }
 
2346
 
 
2347
    struct ibv_send_wr *bad_wr;
 
2348
    if (total > armci_vapi_max_inline_size) {
 
2349
        rc = ibv_post_send(con->qp, snd_dscr, &bad_wr);
 
2350
    } else {
 
2351
        rc = ibv_post_send(con->qp, snd_dscr, &bad_wr);
 
2352
        /* no corresponding call, using ibv_post_send
 
2353
       rc = EVAPI_post_inline_sr(nic->handle,con->qp,snd_dscr);*/
 
2354
    }
 
2355
    dassert1(1,rc==0,rc);
 
2356
}
 
2357
 
 
2358
/** Send request to server. 
 
2359
  */
 
2360
int armci_send_req_msg(int proc, void *buf, int bytes)
 
2361
{
 
2362
  int cluster = armci_clus_id(proc), i;
 
2363
    request_header_t *msginfo = (request_header_t *)buf;
 
2364
    struct ibv_send_wr *snd_dscr;
 
2365
    struct ibv_sge *ssg_lst;
 
2366
 
 
2367
    THREAD_LOCK(armci_user_threads.net_lock);   
 
2368
 
 
2369
    snd_dscr = BUF_TO_SDESCR((char *)buf);
 
2370
    ssg_lst  = BUF_TO_SSGLST((char *)buf);
 
2371
 
 
2372
    /*Stamp end of buffers as needed*/
 
2373
    if(msginfo->operation == GET && !msginfo->pinned) {
 
2374
      const int dscrlen = msginfo->dscrlen;
 
2375
      const int datalen = msginfo->datalen;
 
2376
      int *last;
 
2377
      if(dscrlen < (datalen - sizeof(int)))
 
2378
        last = (int*)(((char*)(msginfo+1))+(datalen-sizeof(int)));
 
2379
      else
 
2380
        last = (int*)(((char*)(msginfo+1))+(dscrlen+datalen-sizeof(int)));
 
2381
      *last = ARMCI_STAMP;
 
2382
#ifdef GET_STRIDED_COPY_PIPELINED
 
2383
      if(msginfo->format == STRIDED) {
 
2384
        const int ssize = GET_STRIDED_COPY_PIPELINED_SIZE/sizeof(int);
 
2385
        int *sfirst = (int*)(dscrlen+(char*)(msginfo+1))+ssize; /*stamping
 
2386
                                                            can start here*/
 
2387
        int *slast = last, *ptr;
 
2388
        for(ptr=sfirst; ptr<slast; ptr+=ssize) {
 
2389
          *ptr = ARMCI_STAMP;
 
2390
        }
 
2391
      }
 
2392
#endif
 
2393
    }
 
2394
    if(msginfo->operation == ACK) {
 
2395
      *(int *)(msginfo +1) = ARMCI_STAMP+1;
 
2396
      *(((int *)(msginfo +1))+1) = ARMCI_STAMP+1;
 
2397
    }
 
2398
    
 
2399
 
 
2400
#if defined(PEND_BUFS)
 
2401
    if((msginfo->operation==PUT || ARMCI_ACC(msginfo->operation)) 
 
2402
       && bytes > IMM_BUF_LEN) {
 
2403
      msginfo->tag.imm_msg=0;
 
2404
      assert(sizeof(request_header_t)<IMM_BUF_LEN); /*sanity check*/
 
2405
      bytes = ARMCI_MIN(bytes-msginfo->datalen, IMM_BUF_LEN);
 
2406
      assert(bytes==IMM_BUF_LEN||(bytes==sizeof(*msginfo)+msginfo->dscrlen));
 
2407
    }
 
2408
    else if(msginfo->operation==GET
 
2409
            && !(msginfo->datalen+sizeof(request_header_t)+msginfo->dscrlen<IMM_BUF_LEN)) {
 
2410
      assert(sizeof(request_header_t) < IMM_BUF_LEN);
 
2411
      msginfo->tag.imm_msg=0;
 
2412
      bytes = ARMCI_MIN(sizeof(request_header_t)+msginfo->dscrlen, IMM_BUF_LEN);
 
2413
    }
 
2414
#if defined(PUT_NO_SRV_COPY) && 0 /*SK:disabled. Imm msgs are sent inline
 
2415
                                    for latency reasons*/
 
2416
    else if(msginfo->operation==PUT && !msginfo->pinned && msginfo->format==STRIDED && msginfo->tag.data_len>=2048) {
 
2417
      msginfo->tag.imm_msg = 0;
 
2418
      assert(sizeof(request_header_t)<IMM_BUF_LEN); /*sanity check*/
 
2419
      bytes = ARMCI_MIN(bytes-msginfo->datalen, IMM_BUF_LEN);
 
2420
      assert(bytes==IMM_BUF_LEN||(bytes==sizeof(*msginfo)+msginfo->dscrlen));
 
2421
    }
 
2422
#endif
 
2423
    else{
 
2424
      msginfo->tag.imm_msg=1;
 
2425
    }
 
2426
/*    printf("%d: send_req: op=%d bytes=%d data_len=%d imm=%d\n",*/
 
2427
/*         armci_me, msginfo->operation, bytes, msginfo->datalen,msginfo->tag.imm_msg);*/
 
2428
/*    fflush(stdout);*/
 
2429
    if(bytes<0 || bytes>IMM_BUF_LEN) {
 
2430
      printf("%d(pid=%d): Trying to send too large a mesg. op=%d bytes=%d(max=%d) to=%d\n", armci_me, getpid(),msginfo->operation,bytes,IMM_BUF_LEN, proc);
 
2431
      fflush(stdout);
 
2432
      pause();
 
2433
      assert(bytes>=0);
 
2434
      assert(bytes <= IMM_BUF_LEN);
 
2435
    }
 
2436
    _armci_buf_ensure_pend_outstanding_op_per_node(buf,cluster);
 
2437
/*     printf("%d: send_req. ensured pend os per node. to=%d op=%d\n", armci_me, msginfo->to,msginfo->operation); */
 
2438
/*     fflush(stdout); */
 
2439
#else
 
2440
    _armci_buf_ensure_one_outstanding_op_per_node(buf,cluster);
 
2441
#endif
 
2442
 
 
2443
    if(msginfo->operation == PUT || ARMCI_ACC(msginfo->operation)){
 
2444
#if defined(PEND_BUFS)
 
2445
      if(!msginfo->tag.imm_msg){
 
2446
        msginfo->tag.data_ptr = (char *)(msginfo+1)+msginfo->dscrlen;
 
2447
        msginfo->tag.data_len = msginfo->datalen;
 
2448
      }
 
2449
      else
 
2450
        msginfo->tag.data_ptr = NULL;
 
2451
#else
 
2452
      {
 
2453
          msginfo->tag.data_ptr = (void *)&msginfo->tag.ack;
 
2454
      }
 
2455
#endif
 
2456
    }
 
2457
    else {
 
2458
       if(msginfo->operation == GET && !msginfo->bypass && msginfo->dscrlen
 
2459
                       >= (msginfo->datalen-sizeof(int)))
 
2460
         msginfo->tag.data_ptr = (char *)(msginfo+1)+msginfo->dscrlen;
 
2461
       else
 
2462
         msginfo->tag.data_ptr = GET_DATA_PTR(buf);
 
2463
    }
 
2464
 
 
2465
    /*this has to be reset so that we can wait on it
 
2466
      see ReadFromDirect*/
 
2467
    msginfo->tag.ack = 0;
 
2468
    msginfo->tag.ack_ptr = &(msginfo->tag.ack);
 
2469
 
 
2470
    if(DEBUG_CLN){
 
2471
       printf("%d:the ack_ptr is initialised to %p, ack->value is %ld\n",
 
2472
                 armci_me,msginfo->tag.ack_ptr,msginfo->tag.ack);fflush(stdout);
 
2473
    }
 
2474
 
 
2475
    armci_init_vapibuf_send(snd_dscr, ssg_lst,buf, 
 
2476
                            bytes, &client_memhandle);
 
2477
 
 
2478
/*    printf("%d: Sending req wr_id=%d to=%d\n",armci_me,snd_dscr->wr_id,proc);*/
 
2479
/*    fflush(stdout);*/
 
2480
    armci_vapi_post_send(1,cluster,snd_dscr,"send_req_msg:post_send");
 
2481
 
 
2482
    THREAD_UNLOCK(armci_user_threads.net_lock);
 
2483
 
 
2484
    if(DEBUG_CLN){
 
2485
       printf("%d:client sent REQ=%d %d bytes serv=%d qp=%ld id =%ld lkey=%d\n",
 
2486
               armci_me,msginfo->operation,bytes,cluster,
 
2487
               (SRV_con+cluster)->qp,snd_dscr->wr_id,ssg_lst->lkey);
 
2488
       fflush(stdout);
 
2489
    }
 
2490
    return(0);
 
2491
}
 
2492
 
 
2493
 
 
2494
/*\
 
2495
 *  client waits for first phase ack before posting gather desr
 
2496
\*/
 
2497
void armci_wait_ack(char *buffer)
 
2498
{
 
2499
   long *flag;
 
2500
   request_header_t *msginfo = (request_header_t *)(buffer);
 
2501
   flag = (long*)&msginfo->tag.ack;
 
2502
 
 
2503
   while(armci_util_long_getval(flag) != ARMCI_STAMP);
 
2504
   flag = 0;
 
2505
}
 
2506
 
 
2507
 
 
2508
 
 
2509
 
 
2510
void armci_client_direct_send(int p,void *src_buf, void *dst_buf, int len,void** contextptr,int nbtag,ARMCI_MEMHDL_T *lochdl,ARMCI_MEMHDL_T *remhdl)
 
2511
{
 
2512
sr_descr_t *dirdscr;
 
2513
int clus = armci_clus_id(p);
 
2514
 
 
2515
    THREAD_LOCK(armci_user_threads.net_lock);
 
2516
 
 
2517
    /*ID for the desr that comes from get_next_descr is already set*/
 
2518
    dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
 
2519
    if(nbtag)*contextptr = dirdscr;
 
2520
 
 
2521
    armci_init_vbuf_srdma(&dirdscr->sdescr,dirdscr->sg_entry,src_buf,dst_buf,
 
2522
                          len,lochdl,remhdl);
 
2523
 
 
2524
    armci_vapi_post_send(1,clus,&(dirdscr->sdescr),
 
2525
                         "client_direct_send:post_send");
 
2526
 
 
2527
    /* the following unlock/lock ensures fairness (in case other threads are waiting
 
2528
       on the lock) not required to work */
 
2529
#if 1 
 
2530
    THREAD_UNLOCK(armci_user_threads.net_lock);
 
2531
    THREAD_LOCK(armci_user_threads.net_lock);
 
2532
#endif
 
2533
 
 
2534
    if(nbtag==0)
 
2535
       armci_send_complete(&(dirdscr->sdescr),"armci_client_direct_send",1);
 
2536
 
 
2537
    THREAD_UNLOCK(armci_user_threads.net_lock);
 
2538
}
 
2539
 
 
2540
/*\ RDMA get
 
2541
\*/
 
2542
void armci_client_direct_get(int p, void *src_buf, void *dst_buf, int len,
 
2543
                             void** cptr,int nbtag,ARMCI_MEMHDL_T *lochdl,
 
2544
                             ARMCI_MEMHDL_T *remhdl)
 
2545
{
 
2546
int rc = 0;
 
2547
sr_descr_t *dirdscr;
 
2548
int clus = armci_clus_id(p);
 
2549
struct ibv_send_wr *bad_wr;
 
2550
 
 
2551
    THREAD_LOCK(armci_user_threads.net_lock);
 
2552
 
 
2553
    /*ID for the desr that comes from get_next_descr is already set*/
 
2554
    dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
 
2555
    if(nbtag)*cptr = dirdscr;
 
2556
 
 
2557
    if(DEBUG_CLN){
 
2558
      printf("\n%d: in direct get lkey=%d rkey=%d\n",armci_me,lochdl->lkey,
 
2559
               remhdl->rkey);fflush(stdout);
 
2560
    }
 
2561
 
 
2562
    armci_init_vbuf_rrdma(&dirdscr->sdescr,dirdscr->sg_entry,dst_buf,src_buf,
 
2563
                          len,lochdl,remhdl);
 
2564
    rc = ibv_post_send((SRV_con+clus)->qp, &(dirdscr->sdescr), &bad_wr);
 
2565
    dassert1(1,rc==0,rc);
 
2566
 
 
2567
    /* unlock/lock to ensure fairness: allows others thread post before
 
2568
       waiting for completion */
 
2569
    /*VT?check to see if this should be UNLOCK followed by lock*/
 
2570
#if 1
 
2571
    THREAD_UNLOCK(armci_user_threads.net_lock);
 
2572
    THREAD_LOCK(armci_user_threads.net_lock);
 
2573
#endif
 
2574
 
 
2575
    if(!nbtag){
 
2576
       armci_send_complete(&(dirdscr->sdescr),"armci_client_direct_get",1);
 
2577
    }
 
2578
 
 
2579
    THREAD_UNLOCK(armci_user_threads.net_lock);
 
2580
}
 
2581
 
 
2582
#define WQE_LIST_LENGTH 32
 
2583
#define WQE_LIST_COUNT  1
 
2584
 
 
2585
/** Direct put into remote processor memory. Assumes that (and invoked
 
2586
 *  only when) the source buffers in user memory are pinned as well.
 
2587
 * @param operation PUT/GET
 
2588
 * @param src_ptr Source pointer for data
 
2589
 * @param src_stride_arr Strides on the source array
 
2590
 * @param dst_ptr Destination pointer to start writing to
 
2591
 * @param seq_count[stride_levels+1] #els in each stride
 
2592
 * level. seg_count[0] is contiguous bytes
 
2593
 * @param proc Destimation process
 
2594
 * @param cptr OUT Pointer to store the descriptor to wait on for completion
 
2595
 * @param nbtag IN Non-blocking tag (non-blocking op if nbtag!=0)
 
2596
 * @param lochdl IN Local memory handle/key (registered memory stuff)
 
2597
 * @param remhdl IN Remote memory handle/key
 
2598
 * 
 
2599
 */
 
2600
#if 0
 
2601
void armci_client_direct_rdma_strided(int operation, int proc,
 
2602
                                      char *src_ptr, int src_stride_arr[],
 
2603
                                      char *dst_ptr, int dst_stride_arr[],
 
2604
                                      int seg_count[],
 
2605
                                      int stride_levels,
 
2606
                                      void **cptr, int nbtag,
 
2607
                                      ARMCI_MEMHDL_T *lochdl,
 
2608
                                      ARMCI_MEMHDL_T *remhdl) {
 
2609
  
 
2610
  int rc;
 
2611
  sr_descr_t *dirdscr;
 
2612
  const int clus = armci_clus_id(proc);
 
2613
  struct ibv_send_wr *bad_wr;
 
2614
  struct ibv_send_wr sdscr[WQE_LIST_COUNT][WQE_LIST_LENGTH];
 
2615
  struct ibv_sge     sg_entry[WQE_LIST_COUNT][WQE_LIST_LENGTH];
 
2616
  int busy[WQE_LIST_COUNT], wait_count[WQE_LIST_COUNT],clst;
 
2617
  int i, j, c, numposts;
 
2618
  int idx[MAX_STRIDE_LEVEL];
 
2619
 
 
2620
  THREAD_LOCK(armci_user_threads.net_lock);
 
2621
 
 
2622
  assert(stride_levels >= 0);
 
2623
  assert(stride_levels<=MAX_STRIDE_LEVEL);
 
2624
  /*ID for the desr that comes from get_next_descr is already set*/
 
2625
  dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
 
2626
  if(nbtag)*cptr = dirdscr;
 
2627
  assert(dirdscr->tag == nbtag);
 
2628
 
 
2629
  if(DEBUG_CLN) {
 
2630
    printf("\n%d: in direct rdma strided id=%d lkey=%ld rkey=%ld\n",
 
2631
           armci_me,dirdscr->sdescr.wr_id,lochdl->lkey,remhdl->rkey);fflush(stdout);
 
2632
  }
 
2633
 
 
2634
  for(c=0; c<WQE_LIST_COUNT; c++) {
 
2635
    busy[c]=0;
 
2636
  }
 
2637
  /*initialize fixed values for descriptors*/
 
2638
  bzero(sdscr, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_send_wr));
 
2639
  bzero(sg_entry, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_sge));
 
2640
  for(j=0; j<WQE_LIST_COUNT; j++) {
 
2641
    for(i=0; i<WQE_LIST_LENGTH; i++) {
 
2642
      if(operation == PUT) 
 
2643
        armci_init_vbuf_srdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
 
2644
      else if(operation == GET) 
 
2645
        armci_init_vbuf_rrdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
 
2646
      else
 
2647
        armci_die("rdma_strided: unsupported operation",operation);
 
2648
      sdscr[j][i].wr_id = dirdscr->sdescr.wr_id;
 
2649
      sdscr[j][i].send_flags = 0; /*non-signalled*/
 
2650
      if(i<WQE_LIST_LENGTH-1)
 
2651
        sdscr[j][i].next = &sdscr[j][i+1];
 
2652
    }
 
2653
  }
 
2654
  /*post requests in a loop*/
 
2655
  numposts=1;
 
2656
  for(i=1; i<=stride_levels; i++) {
 
2657
    numposts *= seg_count[i];
 
2658
  }
 
2659
/*   printf("%d: client rdma op=%d numposts=%d\n",armci_me,operation,numposts); */
 
2660
  
 
2661
  dirdscr->numofsends=0;
 
2662
  bzero(idx, stride_levels*sizeof(int));
 
2663
  int count = (numposts%WQE_LIST_LENGTH) ? (numposts%WQE_LIST_LENGTH):WQE_LIST_LENGTH;
 
2664
  assert(count == ARMCI_MIN(count, numposts));
 
2665
  clst=0;
 
2666
  for(i=0; i<numposts; ) {
 
2667
    for(j=i; j<i+count; j++) {
 
2668
      int src_offset=0, dst_offset=0;
 
2669
      for(c=0; c<stride_levels; c++) {
 
2670
        src_offset += idx[c]*src_stride_arr[c];
 
2671
        dst_offset += idx[c]*dst_stride_arr[c];
 
2672
      }
 
2673
 
 
2674
/*       armci_client_direct_send(proc,src_ptr+src_offset,  */
 
2675
/*                             dst_ptr+dst_offset, seg_count[0], */
 
2676
/*                             NULL,0,lochdl,remhdl); */
 
2677
      if(busy[clst]) {
 
2678
        assert(wait_count[clst]>0);
 
2679
        armci_send_complete(&dirdscr->sdescr,"client_direct_rdma_strided",wait_count[clst]);
 
2680
        dirdscr->numofsends -= wait_count[clst];
 
2681
        busy[clst]=0;
 
2682
        wait_count[clst]=0;
 
2683
      }
 
2684
 
 
2685
      if(operation == PUT) {
 
2686
        sg_entry[clst][j-i].addr        = (uint64_t)(src_ptr + src_offset);
 
2687
        sdscr[clst][j-i].wr.rdma.remote_addr = (uint64_t)(dst_ptr + dst_offset);
 
2688
      }
 
2689
      else if (operation == GET) {
 
2690
        sg_entry[clst][j-i].addr        = (uint64_t)(dst_ptr + dst_offset);
 
2691
        sdscr[clst][j-i].wr.rdma.remote_addr = (uint64_t)(src_ptr + src_offset);
 
2692
      }
 
2693
      assert(sg_entry[clst][j-i].length == seg_count[0]);
 
2694
 
 
2695
      idx[0] += 1;
 
2696
      for(c=0;c<stride_levels-1 && idx[c]==seg_count[c+1]; c++) {
 
2697
        idx[c]=0; idx[c+1]++;
 
2698
      }
 
2699
    }
 
2700
    sdscr[clst][count-1].next=NULL;
 
2701
    sdscr[clst][count-1].send_flags=IBV_SEND_SIGNALED; /*only the last one*/
 
2702
    for(c=0; c<count-1; c++) {
 
2703
      assert(sdscr[clst][c].next == &sdscr[clst][c+1]);
 
2704
    }
 
2705
    rc = ibv_post_send(SRV_con[clus].qp, sdscr[clst], &bad_wr);
 
2706
    dassert1(1,rc==0,rc);
 
2707
    dirdscr->numofsends += 1;
 
2708
    wait_count[clst] = 1;
 
2709
/*     armci_send_complete(&dirdscr->sdescr,"armci_client_direct_rdma_strided",count); */
 
2710
 
 
2711
    if(count < WQE_LIST_LENGTH) {
 
2712
      sdscr[clst][count-1].next=&sdscr[clst][count]; /*reset it*/ 
 
2713
    }
 
2714
    sdscr[clst][count-1].send_flags=0; /*reset it*/
 
2715
    i += count;
 
2716
    count = ARMCI_MIN(WQE_LIST_LENGTH,numposts-i);
 
2717
    assert(count==0 || count==WQE_LIST_LENGTH);
 
2718
    clst = (clst+1)%WQE_LIST_COUNT;
 
2719
  }
 
2720
 
 
2721
  if(!nbtag) {
 
2722
    armci_send_complete(&dirdscr->sdescr,"armci_client_direct_get",dirdscr->numofsends);
 
2723
    dirdscr->numofsends = 0;
 
2724
    dirdscr->tag = 0;
 
2725
  }  
 
2726
  THREAD_UNLOCK(armci_user_threads.net_lock);
 
2727
}
 
2728
#else
 
2729
void armci_client_direct_rdma_strided(int operation, int proc,
 
2730
                                      char *src_ptr, int src_stride_arr[],
 
2731
                                      char *dst_ptr, int dst_stride_arr[],
 
2732
                                      int seg_count[],
 
2733
                                      int stride_levels,
 
2734
                                      void **cptr, int nbtag,
 
2735
                                      ARMCI_MEMHDL_T *lochdl,
 
2736
                                      ARMCI_MEMHDL_T *remhdl) {
 
2737
  int rc, i, j, c, busy[WQE_LIST_COUNT], clst, ctr;
 
2738
  sr_descr_t *dirdscr;
 
2739
  const int clus = armci_clus_id(proc);
 
2740
  struct ibv_send_wr *bad_wr;
 
2741
  struct ibv_send_wr sdscr[WQE_LIST_COUNT][WQE_LIST_LENGTH];
 
2742
  struct ibv_sge     sg_entry[WQE_LIST_COUNT][WQE_LIST_LENGTH];
 
2743
  stride_info_t sinfo, dinfo;
 
2744
 
 
2745
  THREAD_LOCK(armci_user_threads.net_lock);
 
2746
 
 
2747
  assert(stride_levels >= 0);
 
2748
  assert(stride_levels<=MAX_STRIDE_LEVEL);
 
2749
  /*ID for the desr that comes from get_next_descr is already set*/
 
2750
  dirdscr = armci_vapi_get_next_sdescr(nbtag,0);
 
2751
  if(nbtag)*cptr = dirdscr;
 
2752
  assert(dirdscr->tag == nbtag);
 
2753
 
 
2754
  if(DEBUG_CLN) {
 
2755
    printf("\n%d: in direct rdma strided id=%d lkey=%ld rkey=%ld\n",
 
2756
           armci_me,dirdscr->sdescr.wr_id,lochdl->lkey,remhdl->rkey);fflush(stdout);
 
2757
  }
 
2758
 
 
2759
  /*initialize fixed values for descriptors*/
 
2760
  bzero(sdscr, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_send_wr));
 
2761
  bzero(sg_entry, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_sge));
 
2762
  for(j=0; j<WQE_LIST_COUNT; j++) {
 
2763
    for(i=0; i<WQE_LIST_LENGTH; i++) {
 
2764
      if(operation == PUT) 
 
2765
        armci_init_vbuf_srdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
 
2766
      else if(operation == GET) 
 
2767
        armci_init_vbuf_rrdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],lochdl,remhdl);
 
2768
      else
 
2769
        armci_die("rdma_strided: unsupported operation",operation);
 
2770
      sdscr[j][i].wr_id = dirdscr->sdescr.wr_id;
 
2771
      sdscr[j][i].send_flags = 0; /*non-signalled*/
 
2772
      if(i<WQE_LIST_LENGTH-1)
 
2773
        sdscr[j][i].next = &sdscr[j][i+1];
 
2774
    }
 
2775
  }
 
2776
 
 
2777
  /*post requests in a loop*/
 
2778
  armci_stride_info_init(&sinfo,src_ptr,stride_levels,src_stride_arr,seg_count);
 
2779
  armci_stride_info_init(&dinfo,dst_ptr,stride_levels,dst_stride_arr,seg_count);
 
2780
  assert(armci_stride_info_size(&sinfo)==armci_stride_info_size(&dinfo));
 
2781
  
 
2782
  dirdscr->numofsends=0;
 
2783
  clst=ctr=0;
 
2784
  bzero(busy, sizeof(int)*WQE_LIST_COUNT);
 
2785
  while(armci_stride_info_has_more(&sinfo)) {
 
2786
    assert(armci_stride_info_has_more(&dinfo));
 
2787
    uint64_t saddr = (uint64_t)armci_stride_info_seg_ptr(&sinfo);
 
2788
    uint64_t daddr = (uint64_t)armci_stride_info_seg_ptr(&dinfo);
 
2789
    if(operation == PUT) {
 
2790
      sg_entry[clst][ctr].addr = saddr;
 
2791
      sdscr[clst][ctr].wr.rdma.remote_addr = daddr;
 
2792
    }
 
2793
    else if (operation == GET) {
 
2794
      sg_entry[clst][ctr].addr = daddr;
 
2795
      sdscr[clst][ctr].wr.rdma.remote_addr = saddr;
 
2796
    }
 
2797
    assert(sg_entry[clst][ctr].length == seg_count[0]);
 
2798
 
 
2799
    ctr+=1;
 
2800
    armci_stride_info_next(&sinfo);
 
2801
    armci_stride_info_next(&dinfo);
 
2802
    if(ctr == WQE_LIST_LENGTH || !armci_stride_info_has_more(&sinfo)) {
 
2803
      sdscr[clst][ctr-1].next=NULL;
 
2804
      sdscr[clst][ctr-1].send_flags=IBV_SEND_SIGNALED; /*only the last one*/
 
2805
      for(c=0; c<ctr-1; c++) {
 
2806
        assert(sdscr[clst][c].next == &sdscr[clst][c+1]);
 
2807
      }
 
2808
      rc = ibv_post_send(SRV_con[clus].qp, sdscr[clst], &bad_wr);
 
2809
      dassert1(1,rc==0,rc);
 
2810
      busy[clst] = 1;
 
2811
      dirdscr->numofsends += 1;
 
2812
      if(ctr<WQE_LIST_LENGTH) 
 
2813
        sdscr[clst][ctr-1].next = &sdscr[clst][ctr];
 
2814
      sdscr[clst][ctr-1].send_flags = 0;
 
2815
 
 
2816
      ctr=0;
 
2817
      clst = (clst+1)%WQE_LIST_COUNT;
 
2818
      if(busy[clst]) {
 
2819
        armci_send_complete(&dirdscr->sdescr,"client_direct_rdma_strided",1);
 
2820
        dirdscr->numofsends -= 1;
 
2821
        busy[clst]=0;   
 
2822
      }
 
2823
    }
 
2824
  }
 
2825
  armci_stride_info_destroy(&sinfo);
 
2826
  armci_stride_info_destroy(&dinfo);
 
2827
 
 
2828
  if(!nbtag) {
 
2829
    armci_send_complete(&dirdscr->sdescr,"armci_client_direct_get",dirdscr->numofsends);
 
2830
    dirdscr->numofsends = 0;
 
2831
    dirdscr->tag = 0;
 
2832
  }  
 
2833
  THREAD_UNLOCK(armci_user_threads.net_lock);
 
2834
}
 
2835
#endif
 
2836
 
 
2837
#if defined(PEND_BUFS)
 
2838
int armci_server_msginfo_to_pbuf_index(request_header_t *msginfo) {
 
2839
  int index=-1, i;
 
2840
  vapibuf_pend_t *pbuf=NULL;
 
2841
  
 
2842
  assert(!msginfo->tag.imm_msg);
 
2843
  for(i = 0; i<PENDING_BUF_NUM; i++) {
 
2844
    if(serv_pendbuf_arr[i].buf == (char *)msginfo) {
 
2845
      pbuf = &serv_pendbuf_arr[i];
 
2846
      index = i;
 
2847
      break;
 
2848
    }
 
2849
  }
 
2850
  return index;
 
2851
}
 
2852
 
 
2853
 
 
2854
/** Routine for server to RDMA strided data to the client-side buffers
 
2855
 * (allocated through buffers.c). This is to be used instead of
 
2856
 * copying the data to immediate or pending buffers when possible.
 
2857
 */
 
2858
#if 0
 
2859
void armci_server_rdma_strided_to_contig(char *src_ptr, int src_stride_arr[],
 
2860
                                         int seg_count[],
 
2861
                                         int stride_levels,
 
2862
                                         char *dst_ptr, int proc,
 
2863
                                         request_header_t *msginfo) {
 
2864
  int rc, i, j, c, busy[WQE_LIST_COUNT], clst, ctr, wr_id;
 
2865
  sr_descr_t *dirdscr;
 
2866
  struct ibv_send_wr *bad_wr, sdscr1;
 
2867
  struct ibv_send_wr sdscr[WQE_LIST_COUNT][WQE_LIST_LENGTH];
 
2868
  struct ibv_sge     sg_entry[WQE_LIST_COUNT][WQE_LIST_LENGTH];
 
2869
  stride_info_t sinfo;
 
2870
  uint64_t daddr;
 
2871
  ARMCI_MEMHDL_T *loc_memhdl;
 
2872
  ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];
 
2873
  
 
2874
  THREAD_LOCK(armci_user_threads.net_lock);
 
2875
 
 
2876
  assert(msginfo->operation == GET);
 
2877
  assert(stride_levels >= 0);
 
2878
  assert(stride_levels<=MAX_STRIDE_LEVEL);
 
2879
 
 
2880
  if(!get_armci_region_local_hndl(src_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
 
2881
    armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
 
2882
  }
 
2883
 
 
2884
  if(!msginfo->tag.imm_msg) {
 
2885
    int index = armci_server_msginfo_to_pbuf_index(msginfo);
 
2886
    assert(index>=0);
 
2887
    wr_id = PBUF_BUFID_TO_PUT_WRID(index);
 
2888
  }
 
2889
  else {
 
2890
    wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
 
2891
  }
 
2892
  bzero(&sdscr1, sizeof(sdscr1));
 
2893
  sdscr1.wr_id = wr_id;
 
2894
 
 
2895
  if(DEBUG_CLN) {
 
2896
    printf("\n%d: in rdma strided to contig id=%d lkey=%ld rkey=%ld\n",
 
2897
           armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
 
2898
    fflush(stdout);
 
2899
  }
 
2900
 
 
2901
  /*initialize fixed values for descriptors*/
 
2902
  bzero(sdscr, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_send_wr));
 
2903
  bzero(sg_entry, WQE_LIST_COUNT*WQE_LIST_LENGTH*sizeof(struct ibv_sge));
 
2904
  for(j=0; j<WQE_LIST_COUNT; j++) {
 
2905
    for(i=0; i<WQE_LIST_LENGTH; i++) {
 
2906
      armci_init_vbuf_srdma(&sdscr[j][i],&sg_entry[j][i],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
 
2907
      sdscr[j][i].wr_id = wr_id;
 
2908
      sdscr[j][i].send_flags = 0; /*non-signalled*/
 
2909
/*       sdscr[j][i].send_flags = IBV_SEND_SIGNALED; /\*signalled*\/ */
 
2910
      if(i<WQE_LIST_LENGTH-1)
 
2911
        sdscr[j][i].next = &sdscr[j][i+1];
 
2912
    }
 
2913
  }
 
2914
 
 
2915
  /*post requests in a loop*/
 
2916
  armci_stride_info_init(&sinfo,src_ptr,stride_levels,src_stride_arr,seg_count);
 
2917
  
 
2918
  clst=ctr=0;
 
2919
  bzero(busy, sizeof(int)*WQE_LIST_COUNT);
 
2920
  daddr = (uint64_t)dst_ptr;
 
2921
  while(armci_stride_info_has_more(&sinfo)) {
 
2922
    uint64_t saddr = (uint64_t)armci_stride_info_seg_ptr(&sinfo);
 
2923
    sg_entry[clst][ctr].addr = saddr;
 
2924
    sdscr[clst][ctr].wr.rdma.remote_addr = daddr;
 
2925
    assert(sg_entry[clst][ctr].length == seg_count[0]);
 
2926
 
 
2927
    ctr+=1;
 
2928
    daddr += seg_count[0];
 
2929
    armci_stride_info_next(&sinfo);
 
2930
    if(ctr == WQE_LIST_LENGTH || !armci_stride_info_has_more(&sinfo)) {
 
2931
      sdscr[clst][ctr-1].next=NULL;
 
2932
      if(!armci_stride_info_has_more(&sinfo)) {
 
2933
        sdscr[clst][ctr-1].send_flags=IBV_SEND_SIGNALED; /*only the last one*/
 
2934
      }
 
2935
      for(c=0; c<ctr-1; c++) {
 
2936
        assert(sdscr[clst][c].next == &sdscr[clst][c+1]);
 
2937
      }
 
2938
      rc = ibv_post_send(CLN_con[proc].qp, sdscr[clst], &bad_wr);
 
2939
      dassert1(1,rc==0,rc);
 
2940
      busy[clst] = 1;
 
2941
#if 0
 
2942
      armci_send_complete(&sdscr1,"serv_rdma_to_contig",ctr);
 
2943
      busy[clst] = 0;
 
2944
#endif
 
2945
      if(ctr<WQE_LIST_LENGTH) 
 
2946
        sdscr[clst][ctr-1].next = &sdscr[clst][ctr];
 
2947
      sdscr[clst][ctr-1].send_flags = 0;
 
2948
 
 
2949
      ctr=0;
 
2950
      clst = (clst+1)%WQE_LIST_COUNT;
 
2951
#if 0
 
2952
      if(busy[clst]) {
 
2953
        armci_send_complete(&sdscr1,"client_direct_rdma_strided",1);
 
2954
        busy[clst]=0;   
 
2955
      }
 
2956
#endif
 
2957
    }
 
2958
  }
 
2959
  armci_stride_info_destroy(&sinfo);
 
2960
  assert(proc == msginfo->from);
 
2961
  THREAD_UNLOCK(armci_user_threads.net_lock);
 
2962
}
 
2963
#else
 
2964
 
 
2965
#define MAX_NUM_SGE 64
 
2966
 
 
2967
/*same as above, but uses gather rdma writes*/
 
2968
void armci_server_rdma_strided_to_contig(char *src_ptr, int src_stride_arr[],
 
2969
                                         int seg_count[],
 
2970
                                         int stride_levels,
 
2971
                                         char *dst_ptr, int proc,
 
2972
                                         request_header_t *msginfo) {
 
2973
  int rc, ctr, wr_id, bytes;
 
2974
  struct ibv_send_wr *bad_wr, sdscr1, sdscr;
 
2975
  struct ibv_sge     sg_entry[MAX_NUM_SGE];
 
2976
  stride_info_t sinfo;
 
2977
  uint64_t daddr;
 
2978
  ARMCI_MEMHDL_T *loc_memhdl;
 
2979
  ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];
 
2980
  const int max_num_sge = ARMCI_MIN(MAX_NUM_SGE, armci_max_num_sg_ent);
 
2981
  int numposts=0, numsegs=0;
 
2982
  
 
2983
  THREAD_LOCK(armci_user_threads.net_lock);
 
2984
 
 
2985
  assert(msginfo->operation == GET);
 
2986
  assert(stride_levels >= 0);
 
2987
  assert(stride_levels<=MAX_STRIDE_LEVEL);
 
2988
 
 
2989
  if(!get_armci_region_local_hndl(src_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
 
2990
    armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
 
2991
  }
 
2992
 
 
2993
  if(!msginfo->tag.imm_msg) {
 
2994
    int index = armci_server_msginfo_to_pbuf_index(msginfo);
 
2995
    assert(index>=0);
 
2996
    wr_id = PBUF_BUFID_TO_PUT_WRID(index);
 
2997
  }
 
2998
  else {
 
2999
    wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
 
3000
  }
 
3001
  bzero(&sdscr1, sizeof(sdscr1));
 
3002
  sdscr1.wr_id = wr_id;
 
3003
 
 
3004
  if(DEBUG_CLN) {
 
3005
    printf("\n%d: in rdma strided to contig id=%d lkey=%ld rkey=%ld\n",
 
3006
           armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
 
3007
    fflush(stdout);
 
3008
  }
 
3009
 
 
3010
  /*initialize fixed values for descriptors*/
 
3011
  bzero(&sdscr, sizeof(sdscr));
 
3012
  bzero(sg_entry, max_num_sge*sizeof(struct ibv_sge));
 
3013
  armci_init_vbuf_srdma(&sdscr,&sg_entry[0],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
 
3014
  sdscr.send_flags = 0; /*non-signalled*/
 
3015
  sdscr.num_sge    = 0; /*set below in the loop*/
 
3016
  sdscr.wr_id      = wr_id;
 
3017
 
 
3018
  for(ctr=0; ctr<max_num_sge; ctr++) {
 
3019
    sg_entry[ctr].length = seg_count[0];
 
3020
    sg_entry[ctr].lkey = loc_memhdl->lkey;
 
3021
  }
 
3022
  
 
3023
  /*post requests in a loop*/
 
3024
  armci_stride_info_init(&sinfo,src_ptr,stride_levels,src_stride_arr,seg_count);
 
3025
  
 
3026
  numposts = numsegs = 0;
 
3027
  ctr=0;
 
3028
  daddr = (uint64_t)dst_ptr;
 
3029
  bytes=0;
 
3030
  while(armci_stride_info_has_more(&sinfo)) {
 
3031
    sg_entry[ctr].addr = (uint64_t)armci_stride_info_seg_ptr(&sinfo);
 
3032
    assert(sg_entry[ctr].length == seg_count[0]);
 
3033
 
 
3034
    sdscr.num_sge += 1;
 
3035
    bytes += seg_count[0];
 
3036
    ctr+=1;
 
3037
    numsegs += 1;
 
3038
    armci_stride_info_next(&sinfo);
 
3039
    if(ctr == max_num_sge || !armci_stride_info_has_more(&sinfo)) {
 
3040
      sdscr.wr.rdma.remote_addr = daddr;
 
3041
      if(!armci_stride_info_has_more(&sinfo)) {
 
3042
        sdscr.send_flags=IBV_SEND_SIGNALED; /*only the last one*/
 
3043
      }
 
3044
      else {
 
3045
        assert(sdscr.send_flags == 0);
 
3046
      }
 
3047
      assert(ctr == sdscr.num_sge);
 
3048
      rc = ibv_post_send(CLN_con[proc].qp, &sdscr, &bad_wr);
 
3049
      dassert1(1,rc==0,rc);
 
3050
 
 
3051
      numposts += 1;
 
3052
      ctr=0;
 
3053
      sdscr.num_sge = 0;
 
3054
      daddr += bytes;
 
3055
      bytes = 0;
 
3056
    }
 
3057
  }
 
3058
/*   printf("%d(s): scatgat write numposts=%d numsegs=%d\n",armci_me,numposts,numsegs); */
 
3059
  armci_stride_info_destroy(&sinfo);
 
3060
  assert(proc == msginfo->from);
 
3061
  THREAD_UNLOCK(armci_user_threads.net_lock);
 
3062
}
 
3063
 
 
3064
/*Directly read data from client buffers into remote memory. Data is
 
3065
  contiguous in client-side. */
 
3066
void armci_server_rdma_contig_to_strided(char *src_ptr, int proc,
 
3067
                                         char *dst_ptr, 
 
3068
                                         int dst_stride_arr[],
 
3069
                                         int seg_count[],
 
3070
                                         int stride_levels,
 
3071
                                         request_header_t *msginfo) {
 
3072
  int rc, ctr, wr_id, bytes;
 
3073
  struct ibv_send_wr *bad_wr, sdscr1, sdscr;
 
3074
  struct ibv_sge     sg_entry[MAX_NUM_SGE];
 
3075
  stride_info_t dinfo;
 
3076
  uint64_t saddr;
 
3077
  ARMCI_MEMHDL_T *loc_memhdl;
 
3078
  ARMCI_MEMHDL_T *rem_memhdl = &handle_array[proc];
 
3079
  const int max_num_sge = ARMCI_MIN(MAX_NUM_SGE, armci_max_num_sg_ent);
 
3080
  int numposts=0, numsegs=0;
 
3081
  
 
3082
  THREAD_LOCK(armci_user_threads.net_lock);
 
3083
 
 
3084
  assert(msginfo->operation == PUT);
 
3085
  assert(stride_levels >= 0);
 
3086
  assert(stride_levels<=MAX_STRIDE_LEVEL);
 
3087
 
 
3088
  if(!get_armci_region_local_hndl(dst_ptr,armci_clus_id(armci_me), &loc_memhdl)) {
 
3089
    armci_die("rdma_strided_to_contig: failed to get local handle\n",0);
 
3090
  }
 
3091
 
 
3092
  if(!msginfo->tag.imm_msg) {
 
3093
    int index = armci_server_msginfo_to_pbuf_index(msginfo);
 
3094
    assert(index>=0);
 
3095
    wr_id = PBUF_BUFID_TO_GET_WRID(index);
 
3096
  }
 
3097
  else {
 
3098
    wr_id = DSCRID_IMMBUF_RESP_END-1-proc;
 
3099
  }
 
3100
  bzero(&sdscr1, sizeof(sdscr1));
 
3101
  sdscr1.wr_id = wr_id;
 
3102
 
 
3103
  if(DEBUG_CLN) {
 
3104
    printf("\n%d: in rdma strided to contig id=%d lkey=%ld rkey=%ld\n",
 
3105
           armci_me,wr_id,loc_memhdl->lkey,rem_memhdl->rkey);
 
3106
    fflush(stdout);
 
3107
  }
 
3108
 
 
3109
  /*initialize fixed values for descriptors*/
 
3110
  bzero(&sdscr, sizeof(sdscr));
 
3111
  bzero(sg_entry, max_num_sge*sizeof(struct ibv_sge));
 
3112
  armci_init_vbuf_rrdma(&sdscr,&sg_entry[0],NULL,NULL,seg_count[0],loc_memhdl,rem_memhdl);
 
3113
  sdscr.send_flags = 0; /*non-signalled*/
 
3114
  sdscr.num_sge    = 0; /*set below in the loop*/
 
3115
  sdscr.wr_id      = wr_id;
 
3116
 
 
3117
  for(ctr=0; ctr<max_num_sge; ctr++) {
 
3118
    sg_entry[ctr].length = seg_count[0];
 
3119
    sg_entry[ctr].lkey = loc_memhdl->lkey;
 
3120
  }
 
3121
  
 
3122
  /*post requests in a loop*/
 
3123
  armci_stride_info_init(&dinfo,dst_ptr,stride_levels,dst_stride_arr,seg_count);
 
3124
  
 
3125
  numposts = numsegs = 0;
 
3126
  ctr=0;
 
3127
  saddr = (uint64_t)src_ptr;
 
3128
  bytes=0;
 
3129
  while(armci_stride_info_has_more(&dinfo)) {
 
3130
    sg_entry[ctr].addr = (uint64_t)armci_stride_info_seg_ptr(&dinfo);
 
3131
    assert(sg_entry[ctr].length == seg_count[0]);
 
3132
 
 
3133
    sdscr.num_sge += 1;
 
3134
    bytes += seg_count[0];
 
3135
    ctr+=1;
 
3136
    numsegs += 1;
 
3137
    armci_stride_info_next(&dinfo);
 
3138
    if(ctr == max_num_sge || !armci_stride_info_has_more(&dinfo)) {
 
3139
      sdscr.wr.rdma.remote_addr = saddr;
 
3140
      if(!armci_stride_info_has_more(&dinfo)) {
 
3141
        sdscr.send_flags=IBV_SEND_SIGNALED; /*only the last one*/
 
3142
      }
 
3143
      else {
 
3144
        assert(sdscr.send_flags == 0);
 
3145
      }
 
3146
      assert(ctr == sdscr.num_sge);
 
3147
      rc = ibv_post_send(CLN_con[proc].qp, &sdscr, &bad_wr);
 
3148
      dassert1(1,rc==0,rc);
 
3149
 
 
3150
      numposts += 1;
 
3151
      ctr=0;
 
3152
      sdscr.num_sge = 0;
 
3153
      saddr += bytes;
 
3154
      bytes = 0;
 
3155
    }
 
3156
  }
 
3157
/*   printf("%d(s): scatgat write numposts=%d numsegs=%d\n",armci_me,numposts,numsegs); */
 
3158
  armci_stride_info_destroy(&dinfo);
 
3159
  assert(proc == msginfo->from);
 
3160
  THREAD_UNLOCK(armci_user_threads.net_lock);
 
3161
}
 
3162
 
 
3163
#endif
 
3164
#endif
 
3165
 
 
3166
char *armci_ReadFromDirect(int proc, request_header_t *msginfo, int len)
 
3167
{
 
3168
int cluster = armci_clus_id(proc);
 
3169
vapibuf_ext_t* evbuf=BUF_TO_EVBUF(msginfo);
 
3170
char *dataptr = GET_DATA_PTR(evbuf->buf);
 
3171
extern void armci_util_wait_int(volatile int *,int,int);
 
3172
 
 
3173
    if(DEBUG_CLN){ printf("%d(c):read direct %d qp=%p\n",armci_me,
 
3174
                len,&(SRV_con+cluster)->qp); fflush(stdout);
 
3175
    }
 
3176
 
 
3177
    if(mark_buf_send_complete[evbuf->snd_dscr.wr_id]==0)
 
3178
       armci_send_complete(&(evbuf->snd_dscr),"armci_ReadFromDirect",1); 
 
3179
 
 
3180
    if(!msginfo->bypass){
 
3181
       long *flag;
 
3182
       int *last;
 
3183
       int loop = 0;
 
3184
       flag = &(msginfo->tag.ack);
 
3185
       if(msginfo->operation==GET){
 
3186
         last = (int *)(dataptr+len-sizeof(int));
 
3187
         if(msginfo->dscrlen >= (len-sizeof(int))){
 
3188
           last = (int *)(dataptr+len+msginfo->dscrlen-sizeof(int));
 
3189
           dataptr+=msginfo->dscrlen;
 
3190
         }
 
3191
 
 
3192
         if(DEBUG_CLN){
 
3193
           printf("\n%d:flagval=%d at ptr=%p ack=%ld dist=%d\n",armci_me,*last,
 
3194
                   last,*flag,len);fflush(stdout);
 
3195
         }
 
3196
 
 
3197
         while(armci_util_int_getval(last) == ARMCI_STAMP &&
 
3198
               armci_util_long_getval(flag)  != ARMCI_STAMP){
 
3199
           loop++;
 
3200
           loop %=100000;
 
3201
           if(loop==0){
 
3202
             if(DEBUG_CLN){
 
3203
               printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
 
3204
                      armci_me,last,*last,flag,*flag,msginfo->datalen);
 
3205
               fflush(stdout);
 
3206
             }
 
3207
           }
 
3208
         }
 
3209
         *flag = 0L;
 
3210
       }
 
3211
       else if(msginfo->operation == REGISTER){
 
3212
         while(armci_util_long_getval(flag)  != ARMCI_STAMP){
 
3213
           loop++;
 
3214
           loop %=100000;
 
3215
           if(loop==0){
 
3216
             if(DEBUG_CLN){
 
3217
               printf("%d: client flag(%p)=%ld off=%d\n",
 
3218
                      armci_me,flag,*flag,msginfo->datalen);
 
3219
               fflush(stdout);
 
3220
             }
 
3221
           }
 
3222
         }
 
3223
       }
 
3224
       else{
 
3225
         int *flg = (int *)(dataptr+len);
 
3226
         while(armci_util_int_getval(flg) != ARMCI_STAMP){
 
3227
           loop++;
 
3228
           loop %=100000;
 
3229
           if(loop==0){
 
3230
             if(DEBUG_CLN){
 
3231
               printf("%d: client waiting (%p)=%d off=%d\n",
 
3232
                      armci_me,flg,*flg,len);
 
3233
               fflush(stdout);
 
3234
             }
 
3235
           }
 
3236
         }
 
3237
       }
 
3238
    }
 
3239
    return dataptr;
 
3240
}
 
3241
 
 
3242
 
 
3243
#ifdef GET_STRIDED_COPY_PIPELINED
 
3244
/**Same as armci_ReadFromDirect, except reads partial segments
 
3245
 *  (identify by stamping done in armci_send_req_msg() and
 
3246
 *  returns. Note that the return value is the starting pointer of the
 
3247
 *  buffer containig the data. It is the same for all the segments
 
3248
 *  read for a message. 
 
3249
 * @param proc IN Read data corresponding to an earlier req to this proc
 
3250
 * @param msginfo IN The request for which we are reading now
 
3251
 * @param len IN #bytes in the total response
 
3252
 * @param bytes_done OUT @bytes of the total response read so far (monotonic)
 
3253
 * @return Starting pointer to the buffer containing the data
 
3254
 */
 
3255
char *armci_ReadFromDirectSegment(int proc, request_header_t *msginfo, int len, int *bytes_done) {
 
3256
  int cluster = armci_clus_id(proc);
 
3257
  vapibuf_ext_t* evbuf=BUF_TO_EVBUF(msginfo);
 
3258
  char *dataptr = GET_DATA_PTR(evbuf->buf);
 
3259
  extern void armci_util_wait_int(volatile int *,int,int);
 
3260
 
 
3261
  if(DEBUG_CLN){ printf("%d(c):read direct %d qp=%p\n",armci_me,
 
3262
                        len,&(SRV_con+cluster)->qp); fflush(stdout);
 
3263
  }
 
3264
 
 
3265
  if(mark_buf_send_complete[evbuf->snd_dscr.wr_id]==0)
 
3266
    armci_send_complete(&(evbuf->snd_dscr),"armci_ReadFromDirect",1); 
 
3267
 
 
3268
  if(!msginfo->bypass){
 
3269
    long *flag;
 
3270
    int *last, *mid1, *mid2, third;
 
3271
    int loop = 0;
 
3272
    flag = &(msginfo->tag.ack);
 
3273
    if(msginfo->operation==GET){
 
3274
      last = (int *)(dataptr+len-sizeof(int));
 
3275
      if(msginfo->dscrlen >= (len-sizeof(int))){
 
3276
        last = (int *)(dataptr+len+msginfo->dscrlen-sizeof(int));
 
3277
        dataptr+=msginfo->dscrlen;
 
3278
      }
 
3279
      third = (last-(int*)(msginfo->dscrlen+(char*)(msginfo+1)))/3;
 
3280
      mid2 = (last - third);
 
3281
      mid1 = mid2 - third;
 
3282
 
 
3283
      if(DEBUG_CLN){
 
3284
        printf("\n%d:flagval=%d at ptr=%p ack=%ld dist=%d\n",armci_me,*last,
 
3285
               last,*flag,len);fflush(stdout);
 
3286
      }
 
3287
 
 
3288
      while(armci_util_int_getval(last) == ARMCI_STAMP &&
 
3289
            armci_util_long_getval(flag)  != ARMCI_STAMP){
 
3290
        loop++;
 
3291
        loop %=100000;
 
3292
        if(loop==0){
 
3293
          if(DEBUG_CLN){
 
3294
            printf("%d: client last(%p)=%d flag(%p)=%ld off=%d\n",
 
3295
                   armci_me,last,*last,flag,*flag,msginfo->datalen);
 
3296
            fflush(stdout);
 
3297
          }
 
3298
        }
 
3299
 
 
3300
        {
 
3301
          int ssize = GET_STRIDED_COPY_PIPELINED_SIZE/sizeof(int);
 
3302
          int *sfirst = (int*)(msginfo->dscrlen+(char*)(msginfo+1))+ssize; /*stamping
 
3303
                                                                             can start here*/
 
3304
          int *slast = last;
 
3305
          int off = (((int *)(dataptr+*bytes_done)-sfirst+ssize)/ssize)*ssize;
 
3306
          int *ptr = sfirst+off;
 
3307
          dassert(1,off>=0);
 
3308
          dassert(1,(void *)sfirst>dataptr);
 
3309
          dassert(1,(void *)ptr>dataptr);
 
3310
          if(ptr<=slast && armci_util_int_getval(ptr)!=ARMCI_STAMP) {
 
3311
            *bytes_done = ((char*)ptr)-dataptr;
 
3312
            return dataptr;
 
3313
          }
 
3314
        }
 
3315
      }
 
3316
      *flag = 0L;
 
3317
      *bytes_done = len;
 
3318
      return dataptr;
 
3319
    }
 
3320
    else if(msginfo->operation == REGISTER){
 
3321
      while(armci_util_long_getval(flag)  != ARMCI_STAMP){
 
3322
        loop++;
 
3323
        loop %=100000;
 
3324
        if(loop==0){
 
3325
          if(DEBUG_CLN){
 
3326
            printf("%d: client flag(%p)=%ld off=%d\n",
 
3327
                   armci_me,flag,*flag,msginfo->datalen);
 
3328
            fflush(stdout);
 
3329
          }
 
3330
        }
 
3331
      }
 
3332
    }
 
3333
    else{
 
3334
      int *flg = (int *)(dataptr+len);
 
3335
      while(armci_util_int_getval(flg) != ARMCI_STAMP){
 
3336
        loop++;
 
3337
        loop %=100000;
 
3338
        if(loop==0){
 
3339
          if(DEBUG_CLN){
 
3340
            printf("%d: client waiting (%p)=%d off=%d\n",
 
3341
                   armci_me,flg,*flg,len);
 
3342
            fflush(stdout);
 
3343
          }
 
3344
        }
 
3345
      }
 
3346
    }
 
3347
  }
 
3348
  *bytes_done = len;
 
3349
  return dataptr;
 
3350
}
 
3351
#endif
 
3352
 
 
3353
/**
 
3354
  * @param proc IN id of remote client to put to
 
3355
  * @param buf IN local buf (has to be registered)
 
3356
 */
 
3357
void armci_send_data_to_client(int proc, void *buf, int bytes,void *dbuf)
 
3358
{
 
3359
  int i, rc = 0;
 
3360
    struct ibv_send_wr *bad_wr;
 
3361
    struct ibv_send_wr sdscr;
 
3362
    struct ibv_sge ssg_entry;
 
3363
 
 
3364
    if(DEBUG_SERVER){
 
3365
       printf("\n%d(s):sending data to client %d at %p flag = %p bytes=%d\n",
 
3366
               armci_me,
 
3367
              proc,dbuf,(char *)dbuf+bytes-sizeof(int),bytes);fflush(stdout);
 
3368
    }
 
3369
 
 
3370
    memset(&sdscr,0,sizeof(struct ibv_send_wr));
 
3371
    memset(&ssg_entry,0,sizeof(ssg_entry));
 
3372
    armci_init_vbuf_srdma(&sdscr,&ssg_entry,buf,dbuf,bytes,
 
3373
                          &serv_memhandle,(handle_array+proc));
 
3374
 
 
3375
    if(DEBUG_SERVER){
 
3376
       printf("\n%d(s):handle_array[%d]=%p dbuf=%p flag=%p bytes=%d\n",armci_me,
 
3377
              proc,&handle_array[proc],(char *)dbuf,
 
3378
              (char *)dbuf+bytes-sizeof(int),bytes);
 
3379
       fflush(stdout);
 
3380
    }
 
3381
 
 
3382
#if defined(PEND_BUFS)
 
3383
    for(i=proc*(IMM_BUF_NUM+1); i<(proc+1)*(IMM_BUF_NUM+1); i++) {
 
3384
      if((char*)buf>= serv_buf_arr[i]->buf && 
 
3385
         (char*)buf<IMM_BUF_LEN+(char*)serv_buf_arr[i]->buf)
 
3386
        break;
 
3387
    }
 
3388
 
 
3389
#if SRI_CORRECT
 
3390
     if(i<(proc+1)*(IMM_BUF_NUM+1)) {
 
3391
      /*Message from an immediate buffer*/
 
3392
     assert(serv_buf_arr[i]->send_pending==0);
 
3393
      serv_buf_arr[i]->send_pending=1;
 
3394
      sdscr.wr_id = DSCRID_IMMBUF_RESP+i;
 
3395
    }
 
3396
    else 
 
3397
#endif
 
3398
      {
 
3399
        sdscr.wr_id = DSCRID_IMMBUF_RESP+armci_nproc*(IMM_BUF_NUM+1)+1;
 
3400
      }
 
3401
/* #endif */
 
3402
 
 
3403
/* #if defined(PEND_BUFS) */
 
3404
/*     { */
 
3405
/*       static uint64_t ctr=DSCRID_IMMBUF_RESP; */
 
3406
/*       sdscr.wr_id = ctr; */
 
3407
/*       ctr = (ctr+1-DSCRID_IMMBUF_RESP)%(DSCRID_IMMBUF_RESP_END-DSCRID_IMMBUF_RESP)+DSCRID_IMMBUF_RESP; */
 
3408
/*     } */
 
3409
#else
 
3410
    sdscr.wr_id = proc+armci_nproc;
 
3411
#endif
 
3412
    rc = ibv_post_send((CLN_con+proc)->qp, &sdscr, &bad_wr);
 
3413
    dassert1(1,rc==0,rc);
 
3414
 
 
3415
#if !defined(PEND_BUFS)
 
3416
    armci_send_complete(&sdscr,"armci_send_data_to_client",1);
 
3417
#endif
 
3418
}
 
3419
 
 
3420
void armci_WriteToDirect(int proc, request_header_t* msginfo, void *buf)
 
3421
{
 
3422
int bytes;
 
3423
int *last;
 
3424
    ARMCI_PR_DBG("enter",0);
 
3425
    bytes = (int)msginfo->datalen;
 
3426
    if(DEBUG_SERVER){
 
3427
      printf("%d(s):write to direct sent %d to %d at %p\n",armci_me,
 
3428
             bytes,proc,(char *)msginfo->tag.data_ptr);
 
3429
      fflush(stdout);
 
3430
    }
 
3431
    if(msginfo->operation!=GET){
 
3432
       *(int *)((char *)buf+bytes)=ARMCI_STAMP;
 
3433
       bytes+=sizeof(int);
 
3434
    }
 
3435
#if defined(PEND_BUFS)
 
3436
    if(!msginfo->tag.imm_msg) {
 
3437
      int i;
 
3438
/*       fprintf(stderr, "%d:: Not immediate mesg operated on\n", armci_me); */
 
3439
      assert(msginfo->operation == GET); /*nothing else uses this for now*/
 
3440
      /**This is a pending buf*/
 
3441
      vapibuf_pend_t *pbuf=NULL;
 
3442
      int index;
 
3443
      for(i = 0; i<PENDING_BUF_NUM; i++) {
 
3444
        if(serv_pendbuf_arr[i].buf == (char *)msginfo) {
 
3445
          pbuf = &serv_pendbuf_arr[i];
 
3446
          index = i;
 
3447
          break;
 
3448
        }
 
3449
      }
 
3450
      assert(pbuf != NULL);
 
3451
      assert(sizeof(request_header_t)+msginfo->dscrlen+bytes<PENDING_BUF_LEN);
 
3452
      _armci_send_data_to_client_pbuf(proc, &pbuf->sdscr,
 
3453
                                      PBUF_BUFID_TO_PUT_WRID(index),
 
3454
                                      &pbuf->sg_entry,
 
3455
                                      msginfo->tag.data_ptr, buf,
 
3456
                                      bytes);
 
3457
    }
 
3458
    else 
 
3459
#endif
 
3460
    {
 
3461
      armci_send_data_to_client(proc,buf,bytes,msginfo->tag.data_ptr);
 
3462
    }
 
3463
    /*if(msginfo->dscrlen >= (bytes-sizeof(int)))
 
3464
       last = (int*)(((char*)(buf)) + (msginfo->dscrlen+bytes - sizeof(int)));
 
3465
    else*/
 
3466
       last = (int*)(((char*)(buf)) + (bytes - sizeof(int)));
 
3467
 
 
3468
    if(msginfo->operation==GET && *last == ARMCI_STAMP){
 
3469
       SERVER_SEND_ACK(msginfo->from);
 
3470
    }
 
3471
    armci_ack_proc=NONE;
 
3472
    ARMCI_PR_DBG("exit",0);
 
3473
}
 
3474
 
 
3475
 
 
3476
#if defined(PEND_BUFS)
 
3477
void armci_rcv_req(void *mesg,void *phdr,void *pdescr,void *pdata,int *buflen)
 
3478
{
 
3479
  request_header_t *msginfo = *(request_header_t**)mesg;
 
3480
  *(void **)phdr = msginfo;
 
3481
 
 
3482
  if(msginfo->tag.imm_msg) 
 
3483
    *buflen = IMM_BUF_LEN - sizeof(request_header_t) - msginfo->dscrlen;
 
3484
  else 
 
3485
    *buflen = PENDING_BUF_LEN - sizeof(request_header_t) - msginfo->dscrlen;
 
3486
  
 
3487
  *(void **)pdata = msginfo->dscrlen + (char *)(msginfo+1);
 
3488
  if(msginfo->bytes)
 
3489
    *(void **)pdescr = msginfo+1;
 
3490
  else
 
3491
    *(void **)pdescr = NULL;
 
3492
}
 
3493
#else
 
3494
void armci_rcv_req(void *mesg,void *phdr,void *pdescr,void *pdata,int *buflen)
 
3495
{
 
3496
  vapibuf_t *vbuf = (vapibuf_t*)mesg;
 
3497
  request_header_t *msginfo = (request_header_t *)vbuf->buf;
 
3498
  *(void **)phdr = msginfo;
 
3499
 
 
3500
  ARMCI_PR_DBG("enter",msginfo->operation);
 
3501
  if(DEBUG_SERVER){
 
3502
    printf("%d(server): got %d req (dscrlen=%d datalen=%d) from %d\n",
 
3503
           armci_me, msginfo->operation, msginfo->dscrlen,
 
3504
           msginfo->datalen, msginfo->from); fflush(stdout);
 
3505
  }
 
3506
 
 
3507
  /* we leave room for msginfo on the client side */
 
3508
  *buflen = MSG_BUFLEN - sizeof(request_header_t);
 
3509
  
 
3510
  if(msginfo->bytes) {
 
3511
    *(void **)pdescr = msginfo+1;
 
3512
    if(msginfo->operation == GET)
 
3513
      *(void **)pdata = MessageRcvBuffer;
 
3514
    else
 
3515
      *(void **)pdata = msginfo->dscrlen + (char*)(msginfo+1);
 
3516
  }else {
 
3517
    *(void**)pdescr = NULL;
 
3518
    *(void**)pdata = MessageRcvBuffer;
 
3519
  }
 
3520
  ARMCI_PR_DBG("exit",msginfo->operation);
 
3521
}
 
3522
#endif
 
3523
 
 
3524
static void posts_scatter_desc(sr_descr_t *pend_dscr,int proc,int type)
 
3525
{
 
3526
int rc;
 
3527
int cluster = armci_clus_id(proc);
 
3528
struct ibv_recv_wr *scat_dscr;
 
3529
struct ibv_recv_wr *bad_wr;
 
3530
 
 
3531
    scat_dscr = &pend_dscr->rdescr;
 
3532
 
 
3533
    /*armci_vapi_print_dscr_info(NULL,scat_dscr);*/
 
3534
    if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN)){
 
3535
       printf("%d(%d) : inside posts scatter dscr, id is %d\n",
 
3536
              armci_me,type,scat_dscr->wr_id);
 
3537
       fflush(stdout);
 
3538
    }
 
3539
 
 
3540
    if(type == SERV)
 
3541
        rc = ibv_post_recv((CLN_con + proc)->qp, scat_dscr, &bad_wr);
 
3542
    else
 
3543
        rc = ibv_post_recv((SRV_con+cluster)->qp, scat_dscr, &bad_wr);
 
3544
    dassert1(1,rc==0,rc);
 
3545
 
 
3546
    if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ) {
 
3547
       printf("\n%d: list_length is %d, id is %ld\n",
 
3548
              armci_me,scat_dscr->num_sge,scat_dscr->wr_id);
 
3549
       fflush(stdout);
 
3550
    }
 
3551
}
 
3552
 
 
3553
 
 
3554
/*\
 
3555
 *  client calls from request.c
 
3556
 *  server calls from ds-shared.c
 
3557
\*/
 
3558
static sr_descr_t serv_blocking_scatter_dscr;
 
3559
static sr_descr_t client_blocking_scatter_dscr;
 
3560
void armci_post_scatter(void *dest_ptr, int dest_stride_arr[], int count[],
 
3561
     int stride_levels, armci_vapi_memhndl_t *mhandle,
 
3562
     int proc, int nbtag, int type, sr_descr_t **srd)
 
3563
{
 
3564
    int i;
 
3565
    int total_size = 0;
 
3566
    int total_of_2D = 1;
 
3567
    int index[MAX_STRIDE_LEVEL], unit[MAX_STRIDE_LEVEL];
 
3568
    int j,k,y,z;
 
3569
    int num_dscr = 0;
 
3570
    int num_xmit = 0, num_seg, max_seg, rem_seg,vecind;
 
3571
    char* src, *src1;
 
3572
    sr_descr_t *pend_dscr;
 
3573
    struct ibv_sge *scat_sglist;
 
3574
    struct ibv_recv_wr *scat_dscr;
 
3575
 
 
3576
    if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ){
 
3577
       printf("%d(%d)  : inside post_scatter %d\n",armci_me,type,nbtag);
 
3578
       fflush(stdout);
 
3579
    }
 
3580
 
 
3581
    max_seg =  armci_max_num_sg_ent;
 
3582
 
 
3583
    THREAD_LOCK(armci_user_threads.net_lock);
 
3584
 
 
3585
    if(nbtag){
 
3586
       pend_dscr = armci_vapi_get_next_rdescr(nbtag,1);
 
3587
       if(srd!=NULL)*srd=pend_dscr;
 
3588
    }
 
3589
    else{
 
3590
       pend_dscr = &client_blocking_scatter_dscr;
 
3591
       pend_dscr->rdescr.wr_id=DSCRID_SCATGAT + MAX_PENDING;
 
3592
    }
 
3593
 
 
3594
    /*pend_dscr->proc = proc;*/
 
3595
    pend_dscr->numofrecvs=0;
 
3596
 
 
3597
    scat_dscr = &pend_dscr->rdescr;
 
3598
    scat_sglist = pend_dscr->sg_entry;
 
3599
    /* scat_dscr->opcode = VAPI_RECEIVE; no ->opcode in ibv_recv_wr */
 
3600
    /* scat_dscr->comp_type = VAPI_SIGNALED; no ->comp_type in ibv_recv_wr */
 
3601
    scat_dscr->sg_list = scat_sglist;
 
3602
    scat_dscr->num_sge = 0;
 
3603
 
 
3604
    index[2] = 0; unit[2] = 1;
 
3605
    if(stride_levels > 1){
 
3606
       total_of_2D = count[2];
 
3607
       for(j=3; j<=stride_levels; j++){
 
3608
         index[j] = 0; unit[j] = unit[j-1]*count[j-1];
 
3609
         total_of_2D*=count[j];
 
3610
       }
 
3611
    }
 
3612
 
 
3613
    num_xmit = total_of_2D*count[1]/max_seg;
 
3614
    rem_seg = (total_of_2D*count[1])%max_seg;
 
3615
    if(num_xmit == 0) num_xmit = 1;
 
3616
    else if(rem_seg!= 0)num_xmit++;
 
3617
 
 
3618
 
 
3619
    if ((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ) {
 
3620
       printf("%d(%d):armci_post_scatter num_xmit = %d\t, rem_seg = %d\n",
 
3621
               armci_me,type,num_xmit,rem_seg);
 
3622
       fflush(stdout);
 
3623
    }
 
3624
 
 
3625
    k=0; vecind = 0;
 
3626
    if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
 
3627
    else num_seg = max_seg;
 
3628
 
 
3629
    y=0,z=0;
 
3630
    for(i=0;i<total_of_2D;i++){
 
3631
       src = (char *)dest_ptr;
 
3632
       for(j=2;j<=stride_levels;j++){
 
3633
         src+= index[j]*dest_stride_arr[j-1];
 
3634
         if(((i+1)%unit[j]) == 0) index[j]++;
 
3635
         if(index[j] >= count[j]) index[j] =0;
 
3636
       }
 
3637
       src1 = src;
 
3638
 
 
3639
       for(j=0; j<count[1]; j++, vecind++){
 
3640
         if(vecind == num_seg) {
 
3641
           posts_scatter_desc(pend_dscr,proc,type);
 
3642
           pend_dscr->numofrecvs++;
 
3643
 
 
3644
           /* the previous one has been posted, start off new*/
 
3645
           scat_dscr->num_sge = 0;
 
3646
           y = 0; /* reuse the same scatter descriptor */
 
3647
           vecind=0;total_size=0;k++;
 
3648
           if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
 
3649
         }
 
3650
         /* fill the scatter descriptor */
 
3651
         scat_sglist[y].addr = (uint64_t)src1;
 
3652
         scat_sglist[y].lkey = mhandle->lkey;
 
3653
         scat_sglist[y].length = count[0];
 
3654
         scat_dscr->num_sge++;
 
3655
         src1 += dest_stride_arr[0];
 
3656
         y++;
 
3657
 
 
3658
       }
 
3659
 
 
3660
       if(vecind == num_seg){
 
3661
         posts_scatter_desc(pend_dscr,proc,type);
 
3662
         pend_dscr->numofrecvs++;
 
3663
 
 
3664
         /* the previous one has been posted, start off new*/
 
3665
         scat_dscr->num_sge = 0;
 
3666
         y =0 ;
 
3667
         vecind = 0; total_size=0; k++;
 
3668
         if(rem_seg!=0 && k==(num_xmit-1))num_seg=rem_seg;
 
3669
         else num_seg = max_seg;
 
3670
       }
 
3671
 
 
3672
    }
 
3673
 
 
3674
    THREAD_UNLOCK(armci_user_threads.net_lock);
 
3675
 
 
3676
/*     printf("%d(s): num scatters posted=%d\n", armci_me,pend_dscr->numofrecvs); */
 
3677
    if(!nbtag){
 
3678
       /*if blocking call wait_for_blocking_scatter to complete*/
 
3679
    }
 
3680
    return;
 
3681
}
 
3682
 
 
3683
void armci_wait_for_blocking_scatter()
 
3684
{
 
3685
sr_descr_t *pend_dscr=&client_blocking_scatter_dscr;
 
3686
int i;
 
3687
    armci_recv_complete(&pend_dscr->rdescr,"armci_post_scatter",pend_dscr->numofrecvs);
 
3688
}
 
3689
 
 
3690
 
 
3691
/*\
 
3692
 *  function used by armci_post_gather to actually post the sctter list
 
3693
\*/
 
3694
static void posts_gather_desc(sr_descr_t *pend_dscr,int proc,int type)
 
3695
{
 
3696
    int rc;
 
3697
    int cluster = armci_clus_id(proc);
 
3698
    struct ibv_send_wr *gat_dscr;
 
3699
    struct ibv_send_wr *bad_wr;
 
3700
 
 
3701
    THREAD_LOCK(armci_user_threads.net_lock);
 
3702
 
 
3703
    gat_dscr = &pend_dscr->sdescr;
 
3704
    /*armci_vapi_print_dscr_info(gat_dscr,NULL);*/
 
3705
    if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN)){
 
3706
       printf("%d: type(client=1)=%d inside posts gather dscr, id is %d\n",
 
3707
              armci_me,type,gat_dscr->wr_id);
 
3708
       fflush(stdout);
 
3709
    }
 
3710
 
 
3711
    rc = 0;
 
3712
    if(type == CLN){
 
3713
       rc = ibv_post_send((SRV_con+cluster)->qp, gat_dscr, &bad_wr);
 
3714
    }
 
3715
    else{
 
3716
        rc = ibv_post_send((CLN_con + proc)->qp, gat_dscr, &bad_wr);
 
3717
    }
 
3718
    dassert1(1,rc==0,rc);
 
3719
 
 
3720
    THREAD_UNLOCK(armci_user_threads.net_lock);
 
3721
 
 
3722
}
 
3723
 
 
3724
/*\
 
3725
 *  posts a bunch of gather descriptors
 
3726
\*/ 
 
3727
static sr_descr_t serv_blocking_gather_dscr;
 
3728
static sr_descr_t client_blocking_gather_dscr;
 
3729
void armci_post_gather(void *src_ptr, int src_stride_arr[], int count[],
 
3730
      int stride_levels, armci_vapi_memhndl_t *mhandle,
 
3731
      int proc,int nbtag, int type, sr_descr_t **srd)
 
3732
{
 
3733
    int i;
 
3734
    int total_of_2D = 1;
 
3735
    int total_size = 0;
 
3736
    int index[MAX_STRIDE_LEVEL], unit[MAX_STRIDE_LEVEL];
 
3737
    int j,k,y,z;
 
3738
    int num_posted = 0;
 
3739
    char *src, *src1;
 
3740
    int num_xmit = 0, num_seg, max_seg, rem_seg,vecind;
 
3741
    sr_descr_t *pend_dscr;
 
3742
 
 
3743
    struct ibv_sge *gat_sglist;
 
3744
    struct ibv_send_wr *gat_dscr;
 
3745
 
 
3746
    if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN)){
 
3747
      printf("%d(%d)  : inside post_gather\n",armci_me,type);
 
3748
      fflush(stdout);
 
3749
    }
 
3750
 
 
3751
    max_seg =  armci_max_num_sg_ent;
 
3752
    if(nbtag){
 
3753
       pend_dscr = armci_vapi_get_next_sdescr(nbtag,1);
 
3754
       if(srd!=NULL)*srd=pend_dscr;
 
3755
    }
 
3756
    else{
 
3757
       pend_dscr = &client_blocking_gather_dscr;
 
3758
       pend_dscr->sdescr.wr_id=DSCRID_SCATGAT + MAX_PENDING;
 
3759
    }
 
3760
    pend_dscr->numofsends=0;
 
3761
 
 
3762
    gat_dscr = &pend_dscr->sdescr;
 
3763
    gat_sglist = pend_dscr->sg_entry;
 
3764
    gat_dscr->opcode = IBV_WR_SEND;
 
3765
    gat_dscr->send_flags = IBV_SEND_SIGNALED;
 
3766
    gat_dscr->sg_list = gat_sglist;
 
3767
    gat_dscr->num_sge = 0;
 
3768
/*     gat_dscr->send_flags = 0; */
 
3769
 
 
3770
    index[2] = 0; unit[2] = 1;
 
3771
    if(stride_levels > 1){
 
3772
      total_of_2D = count[2];
 
3773
      for(j=3; j<=stride_levels; j++){
 
3774
        index[j] = 0; unit[j] = unit[j-1]*count[j-1];
 
3775
        total_of_2D*=count[j];
 
3776
      }
 
3777
    }
 
3778
 
 
3779
    num_xmit = total_of_2D*count[1]/max_seg;
 
3780
    rem_seg = (total_of_2D*count[1])%max_seg;
 
3781
    if(num_xmit == 0) num_xmit = 1;
 
3782
    else if(rem_seg!= 0)num_xmit++;
 
3783
 
 
3784
    if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ){ 
 
3785
       printf("%d(%d):armci_post_gather total_2D=%d, num_xmit=%d, rem_seg =%d, count[1] = %d\n",armci_me,type,total_of_2D, num_xmit,rem_seg,count[1]);
 
3786
      fflush(stdout);
 
3787
    }
 
3788
 
 
3789
    k=0; vecind = 0;
 
3790
    if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
 
3791
    else num_seg = max_seg;
 
3792
 
 
3793
    y=0,z=0;
 
3794
    for(i=0;i<total_of_2D;i++){
 
3795
       src = (char *)src_ptr;
 
3796
       for(j=2;j<=stride_levels;j++){
 
3797
         src+= index[j]*src_stride_arr[j-1];
 
3798
         if(((i+1)%unit[j]) == 0) index[j]++;
 
3799
         if(index[j] >= count[j]) index[j] =0;
 
3800
       }
 
3801
       src1 = src;
 
3802
 
 
3803
       for(j=0; j<count[1]; j++, vecind++){
 
3804
         if(vecind == num_seg){
 
3805
           posts_gather_desc(pend_dscr,proc,type);
 
3806
           pend_dscr->numofsends++;
 
3807
 
 
3808
           /* the previous one has been posted, start off new*/
 
3809
           gat_dscr->num_sge = 0;
 
3810
           y = 0;
 
3811
           vecind=0;total_size=0;k++;
 
3812
           if(rem_seg!=0 && k==(num_xmit-1))num_seg = rem_seg;
 
3813
         }
 
3814
 
 
3815
         /* fill the gather descriptor */
 
3816
         gat_sglist[y].addr = (uint64_t)src1;
 
3817
         gat_sglist[y].lkey = mhandle->lkey;
 
3818
         gat_sglist[y].length = count[0];
 
3819
         gat_dscr->num_sge++;
 
3820
         src1 += src_stride_arr[0];
 
3821
         y++;
 
3822
 
 
3823
       }
 
3824
 
 
3825
       if(vecind == num_seg){
 
3826
         posts_gather_desc(pend_dscr,proc,type);
 
3827
         pend_dscr->numofsends++;
 
3828
         if((type==SERV && DEBUG_SERVER) || (type==CLN && DEBUG_CLN) ){
 
3829
           printf("%d(%d)posts_gather_desc done\n",armci_me,type);
 
3830
           fflush(stdout);
 
3831
         }
 
3832
 
 
3833
         /* the previous one has been posted, start off new*/
 
3834
         gat_dscr->num_sge = 0;
 
3835
         y = 0;
 
3836
         vecind = 0; total_size=0; k++;
 
3837
         if(rem_seg!=0 && k==(num_xmit-1))num_seg=rem_seg;
 
3838
         else num_seg = max_seg;
 
3839
       }
 
3840
    }
 
3841
/*     printf("%d: num gathers posted =%d\n",armci_me,pend_dscr->numofsends); */
 
3842
    if(!nbtag){
 
3843
       /*complete here*/
 
3844
       armci_send_complete(&pend_dscr->sdescr,"armci_post_gather",pend_dscr->numofsends);
 
3845
    }
 
3846
    return;
 
3847
}
 
3848
/***********************END SCATTER GATHER STUFF******************************/
 
3849
 
 
3850
 
 
3851
 
 
3852
/***********************SPECIAL SEND/RECV*************************************/
 
3853
void armci_server_direct_send(int dst, char *src_buf, char *dst_buf, int len,
 
3854
                              uint32_t *lkey, uint32_t *rkey)
 
3855
{
 
3856
    int rc = 0;
 
3857
    struct ibv_wc *pdscr=NULL;
 
3858
    struct ibv_wc pdscr1;
 
3859
    struct ibv_send_wr sdscr;
 
3860
    struct ibv_sge ssg_entry;
 
3861
 
 
3862
    pdscr = &pdscr1;
 
3863
 
 
3864
    if(DEBUG_SERVER){
 
3865
       printf("\n%d(s):sending dir data to client %d at %p bytes=%d last=%p\n",
 
3866
                armci_me,dst,dst_buf,len,(dst_buf+len-4));fflush(stdout);
 
3867
    }
 
3868
 
 
3869
    memset(&sdscr,0,sizeof(struct ibv_send_wr));
 
3870
    armci_init_vbuf_srdma(&sdscr,&ssg_entry,src_buf,dst_buf,len,NULL,NULL);
 
3871
    sdscr.wr.rdma.rkey = *rkey;
 
3872
    ssg_entry.lkey = *lkey;
 
3873
 
 
3874
    sdscr.wr_id = dst+armci_nproc;
 
3875
    struct ibv_send_wr *bad_wr;
 
3876
    rc = ibv_post_send((CLN_con+dst)->qp, &sdscr, &bad_wr);
 
3877
    dassert1(1,rc==0,rc);
 
3878
 
 
3879
    while (rc == 0) {
 
3880
       rc = ibv_poll_cq(CLN_nic->scq, 1, pdscr);
 
3881
    }
 
3882
    dassertp(1,rc>=0,("%d: rc=%d id=%d status=%d\n",
 
3883
                      armci_me,rc,(int)pdscr->wr_id,pdscr->status));
 
3884
    dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);;
 
3885
}
 
3886
 
 
3887
 
 
3888
 
 
3889
void armci_send_contig_bypass(int proc, request_header_t *msginfo,
 
3890
                              void *src_ptr, void *rem_ptr, int bytes)
 
3891
{
 
3892
    int *last;
 
3893
    uint32_t *lkey=NULL;
 
3894
    uint32_t *rkey;    
 
3895
    int dscrlen = msginfo->dscrlen;
 
3896
 
 
3897
    last = (int*)(((char*)(src_ptr)) + (bytes - sizeof(int)));
 
3898
    dassertp(1,msginfo->pinned,("%d: not pinned proc=%d",armci_me,proc));
 
3899
 
 
3900
    rkey = (uint32_t *)((char *)(msginfo+1)+dscrlen-(sizeof(uint32_t)+sizeof(uint32_t)));
 
3901
 
 
3902
    if(DEBUG_SERVER){
 
3903
       printf("%d(server): sending data bypass to %d (%p,%p) %d %d\n", armci_me,
 
3904
               msginfo->from,src_ptr, rem_ptr,*lkey,*rkey);
 
3905
       fflush(stdout);
 
3906
    }
 
3907
    armci_server_direct_send(msginfo->from,src_ptr,rem_ptr,bytes,lkey,rkey);
 
3908
 
 
3909
    if(*last == ARMCI_STAMP){
 
3910
       SERVER_SEND_ACK(msginfo->from);
 
3911
    }
 
3912
}
 
3913
 
 
3914
void armci_rcv_strided_data_bypass_both(int proc, request_header_t *msginfo,
 
3915
                                       void *ptr, int *count, int stride_levels)
 
3916
{
 
3917
int datalen = msginfo->datalen;
 
3918
int *last;
 
3919
long *ack;
 
3920
int loop=0;
 
3921
 
 
3922
    if(DEBUG_CLN){ printf("%d:rcv_strided_data_both bypass from %d\n",
 
3923
                armci_me,  proc); fflush(stdout);
 
3924
    }
 
3925
    if(!stride_levels){
 
3926
      last = (int*)(((char*)(ptr)) + (count[0] -sizeof(int)));
 
3927
      ack  = (long *)&msginfo->tag;
 
3928
      while(armci_util_int_getval(last) == ARMCI_STAMP &&
 
3929
            armci_util_long_getval(ack)  != ARMCI_STAMP){
 
3930
        loop++;
 
3931
        loop %=1000000;
 
3932
        if(loop==0){
 
3933
          if(DEBUG_CLN){
 
3934
            printf("%d: client last(%p)=%d ack(%p)=%ld off=%d\n",
 
3935
                  armci_me,last,*last,ack,*ack,(int)((char*)last - (char*)ptr));
 
3936
            fflush(stdout);
 
3937
          }
 
3938
        }
 
3939
      }
 
3940
    }
 
3941
    else {
 
3942
      printf("\n%d:rcv_strided_data called, it should never be called\n",armci_me);
 
3943
      armci_dscrlist_recv_complete(0,"armci_rcv_strided_data_bypass_both",NULL);
 
3944
    }
 
3945
 
 
3946
    if(DEBUG_CLN){printf("%d:rcv_strided_data bypass both: %d bytes from %d\n",
 
3947
                          armci_me, datalen, proc); fflush(stdout);
 
3948
    }
 
3949
}
 
3950
 
 
3951
 
 
3952
/*************************END OF FILE UNUSED CODE BELOW********************/
 
3953
int armci_pin_memory(void *ptr, int stride_arr[], int count[], int strides)
 
3954
{
 
3955
    printf("\n%d:armci_pin_memory not implemented",armci_me);fflush(stdout);
 
3956
    return 0;
 
3957
}
 
3958
 
 
3959
 
 
3960
void armci_client_send_ack(int proc, int n)
 
3961
{
 
3962
    printf("\n%d:client_send_ack not implemented",armci_me);fflush(stdout);
 
3963
}
 
3964
 
 
3965
 
 
3966
void armci_rcv_strided_data_bypass(int proc, request_header_t* msginfo,
 
3967
                                   void *ptr, int stride_levels)
 
3968
{
 
3969
    printf("\n%d:armci_rcv_strided_data_bypass not implemented",armci_me);
 
3970
    fflush(stdout);
 
3971
}
 
3972
 
 
3973
 
 
3974
void armci_unpin_memory(void *ptr, int stride_arr[], int count[], int strides)
 
3975
{
 
3976
    printf("\n%d:armci_unpin_memory not implemented",armci_me);fflush(stdout);
 
3977
}
 
3978
 
 
3979
 
 
3980
int armcill_server_wait_ack(int proc, int n)
 
3981
{
 
3982
    printf("\n%d:armcill_server_wait_ack not implemented",armci_me);
 
3983
    fflush(stdout);
 
3984
    return(0);
 
3985
}
 
3986
 
 
3987
 
 
3988
void armcill_server_put(int proc, void* s, void *d, int len)
 
3989
{
 
3990
    printf("\n%d:armcill_server_put not implemented",armci_me);fflush(stdout);
 
3991
}
 
3992
 
 
3993
 
 
3994
/*\
 
3995
 *  initialising the atomic send descriptor
 
3996
\*/
 
3997
void armci_init_vapibuf_atomic(struct ibv_send_wr *sd, struct ibv_sge *sg,
 
3998
                   int op, int*ploc,int *prem, int extra,
 
3999
                   int id,ARMCI_MEMHDL_T *lhandle,
 
4000
                   ARMCI_MEMHDL_T *rhandle)
 
4001
{
 
4002
    if (1) {
 
4003
       printf("%d(c) : entered armci_init_vapibuf_atomic\n",armci_me);
 
4004
       fflush(stdout);
 
4005
    }
 
4006
    memset(sd,0,sizeof(struct ibv_send_wr));
 
4007
    if (op == ARMCI_FETCH_AND_ADD_LONG ) {
 
4008
       printf("%d(c) :setting opcode for snd dscr to FETCH_AND_ADD\n",armci_me);
 
4009
       sd->opcode = IBV_WR_ATOMIC_FETCH_AND_ADD;
 
4010
       sd->wr.atomic.compare_add = (uint64_t)extra;
 
4011
    } else if(op == ARMCI_SWAP_LONG){
 
4012
       sd->opcode = IBV_WR_ATOMIC_CMP_AND_SWP;
 
4013
       sd->wr.atomic.swap = (uint64_t)extra;
 
4014
    }
 
4015
    sd->send_flags = IBV_SEND_SIGNALED;
 
4016
    sg->length = 8; /* 64 bit atomic*/
 
4017
    printf("--------\n");
 
4018
    sg->addr= (uint64_t)(void *)ploc;
 
4019
    if(lhandle)
 
4020
    sg->lkey = lhandle->lkey;
 
4021
    sd->sg_list = sg;
 
4022
    sd->num_sge = 1;
 
4023
    sd->wr.atomic.remote_addr = (uint64_t)(void *)prem;
 
4024
    if(rhandle)
 
4025
       sd->wr.atomic.rkey = rhandle->rkey; /* how do we get the remote key  */
 
4026
    sd->wr_id = DSCRID_RMW + armci_me;
 
4027
 
 
4028
    if(1){
 
4029
       printf("%d(c) : finished initialising atomic send desc id is %ld,armci_ime = %d\n",armci_me,sd->wr_id,armci_me);
 
4030
       fflush(stdout);
 
4031
    }   
 
4032
}
 
4033
/*\
 
4034
 *   using vapi remote atomic operations
 
4035
\*/
 
4036
void client_rmw_complete(struct ibv_send_wr *snd_dscr, char *from)
 
4037
{
 
4038
    int rc = 0;
 
4039
    struct ibv_wc pdscr1;
 
4040
    struct ibv_wc *pdscr=&pdscr1;
 
4041
 
 
4042
  printf("%d(c) : inside client_rmw_complete\n",armci_me);
 
4043
  do {
 
4044
      while (rc == 0) {
 
4045
        rc =  ibv_poll_cq(CLN_nic->scq, 1, pdscr);
 
4046
      }
 
4047
      dassertp(DBG_POLL|DBG_ALL,rc>=0,
 
4048
               ("%d: rc=%d id=%d status=%d\n",
 
4049
                armci_me,rc,pdscr->wr_id,pdscr->status));
 
4050
      dassert1(1,pdscr->status==IBV_WC_SUCCESS,pdscr->status);
 
4051
      rc = 0;
 
4052
    } while(pdscr->wr_id != snd_dscr->wr_id);
 
4053
}
 
4054
 
 
4055
 
 
4056
void armci_direct_rmw(int op, int*ploc, int *prem, int extra, int proc,
 
4057
                      ARMCI_MEMHDL_T *lhandle, ARMCI_MEMHDL_T *rhandle)
 
4058
{
 
4059
    int rc = 0;
 
4060
    struct ibv_send_wr *sd;
 
4061
    struct ibv_sge *sg;
 
4062
    vapi_nic_t *nic;
 
4063
    armci_connect_t *con;
 
4064
 
 
4065
    nic = SRV_nic;
 
4066
    con = CLN_con+proc;
 
4067
 
 
4068
    sd = &(rmw[armci_me].rmw_dscr);
 
4069
    sg = &(rmw[armci_me].rmw_entry);
 
4070
 
 
4071
    if (1) {
 
4072
        printf("%d(c) : about to call armci_init_vapibuf_atomic\n",armci_me);
 
4073
        fflush(stdout);
 
4074
    }
 
4075
 
 
4076
  armci_init_vapibuf_atomic(sd, sg, op,ploc,prem,extra,proc,lhandle,rhandle);
 
4077
 
 
4078
  if (1) {
 
4079
     printf("%d(c) : finished armci_init_vapibuf_atomic\n",armci_me);
 
4080
     fflush(stdout);
 
4081
  }
 
4082
 
 
4083
  struct ibv_send_wr * bad_wr;
 
4084
  rc = ibv_post_send(con->qp, sd, &bad_wr);
 
4085
  dassert1(1,rc==0,rc);
 
4086
 
 
4087
  if (1) {
 
4088
     printf("%d(c) : finished posting desc\n",armci_me);
 
4089
     fflush(stdout);
 
4090
  }
 
4091
 
 
4092
  /*armci_send_complete(sd,"send_remote_atomic");*/
 
4093
  client_rmw_complete(sd,"send_remote_atomic");
 
4094
 
 
4095
  return;
 
4096
}
 
4097
 
 
4098