5
/* $Id: request.c,v 1.74.2.11 2007-10-18 06:09:37 d3h325 Exp $ */
19
/* Debug trace: print "ENTERING <func_>" on stdout and flush immediately.
 * func_ is passed to a %s conversion, so it must be a C string.
 * The body is wrapped in do { } while (0) so that the expansion plus the
 * caller's trailing ';' forms exactly one statement; the former bare
 * { ... } block broke "if (c) MARK_ENTER(f); else ..." constructs. */
# define MARK_ENTER(func_) do { fprintf(stdout, "ENTERING %s\n", func_); fflush(stdout); } while (0)
20
/* Debug trace: print "EXITING <func_>" on stdout and flush immediately.
 * func_ is passed to a %s conversion, so it must be a C string.
 * The body is wrapped in do { } while (0) so that the expansion plus the
 * caller's trailing ';' forms exactly one statement; the former bare
 * { ... } block broke "if (c) MARK_EXIT(f); else ..." constructs. */
# define MARK_EXIT(func_) do { fprintf(stdout, "EXITING %s\n", func_); fflush(stdout); } while (0)
22
# define MARK_ENTER(func_)
23
# define MARK_EXIT(func_)
27
# define PRNDBG3(m,a1,a2,a3) \
28
fprintf(stderr,"DBG %d: " m,armci_me,a1,a2,a3);fflush(stderr)
29
# define PRNDBG(m) PRNDBG3(m,0,0,0)
30
# define PRNDBG1(m,a1) PRNDBG3(m,a1,0,0)
31
# define PRNDBG2(m,a1,a2) PRNDBG3(m,a1,a2,0)
34
# define PRNDBG1(m,a1)
35
# define PRNDBG2(m,a1,a2)
36
# define PRNDBG3(m,a1,a2,a3)
40
#if !defined(GM) && !defined(VIA) && !defined(LAPI) &&!defined(VAPI)
41
double _armci_rcv_buf[MSG_BUFLEN_DBL];
42
double _armci_snd_buf[MSG_BUFLEN_DBL];
43
char* MessageSndBuffer = (char*)_armci_snd_buf;
44
char* MessageRcvBuffer = (char*)_armci_rcv_buf;
49
/* Append one value to a packed byte buffer: store val as a `type` at the
 * address held in buf, then advance buf by sizeof(type) bytes.
 * buf must be a modifiable byte-pointer lvalue; it is evaluated twice.
 * The two steps are wrapped in do { } while (0) so they stay together
 * under an unbraced if/else/loop; previously only the store was guarded
 * and the pointer advanced unconditionally. */
#define ADDBUF(buf,type,val) do { *(type*)(buf) = (val); (buf) += sizeof(type); } while (0)
50
/* Read one value from a packed byte buffer: load a `type` from the address
 * held in buf into var, then advance buf by sizeof(type) bytes.
 * buf must be a modifiable byte-pointer lvalue; it is evaluated twice.
 * The two steps are wrapped in do { } while (0) so they stay together
 * under an unbraced if/else/loop; previously only the load was guarded
 * and the pointer advanced unconditionally. */
