1
/* $Id: sockets.c,v 1.23.8.1 2007-02-09 17:10:18 andriy Exp $ */
2
/**************************************************************************
3
Some parts of this code were derived from the TCGMSG file sockets.c
4
Jarek Nieplocha, last update 10/28/99
5
02/28/00: modified armci_WaitSock to allow some elements of socklist to
6
be <0 (and ignored). Needed for the threaded version of server.
7
*************************************************************************/
15
# define bcopy(s1,s2,n) memcpy(s2,s1,n)
16
# define sleep(x) Sleep(1000*(x))
17
# define CLOSE closesocket
19
# include <sys/wait.h>
20
# include <sys/time.h>
21
# include <sys/types.h>
22
# include <sys/socket.h>
23
/*# include <sys/uio.h> */ /*moved to sockets.h*/
24
# include <netinet/in.h>
25
# include <netinet/tcp.h>
35
# include <standards.h>
36
# include <sys/select.h>
37
# ifdef _AIXVERSION_430
38
typedef socklen_t soclen_t;
40
typedef size_t soclen_t;
42
#elif defined(XLCLINUX)
43
typedef socklen_t soclen_t;
53
/* portability of socklen_t definition is iffy - we need to avoid it !!
54
#if defined(LINUX) && ( defined(_SOCKETBITS_H) || defined(__BITS_SOCKET_H))
56
typedef size_t socklen_t;
58
typedef int socklen_t;
62
#ifndef MAX_STRIDE_LEVEL
63
#define MAX_STRIDE_LEVEL 8
66
extern int armci_me, armci_nproc,armci_nclus;
67
int tcp_sendrcv_bufsize=131072;
70
#define CONNECT_TRIALS 4
71
#define MAX_INTR_NO_DATA 8
73
int armci_PollSocket(int sock)
75
Poll the socket for available input.
77
Return 1 if data is available, 0 otherwise.
81
struct timeval timelimit;
91
timelimit.tv_usec = 0;
93
nready = select(sock+1, &ready, (fd_set *) NULL, (fd_set *) NULL, &timelimit);
98
armci_die("armci_PollSocket: error from select", sock);
105
/*\ sleep in select until data appears on one of the sockets
106
* return number of sockets ready and indicate which ones are in ready array
107
* allows <0 values in socklist array (ignores them)
109
int armci_WaitSock(int *socklist, int num, int *ready)
116
if(num<0) armci_die("armci_WaitSock: num <0",num);
122
for(sock=0; sock<num; sock++){
123
if(socklist[sock] > maxsock)maxsock = socklist[sock];
125
if(socklist[sock] >0){ /* ignore fd=-1 on the list */
126
FD_SET(socklist[sock], &dset);
131
nready = select(maxsock+1, &dset, (fd_set*)NULL, (fd_set*)NULL, NULL);
135
fprintf(stderr,"%d:interrupted in select\n",armci_me);
138
armci_die("armci_WaitSocket: error from select", sock);
142
for(sock=0; sock<num; sock++){
144
if(socklist[sock] < 0) continue;
145
if(FD_ISSET(socklist[sock],&dset)) ready[sock]=1;
151
/* same as armci_WaitSock, but with a timeout of lim microseconds */
152
int armci_WaitSockLim(int *socklist, int num, int *ready, int lim)
154
struct timeval timelimit;
159
if(num<0) armci_die("armci_WaitSock: num <0",num);
165
for(sock=0; sock<num; sock++){
166
if(socklist[sock] > maxsock)maxsock = socklist[sock];
168
if(socklist[sock] >0){ /* ignore fd=-1 on the list */
169
FD_SET(socklist[sock], &dset);
172
timelimit.tv_sec = 0;
173
timelimit.tv_usec = lim;
176
nready = select(maxsock+1, &dset, (fd_set*)NULL, (fd_set*)NULL, &timelimit);
180
fprintf(stderr,"%d:interrupted in select\n",armci_me);
183
armci_die("armci_WaitSocket: error from select", sock);
187
for(sock=0; sock<num; sock++){
189
if(socklist[sock] < 0) continue;
190
if(FD_ISSET(socklist[sock],&dset)) ready[sock]=1;
200
void armci_TcpNoDelay( int sock)
202
Turn off waiting for more input to improve buffering
203
by TCP layer ... improves performance for small messages by
204
a factor of 30 or more. Slightly degrades performance for
208
int status, level, value=1;
210
struct protoent *proto = getprotobyname("tcp");
212
struct protoent *proto = getprotobyname("TCP");
214
void *optval = &value;
216
if (proto == (struct protoent *) NULL)
217
armci_die("armci_TcpNoDelay: getprotobyname on TCP failed!", -1);
219
level = proto->p_proto;
221
status = setsockopt(sock, level, TCP_NODELAY, optval, sizeof(int));
224
armci_die("armci_TcpNoDelay: setsockopt failed", status);
229
void armci_ShutdownAll(int socklist[], int num)
231
close all sockets discarding any pending data in either direction.
236
for (i=0; i<num; i++)
237
if (socklist[i] >= 0) {
238
(void) shutdown(socklist[i], 2);
239
(void) CLOSE(socklist[i]);
245
#if defined(USE_SOCKET_VECTOR_API)
247
int _armci_tcp_writev(int sock, struct iovec *iovptr,int writeiovlength,int currentwritesize,struct iovec *iov){
249
while(n!=currentwritesize){
251
rc=writev(sock,iovptr,writeiovlength);
252
if(rc<0)perror("writev failed");
253
if(DEBUG1&&0)if(rc<currentwritesize){printf("\n%d:_armci_tcp_writev write %d bytes of %d bytes writeiovlen=%d",armci_me,rc,currentwritesize,writeiovlength);fflush(stdout);}
255
if(n<currentwritesize){
258
while(templength!=rc){
259
if(iovptr->iov_len+templength>rc){
260
iovptr->iov_base=(char *)((*iovptr).iov_base)+(rc-templength);
261
iovptr->iov_len-=(rc-templength);
262
templength+=(rc-templength);
265
templength+=iovptr->iov_len;
270
writeiovlength-=completediovs;
271
if(writeiovlength<=0)writeiovlength=1;
277
int _armci_tcp_readv(int sock, struct iovec *iovptr,int readiovlength,int currentreadsize,struct iovec *iov){
279
while(n!=currentreadsize){
281
rc=readv(sock,iovptr,readiovlength);
282
if(rc<0)perror("readv failed");
283
if(DEBUG1&&0)if(rc<currentreadsize){printf("\n%d:_armci_tcp_readv Read %d bytes of %d bytes readiovlen=%d",armci_me,rc,currentreadsize,readiovlength);fflush(stdout);}
285
if(n<currentreadsize){
288
while(templength!=rc){
289
if(iovptr->iov_len+templength>rc){
290
iovptr->iov_base=(char *)((*iovptr).iov_base)+(rc-templength);
291
iovptr->iov_len-=(rc-templength);
292
templength+=(rc-templength);
295
templength+=iovptr->iov_len;
300
readiovlength-=completediovs;
301
if(readiovlength<=0)readiovlength=1;
307
int armci_ReadVFromSocket(int sock,struct iovec *iov, int iovlength, int totalsize)
309
struct iovec *iovptr;
310
int i=0,num_xmit=1,lastiovoriglen=0,lastiovnewlen=0,lastiovindex=-1,n=0;
311
int readiovlength,currentreadsize=totalsize,totalreadsofar=0,byteslefttoread=0;
312
char *lastiovorigbase=NULL;
314
if(totalsize>PACKET_SIZE){
315
while(totalreadsofar!=totalsize){
318
iovptr=iov+lastiovindex;
319
if(lastiovoriglen!=0){
320
iov[lastiovindex].iov_base = (lastiovorigbase+lastiovnewlen);
321
iov[lastiovindex].iov_len=lastiovoriglen-lastiovnewlen;
326
if(totalsize-totalreadsofar<PACKET_SIZE)byteslefttoread=totalsize-totalreadsofar;
327
else byteslefttoread=PACKET_SIZE;
328
while(currentreadsize<byteslefttoread){
329
if(iov[i].iov_len+currentreadsize>byteslefttoread){
330
lastiovoriglen=iov[i].iov_len;lastiovorigbase=(char *)iov[i].iov_base;
332
iov[i].iov_len=byteslefttoread-currentreadsize;
333
currentreadsize+=iov[i].iov_len; lastiovnewlen=iov[i].iov_len;
336
currentreadsize+=iov[i].iov_len;
342
if(lastiovoriglen>0)iovlength+=1;
343
totalreadsofar+=currentreadsize;
345
readiovlength=iovlength;
346
n=_armci_tcp_readv(sock,iovptr,readiovlength,currentreadsize,iov);
347
if(DEBUG1){printf("\nFinished reading n=%d bytes iov of length %d \n",n,iovlength);fflush(stdout);}
352
readiovlength=iovlength;
354
n+=_armci_tcp_readv(sock,iovptr,readiovlength,currentreadsize,iov);
355
if(DEBUG1){printf("\nFits in one packet Finished reading n=%d bytes iov of length %d \n",n,iovlength);fflush(stdout);}
361
int armci_WriteVToSocket(int sock,struct iovec *iov, int iovlength, int totalsize){
363
int lastiovoriglen=0,lastiovnewlen=0,lastiovindex=-1,totalwritesofar=0,byteslefttowrite=0;
364
struct iovec *iovptr;
365
int i=0,num_xmit=0,n=0;
366
int currentwritesize=totalsize,writeiovlength;
367
char *lastiovorigbase=NULL;
369
if(totalsize>PACKET_SIZE){
370
while(totalwritesofar!=totalsize){
373
iovptr=iov+lastiovindex;
374
if(lastiovoriglen!=0){
375
iov[lastiovindex].iov_base = (lastiovorigbase+lastiovnewlen);
376
iov[lastiovindex].iov_len= lastiovoriglen-lastiovnewlen;
381
if(totalsize-totalwritesofar<PACKET_SIZE)byteslefttowrite=totalsize-totalwritesofar;
382
else byteslefttowrite=PACKET_SIZE;
383
while(currentwritesize<byteslefttowrite){
384
if(iov[i].iov_len+currentwritesize>byteslefttowrite){
385
lastiovoriglen=iov[i].iov_len;lastiovorigbase=(char *)iov[i].iov_base;
387
iov[i].iov_len=byteslefttowrite-currentwritesize;
388
currentwritesize+=iov[i].iov_len;lastiovnewlen=iov[i].iov_len;
391
currentwritesize+=iov[i].iov_len;
397
totalwritesofar+=currentwritesize;
399
if(lastiovoriglen>0)iovlength+=1;
400
writeiovlength=iovlength;
401
n=_armci_tcp_writev(sock,iovptr,writeiovlength,currentwritesize,iov);
402
if(DEBUG1){printf("\nFinished writing n=%d iov of length %d \n",n,iovlength);fflush(stdout);}
403
if(n!=currentwritesize)armci_die2("\n problems with writing using writev\n",n,currentwritesize);
409
writeiovlength=iovlength;
411
n= _armci_tcp_writev(sock,iovptr,writeiovlength,currentwritesize,iov);
412
if(n<0)perror("write failed");
413
if(DEBUG1){printf("\nFits in one packet Finished writing n=%d iov of length %d \n",n,iovlength);fflush(stdout);}
414
if(n!=currentwritesize)armci_die2("\n problems with writing using writev\n",n,currentwritesize);
419
#endif /*for use_socket_vec_api*/
421
int armci_ReadFromSocket(int sock, void* buffer, int lenbuf)
423
Read from the socket until we get all we want.
427
int nread, status, nintr=0;
428
char *buf = (char*)buffer;
433
nread = recv(sock, buf, lenbuf, 0);
434
/* on Linux, recv can return 0 if the socket has been closed by the sender */
435
if(nread < 0 || ((nread == 0) && errno ) ){
439
fprintf(stderr,"%d:interrupted in recv\n",armci_me);
442
/* retry a few times if nread==0 */
443
if(nread==0) nintr++;
445
if(nintr>MAX_INTR_NO_DATA) return -1; /* the socket must be closed */
451
(void) fprintf(stderr,"sock=%d, pid=%d, nread=%d, len=%d\n",
452
sock, armci_me, nread, lenbuf);
453
if(errno)perror("armci_ReadFromSocket: recv failed");
467
int armci_WriteToSocket (int sock, void* buffer, int lenbuf)
469
Write to the socket in packets of PACKET_SIZE bytes
474
char *buf = (char*)buffer;
477
printf("%d armci_WriteToSocket sock=%d lenbuf=%d\n",armci_me,sock,lenbuf);
484
len = (lenbuf > PACKET_SIZE) ? PACKET_SIZE : lenbuf;
485
nsent = send(sock, buf, len, 0);
487
if (nsent < 0) { /* This is bad news */
490
fprintf(stderr,"%d:interrupted in socket send\n",armci_me);
496
(void) fprintf(stderr,"sock=%d, pid=%d, nsent=%d, len=%d\n",
497
sock, armci_me, nsent, lenbuf);
498
(void) fflush(stderr);
513
void armci_CreateSocketAndBind(int *sock, int *port)
515
Create a socket, bind it to a wildcard internet name and return
516
the info so that its port number may be advertised
520
struct sockaddr_in server;
521
int size = PACKET_SIZE;
524
length = sizeof (struct sockaddr_in);
528
if ( (*sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
529
armci_die("armci_CreateSocketAndBind: socket creation failed", *sock);
531
if(setsockopt(*sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof on) == -1)
532
armci_die("armci_CreateSocketAndBind: error from setsockopt", -1);
534
/* Increase size of socket buffers to improve long message
535
performance and increase size of message that goes asynchronously */
537
if(setsockopt(*sock, SOL_SOCKET, SO_RCVBUF, (char *) &size, sizeof(size)))
538
armci_die("armci_CreateSocketAndBind: error setting SO_RCVBUF", size);
539
if(setsockopt(*sock, SOL_SOCKET, SO_SNDBUF, (char *) &size, sizeof(size)))
540
armci_die("armci_CreateSocketAndBind: error setting SO_SNDBUF", size);
542
armci_TcpNoDelay(*sock);
544
/* Name socket with wildcards */
546
server.sin_family = AF_INET;
547
server.sin_addr.s_addr = INADDR_ANY;
549
if (bind(*sock, (struct sockaddr *) &server, length) < 0)
550
armci_die("armci_CreateSocketAndBind: bind failed", 0);
552
/* Find out port number etc. */
554
if (getsockname(*sock, (struct sockaddr *) &server, &length) < 0)
555
armci_die("armci_CreateSocketAndBind: getsockname failed", 0);
557
*port = ntohs(server.sin_port);
562
/*\ listen for socket connections
564
void armci_ListenSockAll(int* socklist, int num)
568
if(num<0)armci_die("armci_ListenSockAll invalid number of sockets",num);
570
for(i=0; i< num; i++){
572
if (listen(socklist[i], num) < 0) {
576
armci_die("armci_ListenSockAll: listen failed", 0);
581
(void) printf("process %d out of listen on %d sockets\n",armci_me,num);
582
(void) fflush(stdout);
586
void armci_tcp_get_sock_buf_size(int msgsock){
587
int buffer_orig=32768,r;
589
while ( r < 0 && (tcp_sendrcv_bufsize > buffer_orig) ) {
591
r= setsockopt(msgsock, SOL_SOCKET, SO_SNDBUF, (char *) &tcp_sendrcv_bufsize, sizeof(tcp_sendrcv_bufsize));
592
r= setsockopt(msgsock, SOL_SOCKET, SO_RCVBUF, (char *) &tcp_sendrcv_bufsize, sizeof(tcp_sendrcv_bufsize));
593
if( r < 0 ) tcp_sendrcv_bufsize =(tcp_sendrcv_bufsize/2);
597
/*\ accept connections on the specified sockets
599
void armci_AcceptSockAll(int* socklist, int num)
601
fd_set ready, fdzero;
602
struct timeval timelimit;
603
int maxsock, msgsock, nready, num_accept=0;
606
if(num<0)armci_die("armci_AcceptSockAll invalid number of sockets",num);
608
/* Use select to wait for someone to try and establish a connection
609
so that we can add a short timeout to avoid hangs */
616
/* we negate the socket number on the list to mark it as already connected */
618
for(i=0; i<num; i++){
619
if(socklist[i] > maxsock)maxsock = socklist[i]; /* find largest value*/
620
if(socklist[i]>0) FD_SET(socklist[i], &ready);
621
/* printf("%d: accepting socket%d=%d of %d\n",armci_me,i,socklist[i],num);*/
624
timelimit.tv_sec = TIMEOUT_ACCEPT;
625
timelimit.tv_usec = 0;
626
nready = select(maxsock+1, &ready, (fd_set *) NULL, (fd_set *) NULL,
629
/* error screening */
630
if ( (nready <= 0) && (errno == EINTR) )
633
armci_die("armci_AcceptSockAll: error from select",nready);
634
else if (nready == 0)
635
armci_die("armci_AcceptSockAll:timeout waiting for connection",nready);
637
/* if (bcmp(&ready,&fdzero,sizeof(fdzero)))*/
638
/* armci_die("armci_AcceptSockAll: out of select but not ready!",nready);*/
640
/* accept connection from newly contacted clients */
641
for(i=0; i< num; i++){
642
int sock = socklist[i];
643
if(sock<0) continue; /* accepted already */
644
if(!FD_ISSET(sock, &ready)) continue; /* not contacted yet */
648
msgsock = accept(sock, (struct sockaddr *) NULL, (soclen_t *) NULL);
651
msgsock2 = dup(msgsock);
652
/*(void) CLOSE(msgsock);*/
660
armci_die("armci_AcceptSockAll: accept failed", msgsock);
664
(void) printf("process %d out of accept socket=%d\n",armci_me,msgsock);
665
(void) fflush(stdout);
668
/* Increase size of socket buffers to improve long message
669
performance and increase size of message that goes asynchronously */
670
armci_tcp_get_sock_buf_size(msgsock);
672
armci_TcpNoDelay(msgsock);
674
(void) CLOSE(sock); /* will not be needing this again */
676
socklist[i] = -msgsock; /* negate connected socket on the list */
684
for(i=0; i< num; i++)
686
armci_die("armci_AcceptSockAll: not connected",socklist[i]);
688
socklist[i] = - socklist[i];
693
int armci_ListenAndAccept(int sock)
695
Listen and accept a connection on the specified socket
696
which was created with CreateSocketAndBind
700
struct timeval timelimit;
702
int size = PACKET_SIZE;
705
if (listen(sock, 1) < 0) {
709
armci_die("armci_ListenAndAccept: listen failed", 0);
713
(void) printf("process %d out of listen on socket %d\n",armci_me,sock);
714
(void) fflush(stdout);
717
/* Use select to wait for someone to try and establish a connection
718
so that we can add a short timeout to avoid hangs */
722
FD_SET(sock, &ready);
724
timelimit.tv_sec = TIMEOUT_ACCEPT;
725
timelimit.tv_usec = 0;
726
nready = select(sock+1, &ready, (fd_set *) NULL, (fd_set *) NULL,
728
if ( (nready <= 0) && (errno == EINTR) )
731
armci_die("armci_ListenAndAccept: error from select", nready);
732
else if (nready == 0)
733
armci_die("armci_ListenAndAccept: timeout waiting for connection", nready);
735
if (!FD_ISSET(sock, &ready))
736
armci_die("armci_ListenAndAccept: out of select but not ready!", nready);
739
msgsock = accept(sock, (struct sockaddr *) NULL, (soclen_t *) NULL);
744
armci_die("armci_ListenAndAccept: accept failed", msgsock);
748
(void) printf("process %d out of accept on socket %d\n", armci_me,msgsock);
749
(void) fflush(stdout);
752
/* Increase size of socket buffers to improve long message
753
performance and increase size of message that goes asynchronously */
755
if(setsockopt(msgsock, SOL_SOCKET, SO_RCVBUF, (char *) &size, sizeof size))
756
armci_die("armci_ListenAndAccept: error setting SO_RCVBUF", size);
757
if(setsockopt(msgsock, SOL_SOCKET, SO_SNDBUF, (char *) &size, sizeof size))
758
armci_die("armci_ListenAndAccept: error setting SO_SNDBUF", size);
760
armci_TcpNoDelay(sock);
762
(void) CLOSE(sock); /* will not be needing this again */
767
int armci_CreateSocketAndConnect(char *hostname, int port)
769
Return the file descriptor of the socket which connects me to the
770
remote process on hostname at port
772
hostname = hostname of the remote process
773
port = port number of remote socket
777
struct sockaddr_in server;
781
#if !defined(SGI) && !defined(WIN32)
782
struct hostent *gethostbyname();
787
if ( (sock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ) {
788
(void) fprintf(stderr,"trying to connect to host=%s, port=%d\n",
790
armci_die("armci_CreateSocketAndConnect: socket failed", sock);
793
/* the following can be disabled */
794
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
795
(char *) &on, sizeof on) == -1)
796
armci_die("armci_CreateSocketAndConnect: error setting REUSEADDR", -1);
799
server.sin_family = AF_INET;
800
hp = gethostbyname(hostname);
802
(void) fprintf(stderr,"trying to connect to host=%s, port=%d\n",
804
armci_die("armci_CreateSocketAndConnect: gethostbyname failed", 0);
807
bcopy((char *) hp->h_addr, (char *) &server.sin_addr, hp->h_length);
808
server.sin_port = htons((unsigned short) port);
813
connect(sock, (struct sockaddr *) &server, sizeof server)) < 0) {
816
else if(trial>CONNECT_TRIALS){
818
(void) fprintf(stderr,"%d:trying connect to host=%s, port=%d t=%d %d\n",
819
armci_me,hostname, port,trial,errno);
820
perror("trying to connect:");
821
armci_die("armci_CreateSocketAndConnect: connect failed", status);
830
/* Increase size of socket buffers to improve long message
831
performance and increase size of message that goes asynchronously */
833
armci_tcp_get_sock_buf_size(sock);
834
armci_TcpNoDelay(sock);