5
/* $Id: dataserv.c,v 1.30.8.5 2007-07-02 05:18:13 d3p687 Exp $ */
21
#define USE_VECTOR_FORMAT_ 1
23
active_socks_t *_armci_active_socks;
25
extern int AR_ready_sigchld;
30
char *msg="hello from server";
31
static int *readylist=(int*)0;
33
#define GETBUF(buf,type,var) (var) = *(type*)(buf); (buf) += sizeof(type)
35
#if defined(USE_SOCKET_VECTOR_API)
36
int armci_RecvVectorFromSocket(int sock,armci_giov_t darr[], int len,
38
int i,j=0,k,num_xmit=0,lastiovlength,iovlength,n=0,max_iovec,totalsize=0;
39
int totaliovs=0,dim1=0,dim2=0;
40
struct iovec *saveiov=iov;
41
max_iovec = MAX_IOVEC;
44
totaliovs+=darr[i].ptr_array_len;
45
num_xmit = totaliovs/max_iovec;
46
lastiovlength = totaliovs%max_iovec;
47
if(num_xmit == 0) num_xmit = 1;
48
else if(lastiovlength!=0)num_xmit++;
49
dim2=darr[dim1].ptr_array_len;
50
for(k=0;k<num_xmit;k++){
51
if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
52
else iovlength=max_iovec;
54
for(j=0;j<iovlength;j++){
55
if(dim2==0){dim1+=1;dim2=darr[dim1].ptr_array_len;}
56
iov[j].iov_base=darr[dim1].dst_ptr_array[darr[dim1].ptr_array_len-dim2];
57
iov[j].iov_len = darr[dim1].bytes;totalsize+=iov[j].iov_len;
60
n+=armci_ReadVFromSocket(sock,iov,j,totalsize);
62
printf("\n%d:armci_RecvVectorFromSocket recved iovlength=%d totalsize=%d n=%d",armci_me,iovlength,totalsize,n);
71
int armci_SendVectorToSocket(int sock,armci_giov_t darr[], int len,
73
int i,j=0,k,num_xmit=0,lastiovlength,iovlength,n=0,max_iovec,totalsize=0;
74
int totaliovs=0,dim1=0,dim2=0;
75
struct iovec *saveiov=iov;
76
max_iovec = MAX_IOVEC;
78
totaliovs+=darr[i].ptr_array_len;
79
num_xmit = totaliovs/max_iovec;
80
lastiovlength = totaliovs%max_iovec;
81
if(num_xmit == 0) num_xmit = 1;
82
else if(lastiovlength!=0)num_xmit++;
83
dim2=darr[dim1].ptr_array_len;
84
for(k=0;k<num_xmit;k++){
85
if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
86
else iovlength=max_iovec;
88
for(j=0;j<iovlength;j++){
89
if(dim2==0){dim1++;dim2=darr[dim1].ptr_array_len;}
90
iov[j].iov_base=darr[dim1].src_ptr_array[darr[dim1].ptr_array_len-dim2];
91
iov[j].iov_len = darr[dim1].bytes;totalsize+=iov[j].iov_len;
94
n+=armci_WriteVToSocket(sock,iov,j,totalsize);
96
printf("\n%d:armci_SendVectorToSocket done iovlen=%d totalsiz=%d n=%d",
97
armci_me,iovlength,totalsize,n);
106
int armci_RecvStridedFromSocket(int sock,void *dst_ptr, int dst_stride_arr[],
107
int count[],int stride_levels,struct iovec *iov){
109
char *dst=(char*)dst_ptr;
111
int i,j,k,num_xmit=0,lastiovlength,iovlength,n=0,max_iovec,totalsize=0,vecind;
113
int index[MAX_STRIDE_LEVEL], unit[MAX_STRIDE_LEVEL];
115
max_iovec = MAX_IOVEC;
117
printf("\nin readv count[0] is %d and strarr[0] is%d\n",count[0],
121
index[2] = 0; unit[2] = 1;
123
total_of_2D = count[2];
124
for(j=3; j<=stride_levels; j++) {
125
index[j] = 0; unit[j] = unit[j-1] * count[j-1];
126
total_of_2D *= count[j];
130
num_xmit = (total_of_2D*count[1])/max_iovec;
131
lastiovlength = (total_of_2D*count[1])%max_iovec;
132
if(num_xmit == 0) num_xmit = 1;
133
else if(lastiovlength!=0)num_xmit++;
136
if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
137
else iovlength=max_iovec;
139
for(i=0; i<total_of_2D; i++) {
140
dst = (char *)dst_ptr;
141
for(j=2; j<=stride_levels; j++) {
142
dst += index[j] * dst_stride_arr[j-1];
143
if(((i+1) % unit[j]) == 0) index[j]++;
144
if(index[j] >= count[j]) index[j] = 0;
147
for(j=0;j<count[1];j++,vecind++){
148
if(vecind==iovlength){
149
n+=armci_ReadVFromSocket(sock,iov,iovlength,totalsize);
150
vecind = 0; totalsize=0; k++;
151
if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
152
else iovlength=max_iovec;
154
iov[vecind].iov_base = dst1;
155
iov[vecind].iov_len = count[0];totalsize+=count[0];
156
dst1+=dst_stride_arr[0];
158
if(vecind==iovlength){
159
n+=armci_ReadVFromSocket(sock,iov,iovlength,totalsize);
160
vecind = 0; totalsize=0; k++;
161
if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
162
else iovlength=max_iovec;
165
printf("\n%d:armci_RecvStridedFromSocket iovlen=%d totalsize=%d n=%d",
166
armci_me,iovlength,totalsize,n);
174
int armci_SendStridedToSocket(int sock,void *src_ptr, int src_stride_arr[],
175
int count[], int stride_levels,struct iovec *iov){
176
char *src=(char*)src_ptr;
178
int i,j,k,num_xmit=0,lastiovlength,iovlength,n=0,max_iovec,totalsize=0,vecind;
180
int index[MAX_STRIDE_LEVEL], unit[MAX_STRIDE_LEVEL];
182
max_iovec = MAX_IOVEC;
184
printf("\nin writev count[0] is %d and strarr[0] is%d\n",count[0],
188
index[2] = 0; unit[2] = 1;
190
total_of_2D = count[2];
191
for(j=3; j<=stride_levels; j++) {
192
index[j] = 0; unit[j] = unit[j-1] * count[j-1];
193
total_of_2D *= count[j];
196
num_xmit = total_of_2D*count[1]/max_iovec;
197
lastiovlength = (total_of_2D*count[1])%max_iovec;
198
if(num_xmit == 0) num_xmit = 1;
199
else if(lastiovlength!=0)num_xmit++;
202
if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
203
else iovlength=max_iovec;
205
for(i=0; i<total_of_2D; i++) {
206
src = (char *)src_ptr;
207
for(j=2; j<=stride_levels; j++) {
208
src += index[j] * src_stride_arr[j-1];
209
if(((i+1) % unit[j]) == 0) index[j]++;
210
if(index[j] >= count[j]) index[j] = 0;
213
for(j=0;j<count[1];j++,vecind++){
214
if(vecind==iovlength){
215
n+=armci_WriteVToSocket(sock,iov,iovlength,totalsize);
216
vecind = 0; totalsize=0; k++;
217
if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
218
else iovlength=max_iovec;
220
iov[vecind].iov_base = src1;
221
iov[vecind].iov_len = count[0];totalsize+=count[0];
222
src1+=src_stride_arr[0];
224
if(vecind==iovlength){
225
n+=armci_WriteVToSocket(sock,iov,iovlength,totalsize);
226
vecind = 0; totalsize=0; k++;
227
if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
228
else iovlength=max_iovec;
231
printf("\n%d:armci_SendStridedToSocket iovlength=%d totalsize=%d n=%d",
232
armci_me,iovlength, totalsize,n);fflush(stdout);
239
int armci_direct_vector_snd(request_header_t *msginfo , armci_giov_t darr[],
242
int bufsize=0,bytes=0,s;
244
for(s=0; s<len; s++){
245
bytes += darr[s].ptr_array_len * darr[s].bytes;/* data */
246
bufsize += darr[s].ptr_array_len *sizeof(void*)+2*sizeof(int);/*descr*/
248
bufsize += bytes + sizeof(long) +2*sizeof(double) +8;
249
if(msginfo->operation==GET)
250
bufsize = msginfo->dscrlen+sizeof(request_header_t);
251
if(msginfo->operation==PUT){
253
msginfo->bytes=msginfo->dscrlen;
254
bufsize=msginfo->dscrlen+sizeof(request_header_t);
256
armci_send_req(proc, msginfo, bufsize);
257
if(msginfo->operation==PUT){
258
bytes=armci_SendVectorToSocket(SRV_sock[armci_clus_id(proc)],darr,len,
259
(struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) );
264
int armci_direct_vector_get(request_header_t *msginfo , armci_giov_t darr[],
267
return armci_RecvVectorFromSocket(SRV_sock[armci_clus_id(proc)],darr,len,
268
(struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) );
271
int armci_direct_vector(request_header_t *msginfo , armci_giov_t darr[],
273
int bufsize=0,bytes=0,s;
274
for(s=0; s<len; s++){
275
bytes += darr[s].ptr_array_len * darr[s].bytes;/* data */
276
bufsize += darr[s].ptr_array_len *sizeof(void*)+2*sizeof(int);/*descr*/
278
bufsize += bytes + sizeof(long) +2*sizeof(double) +8;
279
if(msginfo->operation==GET)
280
bufsize = msginfo->dscrlen+sizeof(request_header_t);
281
if(msginfo->operation==PUT){
283
msginfo->bytes=msginfo->dscrlen;
284
bufsize=msginfo->dscrlen+sizeof(request_header_t);
286
armci_send_req(proc, msginfo, bufsize);
287
if(msginfo->operation==GET){
288
bytes=armci_RecvVectorFromSocket(SRV_sock[armci_clus_id(proc)],darr,len,
289
(struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) );
291
if(msginfo->operation==PUT){
292
bytes=armci_SendVectorToSocket(SRV_sock[armci_clus_id(proc)],darr,len,
293
(struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) );
300
/*\ client sends request message to server
302
int armci_send_req_msg(int proc, void *buf, int bytes)
304
int cluster = armci_clus_id(proc);
305
request_header_t* msginfo = (request_header_t*)buf;
308
THREAD_LOCK(armci_user_threads.net_lock);
310
/* mark sockets as active (only if reply is expected?) */
311
idx = _armci_buf_to_index(msginfo);
312
_armci_active_socks->socks[idx] = SRV_sock[cluster];
314
rc = (armci_WriteToSocket(SRV_sock[cluster], buf, bytes) < 0);
316
THREAD_UNLOCK(armci_user_threads.net_lock);
322
void armci_write_strided_sock(void *ptr, int stride_levels, int stride_arr[],
326
long idx; /* index offset of current block position to ptr */
327
int n1dim; /* number of 1 dim block */
328
int bvalue[MAX_STRIDE_LEVEL], bunit[MAX_STRIDE_LEVEL];
330
/* number of n-element of the first dimension */
332
for(i=1; i<=stride_levels; i++)
335
/* calculate the destination indices */
336
bvalue[0] = 0; bvalue[1] = 0; bunit[0] = 1; bunit[1] = 1;
337
for(i=2; i<=stride_levels; i++) {
339
bunit[i] = bunit[i-1] * count[i-1];
342
for(i=0; i<n1dim; i++) {
344
for(j=1; j<=stride_levels; j++) {
345
idx += bvalue[j] * stride_arr[j-1];
346
if((i+1) % bunit[j] == 0) bvalue[j]++;
347
if(bvalue[j] > (count[j]-1)) bvalue[j] = 0;
350
/* memcpy(buf, ((char*)ptr)+idx, count[0]); */
351
/* buf += count[0]; */
352
stat = armci_WriteToSocket(fd, ((char*)ptr)+idx, count[0]);
353
if(stat<0)armci_die("armci_write_strided_sock:write failed",stat);
359
void armci_read_strided_sock(void *ptr, int stride_levels, int stride_arr[],
363
long idx; /* index offset of current block position to ptr */
364
int n1dim; /* number of 1 dim block */
365
int bvalue[MAX_STRIDE_LEVEL], bunit[MAX_STRIDE_LEVEL];
366
/* number of n-element of the first dimension */
368
for(i=1; i<=stride_levels; i++)
371
/* calculate the destination indices */
372
bvalue[0] = 0; bvalue[1] = 0; bunit[0] = 1; bunit[1] = 1;
373
for(i=2; i<=stride_levels; i++) {
375
bunit[i] = bunit[i-1] * count[i-1];
378
for(i=0; i<n1dim; i++) {
380
for(j=1; j<=stride_levels; j++) {
381
idx += bvalue[j] * stride_arr[j-1];
382
if((i+1) % bunit[j] == 0) bvalue[j]++;
383
if(bvalue[j] > (count[j]-1)) bvalue[j] = 0;
386
/* memcpy(buf, ((char*)ptr)+idx, count[0]); */
387
/* buf += count[0]; */
388
stat = armci_ReadFromSocket(fd, ((char*)ptr)+idx, count[0]);
389
if(stat<0)armci_die("armci_read_strided_sock:read failed",stat);
393
/*\ client sends strided data + request to server
395
int armci_send_req_msg_strided(int proc, request_header_t *msginfo,char *ptr,
396
int strides, int stride_arr[], int count[])
398
int cluster = armci_clus_id(proc);
402
printf("%d:armci_send_req_msg_strided: op=%d to=%d bytes= %d \n",armci_me,
403
msginfo->operation,proc,msginfo->datalen);
407
/* we write header + data descriptor */
408
bytes = sizeof(request_header_t) + msginfo->dscrlen;
410
THREAD_LOCK(armci_user_threads.net_lock);
412
stat = armci_WriteToSocket(SRV_sock[cluster], msginfo, bytes);
413
if(stat<0)armci_die("armci_send_strided:write failed",stat);
414
#if defined(USE_SOCKET_VECTOR_API)
415
if(msginfo->operation==PUT && msginfo->datalen==0)
416
armci_SendStridedToSocket( SRV_sock[cluster],ptr,stride_arr,count,
417
strides,(struct iovec *)(msginfo+1) );
420
/* for larger blocks write directly to socket thus avoiding memcopy */
421
armci_write_strided_sock(ptr, strides,stride_arr,count,SRV_sock[cluster]);
423
THREAD_UNLOCK(armci_user_threads.net_lock);
429
char *armci_ReadFromDirect(int proc, request_header_t * msginfo, int len)
431
int cluster=armci_clus_id(proc);
435
printf("%d:armci_ReadFromDirect: from %d \n",armci_me,proc);
438
stat =armci_ReadFromSocket(SRV_sock[cluster],msginfo+1,len);
439
if(stat<0)armci_die("armci_rcv_data: read failed",stat);
440
return(char*)(msginfo+1);
444
/*\ client receives strided data from server
446
void armci_ReadStridedFromDirect(int proc, request_header_t* msginfo, void *ptr,
447
int strides, int stride_arr[], int count[])
449
int cluster=armci_clus_id(proc);
452
printf("%d:armci_ReadStridedFromDirect: from %d \n",armci_me,proc);
456
#if defined(USE_SOCKET_VECTOR_API)
457
if(msginfo->operation==GET && strides > 0)
458
armci_RecvStridedFromSocket( SRV_sock[cluster],ptr,stride_arr,count,
459
strides,(struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen));
463
armci_read_strided_sock(ptr, strides, stride_arr, count, SRV_sock[cluster]);
467
/*********************************** server side ***************************/
469
#if defined(USE_SOCKET_VECTOR_API)
471
void armci_tcp_read_vector_data(request_header_t *msginfo,void *vdscr,int p){
476
armci_giov_t *mydarr;
477
bytes = msginfo->dscrlen;
479
printf("\n in armci_tcp_read_vector_data reading bytes=%d infonext=%p\n",
480
bytes,(void*)(msginfo+1));fflush(stdout);
482
stat = armci_ReadFromSocket(CLN_sock[p],
483
(MessageRcvBuffer+sizeof(request_header_t)),bytes);
485
if(stat<0)armci_die("armci_tcp_read_vector_data: read of data failed",stat);
486
dscr=(MessageRcvBuffer+sizeof(request_header_t));
488
*(void**)vdscr=(void *)dscr;
489
mydarr = (armci_giov_t *)(dscr+bytes);
490
GETBUF(dscr, long ,len);
493
GETBUF(dscr, int, mydarr[i].ptr_array_len);
494
GETBUF(dscr, int, mydarr[i].bytes);
495
mydarr[i].dst_ptr_array=(void**)dscr;
496
dscr+=mydarr[i].ptr_array_len*sizeof(char*);
498
j=armci_RecvVectorFromSocket(CLN_sock[p],mydarr,len,
499
(struct iovec *)((char*)dscr+2*bytes) );
505
void armci_tcp_read_strided_data(request_header_t *msginfo,void *vdscr,int p)
510
int stride_levels, *stride_arr,*count,stat;
511
bytes = msginfo->dscrlen;
513
printf("\n in armci tcp read strided data reading bytes=%d infonext=%p\n"
514
,bytes,(void*)(msginfo+1));fflush(stdout);
516
stat = armci_ReadFromSocket(CLN_sock[p],
517
(MessageRcvBuffer+sizeof(request_header_t)),bytes);
519
if(stat<0)armci_die("armci_tcp_read_strided_data:read of data failed",stat);
520
dscr=(MessageRcvBuffer+sizeof(request_header_t));
521
*(void**)vdscr=(void *)dscr;
522
ptr = *(void**)dscr; dscr += sizeof(void*);
523
stride_levels = *(int*)dscr; dscr += sizeof(int);
524
stride_arr = (int*)dscr; dscr += stride_levels*sizeof(int);
525
count = (int*)dscr; dscr += (stride_levels+1)*sizeof(int);
526
armci_RecvStridedFromSocket( CLN_sock[p],ptr,stride_arr,count,stride_levels,
527
(struct iovec *)dscr);
529
/*armci_RecvStridedFromSocket( CLN_sock[p],ptr,stride_arr,count,
530
stride_levels,(struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) );*/
534
/*\ server receives request
536
void armci_rcv_req(void *mesg, void *phdr, void *pdescr,void *pdata,int *buflen)
538
request_header_t *msginfo = (request_header_t*)MessageRcvBuffer;
539
int hdrlen = sizeof(request_header_t);
540
int stat, p = *(int*)mesg;
543
stat =armci_ReadFromSocket(CLN_sock[p],MessageRcvBuffer,hdrlen);
544
if(stat<0) armci_die("armci_rcv_req: failed to receive header ",p);
545
*(void**)phdr = msginfo;
546
#if defined(USE_SOCKET_VECTOR_API)
547
if(msginfo->operation == PUT && msginfo->datalen==0){
548
if(msginfo->format==STRIDED)
549
armci_tcp_read_strided_data(msginfo,pdescr,p);
550
if(msginfo->format==VECTOR){
552
armci_tcp_read_vector_data(msginfo,pdescr,p);
557
*buflen = MSG_BUFLEN - hdrlen;
558
if (msginfo->operation == GET)
559
bytes = msginfo->dscrlen;
561
bytes = msginfo->bytes;
562
if(bytes >*buflen)armci_die2("armci_rcv_req: message overflowing rcv buf",
563
msginfo->bytes,*buflen);
567
stat = armci_ReadFromSocket(CLN_sock[p],msginfo+1,bytes);
568
if(stat<0)armci_die("armci_rcv_req: read of data failed",stat);
569
*(void**)pdescr = msginfo+1;
570
*(void**)pdata = msginfo->dscrlen + (char*)(msginfo+1);
571
*buflen -= msginfo->dscrlen;
573
if (msginfo->operation != GET)
574
if(msginfo->datalen)*buflen -= msginfo->datalen;
578
*(void**)pdata = msginfo+1;
579
*(void**)pdescr = NULL;
582
if(msginfo->datalen>0 && msginfo->operation != GET){
584
if(msginfo->datalen > ((int)MSG_BUFLEN) -((int)hdrlen) -msginfo->dscrlen)
585
armci_die2("armci_rcv_req:data overflowing buffer",
586
msginfo->dscrlen,msginfo->datalen);
587
*buflen -= msginfo->datalen;
592
/*\ send data back to client
594
void armci_WriteToDirect(int to, request_header_t* msginfo, void *data)
596
int stat = armci_WriteToSocket(CLN_sock[to], data, msginfo->datalen);
597
if(stat<0)armci_die("armci_WriteToDirect:write failed",stat);
601
/*\ server sends strided data back to client
603
void armci_WriteStridedToDirect(int proc, request_header_t* msginfo,
604
void *ptr, int strides, int stride_arr[], int count[])
607
printf("%d:armci_WriteStridedToDirect:from %d\n",armci_me,proc);
611
#if defined(USE_SOCKET_VECTOR_API)
612
if(msginfo->operation==GET && strides>0)
613
armci_SendStridedToSocket(CLN_sock[proc],ptr,stride_arr,count,strides,
614
(struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) ) ;
617
armci_write_strided_sock(ptr, strides, stride_arr, count, CLN_sock[proc]);
621
/*\ server writes data to socket associated with process "to"
623
void armci_sock_send(int to, void* data, int len)
625
int stat = armci_WriteToSocket(CLN_sock[to], data, len);
626
if(stat<0)armci_die("armci_sock_send:write failed",stat);
630
/*\ close all open sockets, called before terminating/aborting
632
void armci_transport_cleanup()
635
if(readylist)free(readylist);
636
armci_ShutdownAll(CLN_sock,armci_nproc); /*server */
638
armci_ShutdownAll(SRV_sock,armci_nclus); /*client */
642
/*\ main routine for data server process in a cluster environment
643
* the process is blocked (in select) until message arrives from
644
* the clients and services the requests
646
void armci_call_data_server()
651
readylist = (int*)calloc(sizeof(int),armci_nproc);
652
if(!readylist)armci_die("armci_data_server:could not allocate readylist",0);
655
printf("%d server waiting for request\n",armci_me); fflush(stdout);
659
/* server main loop; wait for and service requests until QUIT requested */
662
nready = armci_WaitSock(CLN_sock, armci_nproc, readylist);
664
for(i = 0; i < armci_nproc; i++){
666
p = (up) ? i : armci_nproc -1 -i;
667
if(!readylist[p])continue;
669
armci_data_server(&p);
672
if(nready==0) break; /* all sockets read */
675
/* fairness attempt: each time process the list in a different direction*/
676
up = 1- up; /* switch directions for the next round */
679
armci_die("armci_dataserv:readylist not consistent with nready",nready);
684
extern int tcp_sendrcv_bufsize;
685
void armci_determine_sock_buf_size(){
686
if(armci_nclus<=8)return;
687
if(armci_nclus>=128){tcp_sendrcv_bufsize = 32768;return;}
688
tcp_sendrcv_bufsize =(int)pow(2,(22-(int)(log(armci_nclus)/log(2))));
691
/*\ Create Sockets for clients and servers
693
void armci_init_connections()
695
int i,n,p,master = armci_clus_info[armci_clus_me].master;
697
/* sockets for communication with data server */
698
SRV_sock = (int*) malloc(sizeof(int)*armci_nclus);
699
if(!SRV_sock)armci_die("ARMCI cannot allocate SRV_sock",armci_nclus);
701
/* array that will be used to exchange port info */
702
AR_port = (int*) calloc(armci_nproc * armci_nclus, sizeof(int));
703
if(!AR_port)armci_die("ARMCI cannot allocate AR_port",armci_nproc*armci_nclus);
705
/* create active sockets list select */
706
if (!(_armci_active_socks = malloc(sizeof(active_socks_t))))
707
armci_die("dataserv.c, malloc _armci_active_socks failed",0);
708
for(i=0,n=MAX_BUFS+MAX_SMALL_BUFS;i<n;i++)_armci_active_socks->socks[i]=-1;
710
/* create sockets for communication with each user process */
711
if(master==armci_me){
712
CLN_sock = (int*) malloc(sizeof(int)*armci_nproc);
713
if(!CLN_sock)armci_die("ARMCI cannot allocate CLN_sock",armci_nproc);
714
armci_determine_sock_buf_size();
715
for(p=0; p< armci_nproc; p++){
716
int off_port = armci_clus_me*armci_nproc;
717
# ifdef SERVER_THREAD
718
if(p >=armci_clus_first && p <= armci_clus_last) CLN_sock[p]=-1;
721
armci_CreateSocketAndBind(CLN_sock + p, AR_port + p +off_port);
725
/* skip sockets associated with processes on the current node */
726
if(armci_clus_first>0)
727
armci_ListenSockAll(CLN_sock, armci_clus_first);
729
if(armci_clus_last< armci_nproc-1)
730
armci_ListenSockAll(CLN_sock + armci_clus_last+1,
731
armci_nproc-armci_clus_last-1);
733
armci_ListenSockAll(CLN_sock, armci_nproc);
740
void armci_wait_for_server()
742
if(armci_me == armci_master){
743
#ifndef SERVER_THREAD
746
armci_wait_server_process();
751
void armci_client_connect_to_servers()
755
#ifndef SERVER_THREAD
758
/* master has to close all sockets -- they are used by server PROCESS */
759
if(armci_master==armci_me)for(p=0; p< armci_nproc; p++){
764
/* exchange port numbers with processes in all cluster nodes
765
* save number of messages by using global sum -only masters contribute
768
nall = armci_nclus*armci_nproc;
769
armci_msg_igop(AR_port,nall,"+");
770
/*using port number create socket & connect to data server in each clus node*/
771
for(c=0; c< armci_nclus; c++){
773
int off_port = c*armci_nproc;
776
/*no intra node socket connection with server thread*/
777
if(c == armci_clus_me) SRV_sock[c]=-1;
780
SRV_sock[c] = armci_CreateSocketAndConnect(armci_clus_info[c].hostname,
781
AR_port[off_port + armci_me]);
782
if(DEBUG_ && SRV_sock[c]!=-1){
783
printf("%d: client connected to %s:%d\n",armci_me,
784
armci_clus_info[c].hostname, AR_port[off_port + armci_me]);
792
for(c=0; c< armci_nclus; c++)if(SRV_sock[c]!=-1){
793
stat =armci_ReadFromSocket(SRV_sock[c],str, sizeof(msg)+1);
794
if(stat<0)armci_die("read failed",stat);
795
printf("in client %d message was=%s from%d\n",armci_me,str,c);
800
free(AR_port); /* we do not need the port numbers anymore */
804
/*\ establish connections with compute processes
806
void armci_server_initial_connection()
810
if(armci_clus_first>0)
811
armci_AcceptSockAll(CLN_sock, armci_clus_first);
812
if(armci_clus_last< armci_nproc-1)
813
armci_AcceptSockAll(CLN_sock + armci_clus_last+1,
814
armci_nproc-armci_clus_last-1);
816
armci_AcceptSockAll(CLN_sock, armci_nproc);
821
printf("%d: server connected to all clients\n",armci_me); fflush(stdout);
823
for(p=0; p<armci_nproc; p++)if(CLN_sock[p]!=-1){
824
stat = armci_WriteToSocket(CLN_sock[p], msg, sizeof(msg)+1);
825
if(stat<0)armci_die("write failed",stat);
830
#ifndef SERVER_THREAD
831
/* we do not need the port numbers anymore */