#define GETBUF(buf,type,var) do { (var) = *(type*)(buf); (buf) += sizeof(type); } while (0)
52
#define ALLIGN8(buf){size_t _adr=(size_t)(buf); \
53
_adr>>=3; _adr<<=3; _adr+=8; (buf) = (char*)_adr; }
62
/*******************Routines to handle completion descriptor******************/
64
*Following are the routines to fill a completion descriptor and, if necessary,
65
*copy the data to destination based on completion descriptor
66
*NOTE, THE FOLLOWING ROUTINES ARE FOR CLIENTS ONLY
70
/*\Routine to complete a vector request, data is in buf and descriptor in dscr
72
extern int armci_direct_vector_get(request_header_t *msginfo , armci_giov_t darr[], int len, int proc);
73
static void armci_complete_vector_get(armci_giov_t darr[],int len,void *buf)
76
request_header_t *msginfo = (request_header_t*) buf;
78
#if defined(USE_SOCKET_VECTOR_API)
79
armci_direct_vector_get(msginfo, darr, len, proc);
81
armci_rcv_vector_data(proc, msginfo, darr, len);
83
FREE_SEND_BUFFER(buf);
91
/*\ Routine called from buffers.c to complete a request for which the buffer was
92
* used, so that the buffer can be reused.
94
void armci_complete_req_buf(BUF_INFO_T *info, void *buffer)
96
request_header_t *msginfo = (request_header_t*) buffer;
97
ARMCI_PR_DBG("enter",0);
98
if(info->protocol==0)return;
99
else if(info->protocol==SDSCR_IN_PLACE){
100
char *dscr = info->dscr;
103
int *loc_stride_arr,*count;
105
loc_ptr = *(void**)dscr; dscr += sizeof(void*);
106
stride_levels = *(int*)dscr; dscr += sizeof(int);
107
loc_stride_arr = (int*)dscr; dscr += stride_levels*sizeof(int);
111
printf("\n%d:extracted loc_ptr=%p, stridelevels=%d\n",armci_me,
112
loc_ptr,stride_levels);
116
armci_rcv_strided_data(msginfo->to, msginfo, msginfo->datalen, loc_ptr,
117
stride_levels,loc_stride_arr,count);
118
FREE_SEND_BUFFER(msginfo);
120
else if(info->protocol==VDSCR_IN_PLACE || info->protocol==VDSCR_IN_PTR){
123
if(info->protocol==VDSCR_IN_PLACE){
125
//printf("\n%d:vdscr in place\n",armci_me);
128
dscr = info->ptr.dscrbuf;
129
//printf("\n%d:vdscr in buf\n",armci_me);
131
GETBUF(dscr, long ,len);
134
darr = (armci_giov_t *)malloc(sizeof(armci_giov_t)*len);
135
if(!darr)armci_die("malloc in complete_req_buf failed",len);
136
for(i = 0; i< len; i++){
138
GETBUF(dscr, int, parlen);
139
GETBUF(dscr, int, bytes);
140
darr[i].ptr_array_len = parlen;
141
darr[i].bytes = bytes;
142
if(msginfo->operation==GET)darr[i].dst_ptr_array=(void **)dscr;
143
else darr[i].src_ptr_array=(void **)dscr;
144
dscr+=sizeof(void *)*parlen;
146
if (msginfo->operation==GET) armci_complete_vector_get(darr,len,buffer);
150
armci_die("armci_complete_req_buf,protocol val invalid",info->protocol);
151
ARMCI_PR_DBG("exit",0);
154
extern long x_net_offset(void *,int);
155
/*\ save a part of strided descriptor needed to complete request
157
rmo: it seems as if save_
160
void armci_save_strided_dscr(char **bptr, void *rem_ptr,int rem_stride_arr[],
161
int count[], int stride_levels,int is_nb,int proc)
165
BUF_INFO_T *info=NULL;
166
long network_offset,tmpoffset;
167
ARMCI_PR_DBG("enter",0);
169
# ifdef PORTALS_UNRESOLVED
171
network_offset=x_net_offset(rem_ptr,proc);
172
if(DEBUG_){printf("\n%d:rem_ptr=%p offset=%d newrem=%p",armci_me,rem_ptr,network_offset,(char *)rem_ptr+network_offset);fflush(stdout);}
173
rem_ptr = (char *)rem_ptr+network_offset;
178
info=BUF_TO_BUFINFO(*bptr);
179
bufptr = (info->dscr);
181
*(void**)bufptr = rem_ptr; bufptr += sizeof(void*);
182
*(int*)bufptr = stride_levels; bufptr += sizeof(int);
183
for(i=0;i<stride_levels;i++)((int*)bufptr)[i] = rem_stride_arr[i];
184
bufptr += stride_levels*sizeof(int);
185
for(i=0;i< stride_levels+1;i++)((int*)bufptr)[i] = count[i];
186
bufptr += (1+stride_levels)*sizeof(int);
187
if((0 || DEBUG_) && is_nb){
188
bufptr = (info->dscr);
190
printf("\n%d:rem_ptr %p=%p stride_levels %d=%d\n",armci_me,
191
*(void**)bufptr,rem_ptr,
192
*(int*)(bufptr + sizeof(void*)),stride_levels);
194
/*remote_strided expects the pointer to point to the end of descr hence..*/
196
info->protocol=SDSCR_IN_PLACE;
199
ARMCI_PR_DBG("exit",0);
204
/*\ save a part of vector descriptor needed to complete request
206
void armci_save_vector_dscr(char **bptr,armci_giov_t darr[],int len,
207
int op,int is_nb, int proc)
209
int i,size=sizeof(int);
211
char *buf,*bufptr=*bptr;
214
ARMCI_PR_DBG("enter",0);
217
size+=(2*sizeof(int)+darr[i].ptr_array_len * sizeof(void*));
219
info=BUF_TO_BUFINFO(bufptr);
220
/*if descr fits in available buffer, use it else do malloc */
223
info->protocol=VDSCR_IN_PLACE;
226
info->ptr.dscrbuf = (void *)malloc(size);
227
buf = (char *)info->ptr.dscrbuf;
228
info->protocol=VDSCR_IN_PTR;
234
ADDBUF(buf,long,len); /* number of sets */
237
ADDBUF(buf,int,darr[i].ptr_array_len); /* number of elements */
238
ADDBUF(buf,int,darr[i].bytes); /* sizeof element */
241
rem_ptr = darr[i].dst_ptr_array;
244
# ifdef PORTALS_UNRESOLVED
245
for(j=0;j<darr[i].ptr_array_len;j++){
246
offst=x_net_offset(darr[i].src_ptr_array[j],proc);
247
darr[i].src_ptr_array[j]= (char*)darr[i].src_ptr_array[j]+offst;
250
rem_ptr = darr[i].src_ptr_array;
254
# ifdef PORTALS_UNRESOLVED
255
for(j=0;j<darr[i].ptr_array_len;j++){
256
offst=x_net_offset(darr[i].dst_ptr_array[j],proc);
257
darr[i].dst_ptr_array[j]= (char*)darr[i].dst_ptr_array[j]+offst;
260
rem_ptr = darr[i].dst_ptr_array;
262
armci_copy(rem_ptr,buf, darr[i].ptr_array_len * sizeof(void*));
263
buf += darr[i].ptr_array_len*sizeof(void*);
267
ARMCI_PR_DBG("exit",0);
271
* If buf==null, set handle->bufid to val, else set it to the id of the buf
273
void armci_set_nbhandle_bufid(armci_ihdl_t nb_handle,char *buf,int val)
277
info = BUF_TO_BUFINFO(buf);
280
nb_handle->bufid = val;
283
/**************End--Routines to handle completion descriptor******************/
286
/*\ send request to server to LOCK MUTEX
288
void armci_rem_lock(int mutex, int proc, int *ticket)
290
request_header_t *msginfo;
292
int bufsize = sizeof(request_header_t)+sizeof(int);
294
msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,LOCK,proc);
295
bzero(msginfo,sizeof(request_header_t));
297
msginfo->datalen = sizeof(int);
298
msginfo->dscrlen = 0;
299
msginfo->from = armci_me;
301
msginfo->operation = LOCK;
302
msginfo->format = mutex;
303
msginfo->bytes = msginfo->datalen + msginfo->dscrlen;
305
ibuf = (int*)(msginfo+1);
308
armci_send_req(proc, msginfo, bufsize, 0);
310
/* receive ticket from server */
311
*ticket = *(int*)armci_rcv_data(proc,msginfo,0);
312
FREE_SEND_BUFFER(msginfo);
314
if(DEBUG_)fprintf(stderr,"%d receiving ticket %d\n",armci_me, *ticket);
320
void armci_server_lock(request_header_t *msginfo)
322
int *ibuf = (int*)(msginfo+1);
323
int proc = msginfo->from;
326
ARMCI_PR_DBG("enter",0);
330
/* acquire lock on behalf of requesting process */
331
ticket = armci_server_lock_mutex(mutex, proc, msginfo->tag);
335
msginfo->datalen = sizeof(int);
336
armci_send_data(msginfo, &ticket);
338
ARMCI_PR_DBG("exit",0);
342
/*\ send request to server to UNLOCK MUTEX
344
void armci_rem_unlock(int mutex, int proc, int ticket)
346
request_header_t *msginfo;
348
int bufsize = sizeof(request_header_t)+sizeof(ticket);
350
msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,UNLOCK,proc);
351
bzero(msginfo,sizeof(request_header_t));
353
msginfo->dscrlen = msginfo->bytes = sizeof(ticket);
354
msginfo->datalen = 0;
355
msginfo->from = armci_me;
357
msginfo->operation = UNLOCK;
358
msginfo->format = mutex;
359
ibuf = (int*)(msginfo+1);
362
if(DEBUG_)fprintf(stderr,"%d sending unlock\n",armci_me);
363
armci_send_req(proc, msginfo, bufsize,0);
368
/*\ server unlocks mutex and passes lock to the next waiting process
370
void armci_server_unlock(request_header_t *msginfo, char* dscr)
372
int ticket = *(int*)dscr;
373
int mutex = msginfo->format;
374
int proc = msginfo->to;
377
waiting = armci_server_unlock_mutex(mutex,proc,ticket,&msginfo->tag);
379
if(waiting >-1){ /* -1 means that nobody is waiting */
382
/* pass ticket to the waiting process */
383
msginfo->from = waiting;
384
msginfo->datalen = sizeof(ticket);
385
armci_send_data(msginfo, &ticket);
390
void armci_unlock_waiting_process(msg_tag_t tag, int proc, int ticket)
392
request_header_t header;
393
request_header_t *msginfo = &header;
395
msginfo->datalen = sizeof(int);
397
msginfo->from = proc;
398
msginfo->to = armci_me;
399
armci_send_data(msginfo, &ticket);
402
void * armci_server_ptr(int id){
404
int bufsize = sizeof(int);
405
request_header_t *msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,ATTACH,armci_me);
406
bzero(msginfo,sizeof(request_header_t));
407
msginfo->from = armci_me;
408
msginfo->to = SERVER_NODE(armci_clus_me);
409
msginfo->dscrlen = 0;
410
msginfo->datalen = sizeof(int);
411
msginfo->operation = ATTACH;
412
msginfo->bytes = msginfo->dscrlen+ msginfo->datalen;
413
armci_copy(&id, msginfo +1, sizeof(int));
415
printf("\n%d:attach req:sending id %d \n",armci_me,id);fflush(stdout);
417
armci_send_req(armci_master, msginfo, bufsize,0);
418
buf= armci_rcv_data(armci_master,msginfo,sizeof(void *));/* receive response */
420
printf("\n%d:attach req:got %p \n",armci_me,buf);fflush(stdout);
422
FREE_SEND_BUFFER(msginfo);
423
ARMCI_PR_DBG("exit",0);
428
/*\ control message to the server, e.g.: ATTACH to shmem, return ptr etc.
430
void armci_serv_attach_req(void *info, int ilen, long size, void* resp,int rlen)
433
ARMCI_PR_DBG("enter",0);
434
int bufsize = 2*sizeof(request_header_t)+ilen + sizeof(long)+sizeof(rlen);
435
long *idlist=(long *)info;
436
request_header_t *msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,ATTACH,armci_me);
437
bzero(msginfo,sizeof(request_header_t));
439
msginfo->from = armci_me;
440
msginfo->to = SERVER_NODE(armci_clus_me);
441
msginfo->dscrlen = ilen;
442
msginfo->datalen = sizeof(long)+sizeof(int);
443
msginfo->operation = ATTACH;
444
msginfo->bytes = msginfo->dscrlen+ msginfo->datalen;
446
armci_copy(info, msginfo +1, ilen);
447
if(DEBUG_MEM){printf("\n%d:sending idlist+1 %d, size %d, idlist[0] %d, idlist[1] %d\n",armci_me,idlist+1,size,idlist[0],idlist[1]);}
448
buf = ((char*)msginfo) + ilen + sizeof(request_header_t);
450
*(int*)(buf+ sizeof(long)) = rlen;
451
armci_send_req(armci_master, msginfo, bufsize,0);
453
buf= armci_rcv_data(armci_master, msginfo,rlen); /* receive response */
454
bcopy(buf, resp, rlen);
455
FREE_SEND_BUFFER(msginfo);
457
if(DEBUG_MEM){printf("%d:client attaching got ptr=%p %d bytes\n",armci_me,buf,rlen);
461
ARMCI_PR_DBG("exit",0);
465
/*\ server initializes its copy of the memory lock data structures
467
static void server_alloc_memlock(void *ptr_myclus)
471
/* for protection, set pointers for processes outside local node NULL */
472
memlock_table_array = calloc(armci_nproc,sizeof(void*));
473
if(!memlock_table_array) armci_die("malloc failed for ARMCI lock array",0);
475
/* set pointers for processes on local cluster node
476
* ptr_myclus - corresponds to the master process
478
for(i=0; i< armci_clus_info[armci_clus_me].nslave; i++){
479
memlock_table_array[armci_master +i] = ((char*)ptr_myclus)
480
+ MAX_SLOTS*sizeof(memlock_t)*i;
483
/* set pointer to the use flag */
484
#ifdef MEMLOCK_SHMEM_FLAG
485
armci_use_memlock_table = (int*) (MAX_SLOTS*sizeof(memlock_t) +
486
(char*) memlock_table_array[armci_clus_last]);
489
fprintf(stderr,"server initialized memlock %p\n",armci_use_memlock_table);
494
static int allocate_memlock=1;
496
/*\ server actions triggered by client request to ATTACH
498
void armci_server_ipc(request_header_t* msginfo, void* descr,
499
void* buffer, int buflen)
502
long *idlist = (long*)descr;
503
long size = *(long*)buffer;
504
int rlen = *(int*)(sizeof(long)+(char*)buffer);
505
extern int **_armci_int_mutexes;
506
ARMCI_PR_DBG("enter",0);
507
if(size<0) armci_die("armci_server_ipc: size<0",(int)size);
508
if(DEBUG_MEM)printf("\n%d:got idlist+1 %p, size %d, idlist[0] %d, idlist[1] %d",armci_me,idlist+1,size,idlist[0],idlist[1]);
509
ptr=(double*)Attach_Shared_Region(idlist+1,size,idlist[0]);
510
if(!ptr)armci_die("armci_server_ipc: failed to attach",0);
511
/* provide data server with access to the memory lock data structures */
512
if(allocate_memlock){
513
allocate_memlock = 0;
514
server_alloc_memlock(ptr);
516
if(_armci_int_mutexes==NULL){
517
printf("unresolved portals external\n");
519
# ifdef PORTALS_UNRESOLVED
520
extern int _armci_server_mutex_ready;
521
extern void *_armci_server_mutex_ptr;
522
if(_armci_server_mutex_ready){
523
_armci_int_mutexes=(int **)_armci_server_mutex_ptr;
527
if(size>0)armci_set_mem_offset(ptr);
529
if(msginfo->datalen != sizeof(long)+sizeof(int))
530
armci_die("armci_server_ipc: bad msginfo->datalen ",msginfo->datalen);
532
if(rlen==sizeof(ptr)){
533
msginfo->datalen = rlen;
534
armci_send_data(msginfo, &ptr);
536
else armci_die("armci_server_ipc: bad rlen",rlen);
537
ARMCI_PR_DBG("exit",0);
541
/*\ send RMW request to server
543
void armci_rem_rmw(int op, void *ploc, void *prem, int extra, int proc)
545
request_header_t *msginfo;
548
int bufsize = sizeof(request_header_t)+sizeof(long)+sizeof(void*);
551
ARMCI_PR_DBG("enter",0);
552
msginfo = (request_header_t*)GET_SEND_BUFFER(bufsize,op,proc);
553
bzero(msginfo,sizeof(request_header_t));
555
msginfo->dscrlen = sizeof(void*);
556
msginfo->from = armci_me;
558
msginfo->operation = op;
559
msginfo->datalen = sizeof(long);
560
# ifdef PORTALS_UNRESOLVED
561
offst=x_net_offset(prem,proc);
562
prem = ((char *)prem+offst);
564
buf = (char*)(msginfo+1);
565
ADDBUF(buf, void*, prem); /* pointer is shipped as descriptor */
567
/* data field: extra argument in fetch&add and local value in swap */
569
ADDBUF(buf, int, *((int*)ploc));
570
}else if(op==ARMCI_SWAP_LONG) {
571
ADDBUF(buf, long, *((long*)ploc) );
572
msginfo->datalen = sizeof(long);
574
ADDBUF(buf, int, extra);
577
msginfo->bytes = msginfo->datalen+msginfo->dscrlen ;
580
printf("%d sending RMW request %d to %d\n",armci_me,op,proc);
583
armci_send_req(proc, msginfo, bufsize,0);
584
buffer = armci_rcv_data(proc,msginfo,0); /* receive response */
586
if(op==ARMCI_FETCH_AND_ADD || op== ARMCI_SWAP)
587
*(int*)ploc = *(int*)buffer;
589
*(long*)ploc = *(long*)buffer;
591
FREE_SEND_BUFFER(msginfo);
592
ARMCI_PR_DBG("exit",0);
596
/*\ server response to RMW
598
void armci_server_rmw(request_header_t* msginfo,void* ptr, void* pextra)
603
int op = msginfo->operation;
605
ARMCI_PR_DBG("enter",0);
607
printf("%d server: executing RMW from %d. op=%d pextra=%p\n",armci_me,msginfo->from, op, pextra);
610
if(msginfo->datalen != sizeof(long))
611
armci_die2("armci_server_rmw: bad datalen=",msginfo->datalen,op);
613
/* for swap operations *pextra has the value to swap
614
* for fetch&add it carries the increment argument
618
iold = *(int*) pextra;
619
case ARMCI_FETCH_AND_ADD:
623
case ARMCI_SWAP_LONG:
624
lold = *(long*) pextra;
625
case ARMCI_FETCH_AND_ADD_LONG:
630
armci_die("armci_server_rmw: bad operation code=",op);
633
armci_generic_rmw(op, pold, *(int**)ptr, *(int*) pextra, msginfo->to);
635
armci_send_data(msginfo, pold);
636
ARMCI_PR_DBG("exit",0);
639
extern int armci_direct_vector_snd(request_header_t *msginfo , armci_giov_t darr[], int len, int proc);
640
extern int armci_direct_vector(request_header_t *msginfo , armci_giov_t darr[], int len, int proc);
641
int armci_rem_vector(int op, void *scale, armci_giov_t darr[],int len,int proc,int flag, armci_ihdl_t nb_handle)
644
request_header_t *msginfo;
645
int bytes =0, s, slen=0;
647
int bufsize = sizeof(request_header_t);
650
if(nb_handle)tag=nb_handle->tag;
652
/* compute size of the buffer needed */
653
for(s=0; s<len; s++){
654
bytes += darr[s].ptr_array_len * darr[s].bytes; /* data */
655
bufsize += darr[s].ptr_array_len *sizeof(void*)+2*sizeof(int); /*descr*/
658
bufsize += bytes + sizeof(long) +2*sizeof(double) +8; /*+scale+allignment*/
660
buf = buf0= GET_SEND_BUFFER(bufsize,op,proc);
661
msginfo = (request_header_t*)buf;
662
bzero(msginfo,sizeof(request_header_t));
664
/* printf("%d:: rem_vector. len=%d. ptr_len[len-1]=%d bytes[len-1]=%d bufsize=%d\n", */
665
/* armci_me, len, darr[len-1].ptr_array_len, darr[len-1].bytes,bufsize); */
666
/* fflush(stdout); */
670
/* INIT_SENDBUF_INFO(nb_handle,buf,op,proc); redundant -- see armci_rem_strided */
671
_armci_buf_set_tag(buf,nb_handle->tag,0);
672
if(nb_handle->bufid == NB_NONE)
673
armci_set_nbhandle_bufid(nb_handle,buf,0);
676
buf += sizeof(request_header_t);
678
/* fill vector descriptor */
679
armci_save_vector_dscr(&buf,darr,len,op,0,proc);
681
/* align buf for doubles (8-bytes) before copying data */
690
/* fill message header */
691
msginfo->dscrlen = buf - buf0 - sizeof(request_header_t);
692
msginfo->from = armci_me;
694
msginfo->operation = op;
695
msginfo->format = VECTOR;
696
msginfo->datalen = bytes;
698
/* put scale for accumulate */
701
*(int*)buf = *(int*)scale; slen= sizeof(int); break;
703
((double*)buf)[0] = ((double*)scale)[0];
704
((double*)buf)[1] = ((double*)scale)[1];
705
slen=2*sizeof(double);break;
707
*(double*)buf = *(double*)scale; slen = sizeof(double); break;
709
((float*)buf)[0] = ((float*)scale)[0];
710
((float*)buf)[1] = ((float*)scale)[1];
711
slen=2*sizeof(float);break;
713
*(float*)buf = *(float*)scale; slen = sizeof(float); break;
717
msginfo->datalen += slen;
718
msginfo->bytes = msginfo->datalen+msginfo->dscrlen;
721
/* for put and accumulate copy data into buffer */
723
/* fprintf(stderr,"sending %lf\n",*(double*)darr[0].src_ptr_array[0]);*/
724
armci_vector_to_buf(darr, len, buf);
727
armci_send_req(proc, msginfo, bufsize,tag);
728
/*x_buf_send_complete(buf0);*/
730
if(nb_handle && op==GET) armci_save_vector_dscr(&buf0,darr,len,op,1,proc);
731
if(op == GET&& !nb_handle){
732
armci_complete_vector_get(darr,len,msginfo);
738
/* Size constant, 8*8096 = 64768; its use is not visible in this part of the
 * file. NOTE(review): 8096 is not a power of two -- confirm that 8192 was
 * not intended before relying on this value. */
#define CHUN_ (8*8096)
741
/*\ client version of remote strided operation
743
int armci_rem_strided(int op, void* scale, int proc,
744
void *src_ptr, int src_stride_arr[],
745
void* dst_ptr, int dst_stride_arr[],
746
int count[], int stride_levels,
747
ext_header_t *h, int flag,armci_ihdl_t nb_handle)
750
request_header_t *msginfo;
751
int i, slen=0, bytes;
754
int bufsize = sizeof(request_header_t);
759
/* we send ext header only for last chunk */
761
if(h) ehlen = h->len;
763
if(h) if(h->last) ehlen = h->len;
765
if(ehlen>MAX_EHLEN || ehlen <0)
766
armci_die2("armci_rem_strided ehlen out of range",MAX_EHLEN,ehlen);
767
/* calculate size of the buffer needed */
768
for(i=0, bytes=1;i<=stride_levels;i++)bytes*=count[i];
769
bufsize += bytes+sizeof(void*)+2*sizeof(int)*(stride_levels+1) +ehlen
770
+2*sizeof(double) + 16; /* +scale+alignment */
773
if(op==GET)bufsize -=bytes;
776
buf = buf0= GET_SEND_BUFFER((bufsize),op,proc);
777
msginfo = (request_header_t*)buf;
778
bzero(msginfo,sizeof(request_header_t));
786
// printf("%s: non-blocking ops not yet supported\n",Portals_ID());
788
/* INIT_SENDBUF_INFO(nb_handle,buf,op,proc); same as _armci_buf_set_tag, why here? */
789
_armci_buf_set_tag(buf,nb_handle->tag,0);
790
if(nb_handle->bufid == NB_NONE)
791
armci_set_nbhandle_bufid(nb_handle,buf,0);
792
tag = nb_handle->tag;
797
rem_stride_arr = src_stride_arr;
800
rem_stride_arr = dst_stride_arr;
803
msginfo->datalen=bytes;
805
/* fill strided descriptor */
806
buf += sizeof(request_header_t);
807
/*this function fills the dscr into buf and also moves the buf ptr to the
809
armci_save_strided_dscr(&buf,rem_ptr,rem_stride_arr,count,stride_levels,0,proc);
811
/* align buf for doubles (8-bytes) before copying data */
814
/* fill message header */
815
msginfo->from = armci_me;
817
msginfo->format = STRIDED;
818
msginfo->operation = op;
820
/* put scale for accumulate */
823
*(int*)buf = *(int*)scale; slen= sizeof(int); break;
825
((double*)buf)[0] = ((double*)scale)[0];
826
((double*)buf)[1] = ((double*)scale)[1];
827
slen=2*sizeof(double);break;
829
*(double*)buf = *(double*)scale; slen = sizeof(double); break;
831
((float*)buf)[0] = ((float*)scale)[0];
832
((float*)buf)[1] = ((float*)scale)[1];
833
slen=2*sizeof(float);break;
835
*(float*)buf = *(float*)scale; slen = sizeof(float); break;
837
*(long*)buf = *(long*)scale; slen = sizeof(long); break;
842
if(ARMCI_ACC(op))printf("%d client len=%d alpha=%lf data=%lf,%lf\n",
843
armci_me, buf-(char*)msginfo,((double*)buf)[0],*((double*)src_ptr), ((double*)buf)[1]);
848
/**** add extended header *******/
850
bcopy(h->exthdr,buf,ehlen);
851
i = ehlen%8; ehlen += (8-i); /* make sure buffer is still alligned */
855
msginfo->ehlen = ehlen;
856
msginfo->dscrlen = buf - buf0 - sizeof(request_header_t);
857
msginfo->bytes = msginfo->datalen+msginfo->dscrlen;
862
printf("%s rem_strided: nb gets not yet available\n",Portals_ID());
866
armci_send_req(proc, msginfo, bufsize,tag);
867
armci_save_strided_dscr(&buf0,dst_ptr,dst_stride_arr,count,
868
stride_levels,1,proc);
870
# ifdef PORTALS_ALLOW_NBGETS
873
armci_rcv_strided_data(proc, msginfo, msginfo->datalen,
874
dst_ptr, stride_levels, dst_stride_arr, count);
875
FREE_SEND_BUFFER(msginfo);
876
# ifdef PORTALS_ALLOW_NBGETS
880
/* for put and accumulate send data */
881
armci_send_strided(proc,msginfo, buf,
882
src_ptr, stride_levels, src_stride_arr, count,tag);
889
void armci_process_extheader(request_header_t *msginfo, char *dscr, char* buf, int buflen)
894
h = (armci_flag_t*)(dscr + msginfo->dscrlen - msginfo->ehlen);
896
if(msginfo->ehlen)printf("%d:server from=%d len=%d: ptr=%p val=%d\n",armci_me,msginfo->from, msginfo->ehlen,h->ptr,h->val);
899
flag = (int*)(h->ptr);
903
void armci_server(request_header_t *msginfo, char *dscr, char* buf, int buflen)
905
int buf_stride_arr[MAX_STRIDE_LEVEL+1];
906
int *loc_stride_arr,slen;
907
int *count, stride_levels;
908
void *buf_ptr, *loc_ptr;
910
char *dscr_save = dscr;
914
ARMCI_PR_DBG("enter",msginfo->datalen);fflush(stdout);
915
/*return if using readv/socket for put*/
916
if(msginfo->operation==PUT && msginfo->datalen==0){
917
if(msginfo->ehlen) /* process extra header if available */
918
armci_process_extheader(msginfo, dscr, buf, buflen);
922
/* unpack descriptor record */
923
loc_ptr = *(void**)dscr; dscr += sizeof(void*);
924
stride_levels = *(int*)dscr; dscr += sizeof(int);
925
loc_stride_arr = (int*)dscr; dscr += stride_levels*sizeof(int);
928
/* compute stride array for buffer */
929
buf_stride_arr[0]=count[0];
930
for(i=0; i< stride_levels; i++)
931
buf_stride_arr[i+1]= buf_stride_arr[i]*count[i+1];
933
/* get scale for accumulate, adjust buf to point to data */
934
switch(msginfo->operation){
935
case ARMCI_ACC_INT: slen = sizeof(int); break;
936
case ARMCI_ACC_DCP: slen = 2*sizeof(double); break;
937
case ARMCI_ACC_DBL: slen = sizeof(double); break;
938
case ARMCI_ACC_CPL: slen = 2*sizeof(float); break;
939
case ARMCI_ACC_FLT: slen = sizeof(float); break;
940
case ARMCI_ACC_LNG: slen = sizeof(long); break;
944
scale = dscr_save+ (msginfo->dscrlen - slen -msginfo->ehlen);
946
if(ARMCI_ACC(msginfo->operation))
947
fprintf(stderr,"%d in server len=%d slen=%d alpha=%lf data=%lf\n",
948
armci_me, msginfo->dscrlen, slen, *(double*)scale,*(double*)buf);
951
buf_ptr = buf; /* data in buffer */
955
if(msginfo->operation == GET){
956
armci_send_strided_data(proc, msginfo, buf,
957
loc_ptr, stride_levels, loc_stride_arr, count);
958
/* fprintf(stderr, "GET response sent with tag: %d\n, msginfo->tag",
961
if((rc = armci_op_strided(msginfo->operation, scale, proc,
962
buf_ptr, buf_stride_arr, loc_ptr, loc_stride_arr,
963
count, stride_levels, 1,NULL)))
964
armci_die("server_strided: op from buf failed",rc);
967
if(msginfo->ehlen) /* process extra header if available */
968
armci_process_extheader(msginfo, dscr_save, buf, buflen);
969
ARMCI_PR_DBG("exit",0);
973
void armci_server_vector( request_header_t *msginfo,
974
char *dscr, char* buf, int buflen)
981
if(msginfo->operation==PUT && msginfo->datalen==0)return;/*return if using readv/socket for put*/
982
/* unpack descriptor record */
983
GETBUF(dscr, long ,len);
985
/* get scale for accumulate, adjust buf to point to data */
987
switch(msginfo->operation){
988
case ARMCI_ACC_INT: buf += sizeof(int); break;
989
case ARMCI_ACC_DCP: buf += 2*sizeof(double); break;
990
case ARMCI_ACC_DBL: buf += sizeof(double); break;
991
case ARMCI_ACC_CPL: buf += 2*sizeof(float); break;
992
case ARMCI_ACC_FLT: buf += sizeof(float); break;
997
/*fprintf(stderr,"scale=%lf\n",*(double*)scale);*/
998
/* execute the operation */
1000
switch(msginfo->operation) {
1002
/* fprintf(stderr, "%d:: Got a vector message!!\n", armci_me); */
1003
if(msginfo->ehlen) {
1004
armci_die("Unexpected vector message with non-zero ehlen. GPC call?",
1008
for(i = 0; i< len; i++){
1011
GETBUF(dscr, int, parlen);
1012
GETBUF(dscr, int, bytes);
1013
/* fprintf(stderr,"len=%d bytes=%d parlen=%d\n",len,bytes,parlen);*/
1014
ptr = (void**)dscr; dscr += parlen*sizeof(char*);
1015
for(s=0; s< parlen; s++){
1016
armci_copy(ptr[s], buf, bytes);
1020
/* fprintf(stderr,"%d:: VECTOR GET. server sending buffer %p datalen=%d\n",armci_me, sbuf, msginfo->datalen); */
1021
armci_send_data(msginfo, sbuf);
1027
/* fprintf(stderr,"received in buffer %lf\n",*(double*)buf);*/
1028
for(i = 0; i< len; i++){
1031
GETBUF(dscr, int, parlen);
1032
GETBUF(dscr, int, bytes);
1033
ptr = (void**)dscr; dscr += parlen*sizeof(char*);
1034
for(s=0; s< parlen; s++){
1036
armci_copy(buf, ptr[s], bytes);
1038
bcopy(buf, ptr[s], (size_t)bytes);
1046
/* this should be accumulate */
1047
if(!ARMCI_ACC(msginfo->operation))
1048
armci_die("v server: wrong op code",msginfo->operation);
1050
/* fprintf(stderr,"received first=%lf last =%lf in buffer\n",*/
1051
/* *((double*)buf),((double*)buf)[99]);*/
1053
for(i = 0; i< len; i++){
1056
GETBUF(dscr, int, parlen);
1057
GETBUF(dscr, int, bytes);
1058
ptr = (void**)dscr; dscr += parlen*sizeof(char*);
1059
armci_lockmem_scatter(ptr, parlen, bytes, proc);
1060
for(s=0; s< parlen; s++){
1061
armci_acc_2D(msginfo->operation, scale, proc, buf, ptr[s],
1062
bytes, 1, bytes, bytes, 0);
1065
ARMCI_UNLOCKMEM(proc);