~ubuntu-branches/ubuntu/trusty/nwchem/trusty-proposed

« back to all changes in this revision

Viewing changes to src/tools/ga-5-2/armci/src/devices/sockets/dataserv.c

  • Committer: Package Import Robot
  • Author(s): Michael Banck, Daniel Leidert, Andreas Tille, Michael Banck
  • Date: 2013-07-04 12:14:55 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20130704121455-5tvsx2qabor3nrui
Tags: 6.3-1
* New upstream release.
* Fixes anisotropic properties (Closes: #696361).
* New features include:
  + Multi-reference coupled cluster (MRCC) approaches
  + Hybrid DFT calculations with short-range HF 
  + New density-functionals including Minnesota (M08, M11) and HSE hybrid
    functionals
  + X-ray absorption spectroscopy (XAS) with TDDFT
  + Analytical gradients for the COSMO solvation model
  + Transition densities from TDDFT 
  + DFT+U and Electron-Transfer (ET) methods for plane wave calculations
  + Exploitation of space group symmetry in plane wave geometry optimizations
  + Local density of states (LDOS) collective variable added to Metadynamics
  + Various new XC functionals added for plane wave calculations, including
    hybrid and range-corrected ones
  + Electric field gradients with relativistic corrections 
  + Nudged Elastic Band optimization method
  + Updated basis sets and ECPs 

[ Daniel Leidert ]
* debian/watch: Fixed.

[ Andreas Tille ]
* debian/upstream: References

[ Michael Banck ]
* debian/upstream (Name): New field.
* debian/patches/02_makefile_flags.patch: Refreshed.
* debian/patches/06_statfs_kfreebsd.patch: Likewise.
* debian/patches/07_ga_target_force_linux.patch: Likewise.
* debian/patches/05_avoid_inline_assembler.patch: Removed, no longer needed.
* debian/patches/09_backported_6.1.1_fixes.patch: Likewise.
* debian/control (Build-Depends): Added gfortran-4.7 and gcc-4.7.
* debian/patches/10_force_gcc-4.7.patch: New patch, explicitly sets
  gfortran-4.7 and gcc-4.7, fixes test suite hang with gcc-4.8 (Closes:
  #701328, #713262).
* debian/testsuite: Added tests for COSMO analytical gradients and MRCC.
* debian/rules (MRCC_METHODS): New variable, required to enable MRCC methods.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#if HAVE_CONFIG_H
 
2
#   include "config.h"
 
3
#endif
 
4
 
 
5
/* $Id: dataserv.c,v 1.30.8.5 2007-07-02 05:18:13 d3p687 Exp $ */
 
6
#include "armcip.h"
 
7
#include "request.h"
 
8
#include "copy.h"
 
9
#if HAVE_STDIO_H
 
10
#   include <stdio.h>
 
11
#endif
 
12
#if HAVE_ERRNO_H
 
13
#   include <errno.h>
 
14
#endif
 
15
#if HAVE_MATH_H
 
16
#   include <math.h>
 
17
#endif
 
18
 
 
19
#define DEBUG_ 0
 
20
#define DEBUG1 0
 
21
#define USE_VECTOR_FORMAT_ 1
 
22
 
 
23
active_socks_t *_armci_active_socks;
 
24
 
 
25
extern int AR_ready_sigchld;
 
26
int *SRV_sock;
 
27
int *AR_port;
 
28
int *CLN_sock;
 
29
 
 
30
char *msg="hello from server";
 
31
static int *readylist=(int*)0;
 
32
 
 
33
#define GETBUF(buf,type,var) (var) = *(type*)(buf); (buf) += sizeof(type)
 
34
 
 
35
#if defined(USE_SOCKET_VECTOR_API)
 
36
int armci_RecvVectorFromSocket(int sock,armci_giov_t darr[], int len,
 
37
       struct iovec *iov){
 
38
    int i,j=0,k,num_xmit=0,lastiovlength,iovlength,n=0,max_iovec,totalsize=0;
 
39
    int totaliovs=0,dim1=0,dim2=0;
 
40
    struct iovec *saveiov=iov;
 
41
    max_iovec = MAX_IOVEC;
 
42
 
 
43
    for(i=0;i<len;i++)
 
44
        totaliovs+=darr[i].ptr_array_len;
 
45
    num_xmit = totaliovs/max_iovec;
 
46
    lastiovlength = totaliovs%max_iovec;
 
47
    if(num_xmit == 0) num_xmit = 1;
 
48
    else if(lastiovlength!=0)num_xmit++;
 
49
    dim2=darr[dim1].ptr_array_len;
 
50
    for(k=0;k<num_xmit;k++){
 
51
       if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
 
52
       else iovlength=max_iovec;
 
53
       iov=saveiov;
 
54
       for(j=0;j<iovlength;j++){
 
55
         if(dim2==0){dim1+=1;dim2=darr[dim1].ptr_array_len;}
 
56
         iov[j].iov_base=darr[dim1].dst_ptr_array[darr[dim1].ptr_array_len-dim2];
 
57
         iov[j].iov_len = darr[dim1].bytes;totalsize+=iov[j].iov_len;
 
58
         dim2--;
 
59
       }
 
60
       n+=armci_ReadVFromSocket(sock,iov,j,totalsize);
 
61
       if(DEBUG1){
 
62
         printf("\n%d:armci_RecvVectorFromSocket recved  iovlength=%d totalsize=%d n=%d",armci_me,iovlength,totalsize,n);
 
63
         fflush(stdout);
 
64
       }
 
65
       totalsize=0;
 
66
    }
 
67
    return(n);
 
68
}
 
69
 
 
70
 
 
71
int armci_SendVectorToSocket(int sock,armci_giov_t darr[], int len,
 
72
       struct iovec *iov){
 
73
    int i,j=0,k,num_xmit=0,lastiovlength,iovlength,n=0,max_iovec,totalsize=0;
 
74
    int totaliovs=0,dim1=0,dim2=0;
 
75
    struct iovec *saveiov=iov;
 
76
    max_iovec = MAX_IOVEC;
 
77
    for(i=0;i<len;i++)
 
78
        totaliovs+=darr[i].ptr_array_len;
 
79
    num_xmit = totaliovs/max_iovec;
 
80
    lastiovlength = totaliovs%max_iovec;
 
81
    if(num_xmit == 0) num_xmit = 1;
 
82
    else if(lastiovlength!=0)num_xmit++;
 
83
    dim2=darr[dim1].ptr_array_len;
 
84
    for(k=0;k<num_xmit;k++){
 
85
       if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
 
86
       else iovlength=max_iovec;
 
87
       iov=saveiov;
 
88
       for(j=0;j<iovlength;j++){
 
89
         if(dim2==0){dim1++;dim2=darr[dim1].ptr_array_len;}
 
90
         iov[j].iov_base=darr[dim1].src_ptr_array[darr[dim1].ptr_array_len-dim2];
 
91
         iov[j].iov_len = darr[dim1].bytes;totalsize+=iov[j].iov_len;
 
92
         dim2--;
 
93
       }
 
94
       n+=armci_WriteVToSocket(sock,iov,j,totalsize);
 
95
       if(DEBUG1){
 
96
         printf("\n%d:armci_SendVectorToSocket done iovlen=%d totalsiz=%d n=%d",
 
97
                armci_me,iovlength,totalsize,n);
 
98
         fflush(stdout);
 
99
       }
 
100
       totalsize = 0;
 
101
    }
 
102
    return(n);
 
103
}
 
104
 
 
105
 
 
106
int armci_RecvStridedFromSocket(int sock,void *dst_ptr, int dst_stride_arr[],
 
107
                     int count[],int stride_levels,struct iovec *iov){
 
108
 
 
109
char *dst=(char*)dst_ptr;
 
110
char *dst1;
 
111
int i,j,k,num_xmit=0,lastiovlength,iovlength,n=0,max_iovec,totalsize=0,vecind;
 
112
int total_of_2D=1;
 
113
int index[MAX_STRIDE_LEVEL], unit[MAX_STRIDE_LEVEL];
 
114
 
 
115
    max_iovec = MAX_IOVEC;
 
116
    if(DEBUG1){
 
117
       printf("\nin readv count[0] is %d and strarr[0] is%d\n",count[0],
 
118
             dst_stride_arr[0]);
 
119
       fflush(stdout);
 
120
    }
 
121
    index[2] = 0; unit[2] = 1;
 
122
    if(stride_levels>1){
 
123
       total_of_2D = count[2];
 
124
       for(j=3; j<=stride_levels; j++) {
 
125
         index[j] = 0; unit[j] = unit[j-1] * count[j-1];
 
126
         total_of_2D *= count[j];
 
127
       }
 
128
    }
 
129
 
 
130
    num_xmit = (total_of_2D*count[1])/max_iovec;
 
131
    lastiovlength = (total_of_2D*count[1])%max_iovec;
 
132
    if(num_xmit == 0) num_xmit = 1;
 
133
    else if(lastiovlength!=0)num_xmit++;
 
134
 
 
135
    k=0;vecind=0;
 
136
    if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
 
137
    else iovlength=max_iovec;
 
138
 
 
139
    for(i=0; i<total_of_2D; i++) {
 
140
       dst = (char *)dst_ptr;
 
141
       for(j=2; j<=stride_levels; j++) {
 
142
         dst += index[j] * dst_stride_arr[j-1];
 
143
         if(((i+1) % unit[j]) == 0) index[j]++;
 
144
         if(index[j] >= count[j]) index[j] = 0;
 
145
       }
 
146
       dst1=dst;
 
147
       for(j=0;j<count[1];j++,vecind++){
 
148
         if(vecind==iovlength){
 
149
           n+=armci_ReadVFromSocket(sock,iov,iovlength,totalsize);
 
150
           vecind = 0; totalsize=0; k++;
 
151
           if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
 
152
           else iovlength=max_iovec;
 
153
         }
 
154
         iov[vecind].iov_base = dst1;
 
155
         iov[vecind].iov_len = count[0];totalsize+=count[0];
 
156
         dst1+=dst_stride_arr[0];
 
157
       }
 
158
       if(vecind==iovlength){
 
159
         n+=armci_ReadVFromSocket(sock,iov,iovlength,totalsize);
 
160
         vecind = 0; totalsize=0; k++;
 
161
         if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
 
162
         else iovlength=max_iovec;
 
163
       }
 
164
       if(DEBUG1){
 
165
         printf("\n%d:armci_RecvStridedFromSocket iovlen=%d totalsize=%d n=%d",
 
166
               armci_me,iovlength,totalsize,n);
 
167
         fflush(stdout);
 
168
       }
 
169
    }
 
170
    return(n);
 
171
}
 
172
 
 
173
 
 
174
int armci_SendStridedToSocket(int sock,void *src_ptr, int src_stride_arr[],
 
175
                     int count[], int stride_levels,struct iovec *iov){
 
176
char *src=(char*)src_ptr;
 
177
char *src1;
 
178
int i,j,k,num_xmit=0,lastiovlength,iovlength,n=0,max_iovec,totalsize=0,vecind;
 
179
int total_of_2D=1;
 
180
int index[MAX_STRIDE_LEVEL], unit[MAX_STRIDE_LEVEL];
 
181
 
 
182
    max_iovec = MAX_IOVEC;
 
183
    if(DEBUG1){
 
184
       printf("\nin writev count[0] is %d and strarr[0] is%d\n",count[0],
 
185
             src_stride_arr[0]);
 
186
       fflush(stdout);
 
187
    }
 
188
    index[2] = 0; unit[2] = 1;
 
189
    if(stride_levels>1){
 
190
        total_of_2D = count[2];
 
191
        for(j=3; j<=stride_levels; j++) {
 
192
          index[j] = 0; unit[j] = unit[j-1] * count[j-1];
 
193
          total_of_2D *= count[j];
 
194
        }
 
195
    }
 
196
    num_xmit = total_of_2D*count[1]/max_iovec;
 
197
    lastiovlength = (total_of_2D*count[1])%max_iovec;
 
198
    if(num_xmit == 0) num_xmit = 1;
 
199
    else if(lastiovlength!=0)num_xmit++;
 
200
 
 
201
    k=0;vecind=0;
 
202
    if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
 
203
    else iovlength=max_iovec;
 
204
 
 
205
    for(i=0; i<total_of_2D; i++) {
 
206
       src = (char *)src_ptr;
 
207
       for(j=2; j<=stride_levels; j++) {
 
208
         src += index[j] * src_stride_arr[j-1];
 
209
         if(((i+1) % unit[j]) == 0) index[j]++;
 
210
         if(index[j] >= count[j]) index[j] = 0;
 
211
       }
 
212
       src1=src;
 
213
       for(j=0;j<count[1];j++,vecind++){
 
214
         if(vecind==iovlength){
 
215
           n+=armci_WriteVToSocket(sock,iov,iovlength,totalsize);
 
216
           vecind = 0; totalsize=0; k++;
 
217
           if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
 
218
           else iovlength=max_iovec;
 
219
         }
 
220
         iov[vecind].iov_base = src1;
 
221
         iov[vecind].iov_len = count[0];totalsize+=count[0];
 
222
         src1+=src_stride_arr[0];
 
223
       }
 
224
       if(vecind==iovlength){
 
225
         n+=armci_WriteVToSocket(sock,iov,iovlength,totalsize);
 
226
         vecind = 0; totalsize=0; k++;
 
227
         if(lastiovlength!=0 && k==(num_xmit-1))iovlength=lastiovlength;
 
228
         else iovlength=max_iovec;
 
229
       }
 
230
       if(DEBUG1){
 
231
         printf("\n%d:armci_SendStridedToSocket iovlength=%d totalsize=%d n=%d",
 
232
               armci_me,iovlength, totalsize,n);fflush(stdout);
 
233
       }
 
234
     }
 
235
     return(n);
 
236
}
 
237
 
 
238
 
 
239
int armci_direct_vector_snd(request_header_t *msginfo , armci_giov_t darr[],
 
240
       int len, int proc)
 
241
{
 
242
    int bufsize=0,bytes=0,s;
 
243
 
 
244
    for(s=0; s<len; s++){
 
245
        bytes   += darr[s].ptr_array_len * darr[s].bytes;/* data */
 
246
        bufsize += darr[s].ptr_array_len *sizeof(void*)+2*sizeof(int);/*descr*/
 
247
    }
 
248
    bufsize += bytes + sizeof(long) +2*sizeof(double) +8;
 
249
    if(msginfo->operation==GET)
 
250
        bufsize = msginfo->dscrlen+sizeof(request_header_t);
 
251
    if(msginfo->operation==PUT){
 
252
            msginfo->datalen=0;
 
253
        msginfo->bytes=msginfo->dscrlen;
 
254
        bufsize=msginfo->dscrlen+sizeof(request_header_t);
 
255
    }
 
256
    armci_send_req(proc, msginfo, bufsize);
 
257
    if(msginfo->operation==PUT){
 
258
       bytes=armci_SendVectorToSocket(SRV_sock[armci_clus_id(proc)],darr,len,
 
259
                 (struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) );
 
260
    }
 
261
    return(bytes);
 
262
}
 
263
 
 
264
int armci_direct_vector_get(request_header_t *msginfo , armci_giov_t darr[],
 
265
               int len, int proc)
 
266
{
 
267
    return armci_RecvVectorFromSocket(SRV_sock[armci_clus_id(proc)],darr,len,
 
268
                (struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) );
 
269
}
 
270
 
 
271
int armci_direct_vector(request_header_t *msginfo , armci_giov_t darr[],
 
272
       int len, int proc){
 
273
int bufsize=0,bytes=0,s;
 
274
    for(s=0; s<len; s++){
 
275
        bytes   += darr[s].ptr_array_len * darr[s].bytes;/* data */
 
276
        bufsize += darr[s].ptr_array_len *sizeof(void*)+2*sizeof(int);/*descr*/
 
277
    }
 
278
     bufsize += bytes + sizeof(long) +2*sizeof(double) +8;
 
279
    if(msginfo->operation==GET)
 
280
       bufsize = msginfo->dscrlen+sizeof(request_header_t);
 
281
    if(msginfo->operation==PUT){
 
282
        msginfo->datalen=0;
 
283
        msginfo->bytes=msginfo->dscrlen;
 
284
        bufsize=msginfo->dscrlen+sizeof(request_header_t);
 
285
    }
 
286
    armci_send_req(proc, msginfo, bufsize);
 
287
    if(msginfo->operation==GET){
 
288
       bytes=armci_RecvVectorFromSocket(SRV_sock[armci_clus_id(proc)],darr,len,
 
289
                 (struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) );
 
290
    }
 
291
    if(msginfo->operation==PUT){
 
292
       bytes=armci_SendVectorToSocket(SRV_sock[armci_clus_id(proc)],darr,len,
 
293
                 (struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) );
 
294
    }
 
295
    return(bytes);
 
296
}
 
297
 
 
298
#endif
 
299
 
 
300
/*\ client sends request message to server
 
301
\*/
 
302
int armci_send_req_msg(int proc, void *buf, int bytes)
 
303
{
 
304
    int cluster = armci_clus_id(proc);
 
305
    request_header_t* msginfo = (request_header_t*)buf;
 
306
    int idx, rc;
 
307
 
 
308
    THREAD_LOCK(armci_user_threads.net_lock);
 
309
 
 
310
    /* mark sockets as active (only if reply is expected?) */
 
311
    idx = _armci_buf_to_index(msginfo);
 
312
    _armci_active_socks->socks[idx] = SRV_sock[cluster];
 
313
 
 
314
    rc = (armci_WriteToSocket(SRV_sock[cluster], buf, bytes) < 0);
 
315
 
 
316
    THREAD_UNLOCK(armci_user_threads.net_lock);
 
317
 
 
318
    return rc;
 
319
}
 
320
 
 
321
 
 
322
void armci_write_strided_sock(void *ptr, int stride_levels, int stride_arr[], 
 
323
                                   int count[], int fd)
 
324
{
 
325
    int i, j, stat;
 
326
    long idx;    /* index offset of current block position to ptr */
 
327
    int n1dim;  /* number of 1 dim block */
 
328
    int bvalue[MAX_STRIDE_LEVEL], bunit[MAX_STRIDE_LEVEL]; 
 
329
 
 
330
    /* number of n-element of the first dimension */
 
331
    n1dim = 1;
 
332
    for(i=1; i<=stride_levels; i++)
 
333
        n1dim *= count[i];
 
334
 
 
335
    /* calculate the destination indices */
 
336
    bvalue[0] = 0; bvalue[1] = 0; bunit[0] = 1; bunit[1] = 1;
 
337
    for(i=2; i<=stride_levels; i++) {
 
338
        bvalue[i] = 0;
 
339
        bunit[i] = bunit[i-1] * count[i-1];
 
340
    }
 
341
 
 
342
    for(i=0; i<n1dim; i++) {
 
343
        idx = 0;
 
344
        for(j=1; j<=stride_levels; j++) {
 
345
            idx += bvalue[j] * stride_arr[j-1];
 
346
            if((i+1) % bunit[j] == 0) bvalue[j]++;
 
347
            if(bvalue[j] > (count[j]-1)) bvalue[j] = 0;
 
348
        }
 
349
 
 
350
            /* memcpy(buf, ((char*)ptr)+idx, count[0]); */
 
351
            /* buf += count[0]; */
 
352
        stat = armci_WriteToSocket(fd, ((char*)ptr)+idx, count[0]);
 
353
        if(stat<0)armci_die("armci_write_strided_sock:write failed",stat);
 
354
    }
 
355
}
 
356
 
 
357
 
 
358
 
 
359
void armci_read_strided_sock(void *ptr, int stride_levels, int stride_arr[], 
 
360
                                   int count[], int fd)
 
361
{
 
362
    int i, j, stat;
 
363
    long idx;    /* index offset of current block position to ptr */
 
364
    int n1dim;  /* number of 1 dim block */
 
365
    int bvalue[MAX_STRIDE_LEVEL], bunit[MAX_STRIDE_LEVEL]; 
 
366
    /* number of n-element of the first dimension */
 
367
    n1dim = 1;
 
368
    for(i=1; i<=stride_levels; i++)
 
369
        n1dim *= count[i];
 
370
 
 
371
    /* calculate the destination indices */
 
372
    bvalue[0] = 0; bvalue[1] = 0; bunit[0] = 1; bunit[1] = 1;
 
373
    for(i=2; i<=stride_levels; i++) {
 
374
        bvalue[i] = 0;
 
375
        bunit[i] = bunit[i-1] * count[i-1];
 
376
    }
 
377
 
 
378
    for(i=0; i<n1dim; i++) {
 
379
        idx = 0;
 
380
        for(j=1; j<=stride_levels; j++) {
 
381
            idx += bvalue[j] * stride_arr[j-1];
 
382
            if((i+1) % bunit[j] == 0) bvalue[j]++;
 
383
            if(bvalue[j] > (count[j]-1)) bvalue[j] = 0;
 
384
        }
 
385
 
 
386
        /* memcpy(buf, ((char*)ptr)+idx, count[0]); */
 
387
        /* buf += count[0]; */
 
388
        stat = armci_ReadFromSocket(fd, ((char*)ptr)+idx, count[0]);
 
389
        if(stat<0)armci_die("armci_read_strided_sock:read failed",stat);
 
390
    }
 
391
}
 
392
 
 
393
/*\ client sends strided data + request to server
 
394
\*/
 
395
int armci_send_req_msg_strided(int proc, request_header_t *msginfo,char *ptr,
 
396
                               int strides, int stride_arr[], int count[])
 
397
{
 
398
int cluster = armci_clus_id(proc);
 
399
int stat, bytes;
 
400
 
 
401
    if(DEBUG_){
 
402
      printf("%d:armci_send_req_msg_strided: op=%d to=%d bytes= %d \n",armci_me,
 
403
             msginfo->operation,proc,msginfo->datalen);
 
404
      fflush(stdout);
 
405
    }
 
406
 
 
407
    /* we write header + data descriptor */
 
408
    bytes = sizeof(request_header_t) + msginfo->dscrlen;
 
409
 
 
410
    THREAD_LOCK(armci_user_threads.net_lock);
 
411
 
 
412
    stat = armci_WriteToSocket(SRV_sock[cluster], msginfo, bytes);
 
413
    if(stat<0)armci_die("armci_send_strided:write failed",stat);
 
414
#if defined(USE_SOCKET_VECTOR_API)
 
415
    if(msginfo->operation==PUT && msginfo->datalen==0)
 
416
        armci_SendStridedToSocket( SRV_sock[cluster],ptr,stride_arr,count,
 
417
             strides,(struct iovec *)(msginfo+1) );
 
418
    else
 
419
#endif
 
420
    /* for larger blocks write directly to socket thus avoiding memcopy */
 
421
    armci_write_strided_sock(ptr, strides,stride_arr,count,SRV_sock[cluster]);
 
422
 
 
423
    THREAD_UNLOCK(armci_user_threads.net_lock);
 
424
 
 
425
    return 0;
 
426
}
 
427
 
 
428
 
 
429
char *armci_ReadFromDirect(int proc, request_header_t * msginfo, int len)
 
430
{
 
431
int cluster=armci_clus_id(proc);
 
432
int stat;
 
433
 
 
434
    if(DEBUG_){
 
435
      printf("%d:armci_ReadFromDirect:  from %d \n",armci_me,proc);
 
436
      fflush(stdout);
 
437
    }
 
438
    stat =armci_ReadFromSocket(SRV_sock[cluster],msginfo+1,len);
 
439
    if(stat<0)armci_die("armci_rcv_data: read failed",stat);
 
440
    return(char*)(msginfo+1);
 
441
}
 
442
 
 
443
 
 
444
/*\ client receives strided data from server
 
445
\*/
 
446
void armci_ReadStridedFromDirect(int proc, request_header_t* msginfo, void *ptr,
 
447
                                 int strides, int stride_arr[], int count[])
 
448
{
 
449
int cluster=armci_clus_id(proc);
 
450
 
 
451
    if(DEBUG_){
 
452
      printf("%d:armci_ReadStridedFromDirect:  from %d \n",armci_me,proc);
 
453
      fflush(stdout);
 
454
    }
 
455
 
 
456
#if defined(USE_SOCKET_VECTOR_API)
 
457
    if(msginfo->operation==GET && strides > 0)
 
458
        armci_RecvStridedFromSocket( SRV_sock[cluster],ptr,stride_arr,count,
 
459
             strides,(struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen));
 
460
    else
 
461
#endif
 
462
 
 
463
    armci_read_strided_sock(ptr, strides, stride_arr, count, SRV_sock[cluster]);
 
464
}
 
465
 
 
466
 
 
467
/*********************************** server side ***************************/
 
468
 
 
469
#if defined(USE_SOCKET_VECTOR_API)
 
470
 
 
471
void armci_tcp_read_vector_data(request_header_t *msginfo,void *vdscr,int p){
 
472
int bytes,i,j,stat;
 
473
void **ptr;
 
474
char *dscr;
 
475
long len;
 
476
armci_giov_t *mydarr;
 
477
    bytes = msginfo->dscrlen;
 
478
    if(DEBUG1){
 
479
      printf("\n in armci_tcp_read_vector_data reading bytes=%d infonext=%p\n",
 
480
      bytes,(void*)(msginfo+1));fflush(stdout);
 
481
    }
 
482
    stat = armci_ReadFromSocket(CLN_sock[p],
 
483
                (MessageRcvBuffer+sizeof(request_header_t)),bytes);
 
484
 
 
485
    if(stat<0)armci_die("armci_tcp_read_vector_data: read of data failed",stat);        
 
486
    dscr=(MessageRcvBuffer+sizeof(request_header_t)); 
 
487
    ptr=(void**)dscr;
 
488
    *(void**)vdscr=(void *)dscr;
 
489
    mydarr = (armci_giov_t *)(dscr+bytes);  
 
490
    GETBUF(dscr, long ,len);
 
491
    if(len!=0){
 
492
       for(i=0;i<len;i++){
 
493
         GETBUF(dscr, int, mydarr[i].ptr_array_len);
 
494
         GETBUF(dscr, int, mydarr[i].bytes);
 
495
         mydarr[i].dst_ptr_array=(void**)dscr;
 
496
         dscr+=mydarr[i].ptr_array_len*sizeof(char*);  
 
497
       }
 
498
       j=armci_RecvVectorFromSocket(CLN_sock[p],mydarr,len,
 
499
              (struct iovec *)((char*)dscr+2*bytes) );
 
500
    }     
 
501
    
 
502
    
 
503
}
 
504
 
 
505
void armci_tcp_read_strided_data(request_header_t *msginfo,void *vdscr,int p)
 
506
{
 
507
int bytes;
 
508
void *ptr;
 
509
char *dscr;
 
510
int stride_levels, *stride_arr,*count,stat;
 
511
    bytes = msginfo->dscrlen;
 
512
    if(DEBUG1){
 
513
       printf("\n in armci tcp read strided data reading bytes=%d infonext=%p\n"
 
514
             ,bytes,(void*)(msginfo+1));fflush(stdout);
 
515
    }
 
516
    stat = armci_ReadFromSocket(CLN_sock[p],
 
517
                (MessageRcvBuffer+sizeof(request_header_t)),bytes);
 
518
 
 
519
    if(stat<0)armci_die("armci_tcp_read_strided_data:read of data failed",stat);
 
520
    dscr=(MessageRcvBuffer+sizeof(request_header_t));
 
521
    *(void**)vdscr=(void *)dscr;
 
522
    ptr = *(void**)dscr;           dscr += sizeof(void*);
 
523
    stride_levels = *(int*)dscr;   dscr += sizeof(int);
 
524
    stride_arr = (int*)dscr;       dscr += stride_levels*sizeof(int);
 
525
    count = (int*)dscr;            dscr += (stride_levels+1)*sizeof(int); 
 
526
    armci_RecvStridedFromSocket( CLN_sock[p],ptr,stride_arr,count,stride_levels,
 
527
         (struct iovec *)dscr);
 
528
 
 
529
    /*armci_RecvStridedFromSocket( CLN_sock[p],ptr,stride_arr,count,
 
530
     stride_levels,(struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) );*/
 
531
}
 
532
#endif
 
533
 
 
534
/*\ server receives request
 
535
\*/
 
536
void armci_rcv_req(void *mesg, void *phdr, void *pdescr,void *pdata,int *buflen)
 
537
{
 
538
request_header_t *msginfo = (request_header_t*)MessageRcvBuffer;
 
539
int hdrlen = sizeof(request_header_t);
 
540
int stat, p = *(int*)mesg;
 
541
int bytes;
 
542
 
 
543
    stat =armci_ReadFromSocket(CLN_sock[p],MessageRcvBuffer,hdrlen);
 
544
    if(stat<0) armci_die("armci_rcv_req: failed to receive header ",p);
 
545
     *(void**)phdr = msginfo;  
 
546
#if defined(USE_SOCKET_VECTOR_API)
 
547
    if(msginfo->operation == PUT && msginfo->datalen==0){
 
548
        if(msginfo->format==STRIDED)  
 
549
            armci_tcp_read_strided_data(msginfo,pdescr,p);
 
550
        if(msginfo->format==VECTOR){
 
551
            
 
552
            armci_tcp_read_vector_data(msginfo,pdescr,p);
 
553
        }  
 
554
        return;
 
555
    }
 
556
#endif
 
557
    *buflen = MSG_BUFLEN - hdrlen; 
 
558
    if (msginfo->operation == GET)
 
559
      bytes = msginfo->dscrlen; 
 
560
    else{
 
561
      bytes = msginfo->bytes;
 
562
      if(bytes >*buflen)armci_die2("armci_rcv_req: message overflowing rcv buf",
 
563
                                    msginfo->bytes,*buflen);
 
564
    }
 
565
 
 
566
    if(msginfo->bytes){
 
567
       stat = armci_ReadFromSocket(CLN_sock[p],msginfo+1,bytes);
 
568
       if(stat<0)armci_die("armci_rcv_req: read of data failed",stat);
 
569
       *(void**)pdescr = msginfo+1;
 
570
       *(void**)pdata  = msginfo->dscrlen + (char*)(msginfo+1); 
 
571
       *buflen -= msginfo->dscrlen; 
 
572
 
 
573
       if (msginfo->operation != GET)
 
574
           if(msginfo->datalen)*buflen -= msginfo->datalen;
 
575
 
 
576
    }else {
 
577
 
 
578
       *(void**)pdata  = msginfo+1;
 
579
       *(void**)pdescr = NULL;
 
580
    }
 
581
    
 
582
    if(msginfo->datalen>0 && msginfo->operation != GET){
 
583
 
 
584
       if(msginfo->datalen > ((int)MSG_BUFLEN) -((int)hdrlen) -msginfo->dscrlen)
 
585
          armci_die2("armci_rcv_req:data overflowing buffer",
 
586
                      msginfo->dscrlen,msginfo->datalen);
 
587
       *buflen -= msginfo->datalen;
 
588
    }
 
589
}
 
590
 
 
591
 
 
592
/*\ send data back to client
 
593
\*/
 
594
void armci_WriteToDirect(int to, request_header_t* msginfo, void *data)
 
595
{
 
596
int stat = armci_WriteToSocket(CLN_sock[to], data, msginfo->datalen);
 
597
    if(stat<0)armci_die("armci_WriteToDirect:write failed",stat);
 
598
}
 
599
 
 
600
 
 
601
/*\ server sends strided data back to client 
 
602
\*/
 
603
void armci_WriteStridedToDirect(int proc, request_header_t* msginfo,
 
604
                         void *ptr, int strides, int stride_arr[], int count[])
 
605
{
 
606
    if(DEBUG_){ 
 
607
      printf("%d:armci_WriteStridedToDirect:from %d\n",armci_me,proc);
 
608
      fflush(stdout);
 
609
    }
 
610
 
 
611
#if defined(USE_SOCKET_VECTOR_API)
 
612
    if(msginfo->operation==GET && strides>0)
 
613
        armci_SendStridedToSocket(CLN_sock[proc],ptr,stride_arr,count,strides,
 
614
             (struct iovec *)((char*)(msginfo+1)+msginfo->dscrlen) ) ;
 
615
    else
 
616
#endif
 
617
    armci_write_strided_sock(ptr, strides, stride_arr, count, CLN_sock[proc]);
 
618
}
 
619
 
 
620
 
 
621
/*\ server writes data to socket associated with process "to"
 
622
\*/
 
623
void armci_sock_send(int to, void* data, int len)
 
624
{
 
625
int stat = armci_WriteToSocket(CLN_sock[to], data, len);
 
626
    if(stat<0)armci_die("armci_sock_send:write failed",stat);
 
627
}
 
628
 
 
629
 
 
630
/*\ close all open sockets, called before terminating/aborting
 
631
\*/
 
632
void armci_transport_cleanup()
 
633
{
 
634
     if(SERVER_CONTEXT){ 
 
635
         if(readylist)free(readylist);
 
636
         armci_ShutdownAll(CLN_sock,armci_nproc); /*server */
 
637
     }else
 
638
         armci_ShutdownAll(SRV_sock,armci_nclus); /*client */
 
639
}
 
640
 
 
641
 
 
642
/*\ main routine for data server process in a cluster environment
 
643
 *  the process is blocked (in select) until message arrives from
 
644
 *  the clients and services the requests
 
645
\*/
 
646
void armci_call_data_server()
 
647
{
 
648
int nready;
 
649
int up=1;
 
650
 
 
651
    readylist = (int*)calloc(sizeof(int),armci_nproc);
 
652
    if(!readylist)armci_die("armci_data_server:could not allocate readylist",0);
 
653
 
 
654
    if(DEBUG_){
 
655
      printf("%d server waiting for request\n",armci_me); fflush(stdout);
 
656
      sleep(1);
 
657
    }
 
658
 
 
659
    /* server main loop; wait for and service requests until QUIT requested */
 
660
    for(;;){
 
661
      int i, p;
 
662
      nready = armci_WaitSock(CLN_sock, armci_nproc, readylist);
 
663
 
 
664
      for(i = 0; i < armci_nproc; i++){
 
665
 
 
666
          p = (up) ? i : armci_nproc -1 -i;
 
667
          if(!readylist[p])continue;
 
668
 
 
669
          armci_data_server(&p);
 
670
 
 
671
          nready--;
 
672
          if(nready==0) break; /* all sockets read */
 
673
      }
 
674
 
 
675
      /* fairness attempt: each time process the list in a different direction*/
 
676
      up = 1- up; /* switch directions for the next round */
 
677
 
 
678
      if(nready)
 
679
        armci_die("armci_dataserv:readylist not consistent with nready",nready);
 
680
    }
 
681
}
 
682
 
 
683
 
 
684
extern int tcp_sendrcv_bufsize;
 
685
void armci_determine_sock_buf_size(){
 
686
  if(armci_nclus<=8)return;
 
687
  if(armci_nclus>=128){tcp_sendrcv_bufsize = 32768;return;}
 
688
  tcp_sendrcv_bufsize =(int)pow(2,(22-(int)(log(armci_nclus)/log(2))));
 
689
  return;
 
690
}
 
691
/*\ Create Sockets for clients and servers
 
692
\*/
 
693
void armci_init_connections()
 
694
{
 
695
  int i,n,p,master = armci_clus_info[armci_clus_me].master;
 
696
  _armci_buf_init();
 
697
  /* sockets for communication with data server */
 
698
  SRV_sock = (int*) malloc(sizeof(int)*armci_nclus);
 
699
  if(!SRV_sock)armci_die("ARMCI cannot allocate SRV_sock",armci_nclus);
 
700
 
 
701
  /* array that will be used to exchange port info */
 
702
  AR_port = (int*) calloc(armci_nproc * armci_nclus, sizeof(int));
 
703
  if(!AR_port)armci_die("ARMCI cannot allocate AR_port",armci_nproc*armci_nclus);
 
704
 
 
705
  /* create active sockets list select */
 
706
  if (!(_armci_active_socks = malloc(sizeof(active_socks_t))))
 
707
      armci_die("dataserv.c, malloc _armci_active_socks failed",0);
 
708
  for(i=0,n=MAX_BUFS+MAX_SMALL_BUFS;i<n;i++)_armci_active_socks->socks[i]=-1;
 
709
 
 
710
  /* create sockets for communication with each user process */
 
711
  if(master==armci_me){
 
712
     CLN_sock = (int*) malloc(sizeof(int)*armci_nproc);
 
713
     if(!CLN_sock)armci_die("ARMCI cannot allocate CLN_sock",armci_nproc);
 
714
     armci_determine_sock_buf_size();
 
715
     for(p=0; p< armci_nproc; p++){
 
716
       int off_port = armci_clus_me*armci_nproc;
 
717
#      ifdef SERVER_THREAD
 
718
         if(p >=armci_clus_first && p <= armci_clus_last) CLN_sock[p]=-1;
 
719
         else
 
720
#      endif
 
721
         armci_CreateSocketAndBind(CLN_sock + p, AR_port + p +off_port);
 
722
     }
 
723
 
 
724
#ifdef SERVER_THREAD
 
725
     /* skip sockets associated with processes on the current node */
 
726
     if(armci_clus_first>0)
 
727
        armci_ListenSockAll(CLN_sock, armci_clus_first);
 
728
 
 
729
     if(armci_clus_last< armci_nproc-1)
 
730
        armci_ListenSockAll(CLN_sock + armci_clus_last+1,
 
731
                            armci_nproc-armci_clus_last-1);
 
732
#else
 
733
     armci_ListenSockAll(CLN_sock, armci_nproc);
 
734
#endif
 
735
 
 
736
  }
 
737
}
 
738
 
 
739
 
 
740
void armci_wait_for_server()
 
741
{
 
742
  if(armci_me == armci_master){
 
743
#ifndef SERVER_THREAD
 
744
     RestoreSigChldDfl();
 
745
     armci_serv_quit();
 
746
     armci_wait_server_process();
 
747
#endif
 
748
  }
 
749
}
 
750
 
 
751
void armci_client_connect_to_servers()
 
752
{
 
753
  int stat,c, nall;
 
754
  char str[100];
 
755
#ifndef SERVER_THREAD
 
756
  int p;
 
757
 
 
758
  /* master has to close all sockets -- they are used by server PROCESS */ 
 
759
  if(armci_master==armci_me)for(p=0; p< armci_nproc; p++){
 
760
     close(CLN_sock[p]);
 
761
  } 
 
762
#endif
 
763
 
 
764
  /* exchange port numbers with processes in all cluster nodes
 
765
   * save number of messages by using global sum -only masters contribute
 
766
   */
 
767
 
 
768
  nall = armci_nclus*armci_nproc;
 
769
  armci_msg_igop(AR_port,nall,"+");
 
770
  /*using port number create socket & connect to data server in each clus node*/
 
771
  for(c=0; c< armci_nclus; c++){
 
772
      
 
773
      int off_port = c*armci_nproc; 
 
774
 
 
775
#ifdef SERVER_THREAD
 
776
      /*no intra node socket connection with server thread*/
 
777
      if(c == armci_clus_me) SRV_sock[c]=-1; 
 
778
      else
 
779
#endif
 
780
       SRV_sock[c] = armci_CreateSocketAndConnect(armci_clus_info[c].hostname,
 
781
                                                  AR_port[off_port + armci_me]);
 
782
      if(DEBUG_ && SRV_sock[c]!=-1){
 
783
         printf("%d: client connected to %s:%d\n",armci_me,
 
784
             armci_clus_info[c].hostname, AR_port[off_port + armci_me]);
 
785
         fflush(stdout);
 
786
      }
 
787
  }
 
788
 
 
789
  if(DEBUG_){
 
790
     bzero(str,99);
 
791
  
 
792
     for(c=0; c< armci_nclus; c++)if(SRV_sock[c]!=-1){
 
793
        stat =armci_ReadFromSocket(SRV_sock[c],str, sizeof(msg)+1);
 
794
        if(stat<0)armci_die("read failed",stat);
 
795
        printf("in client %d message was=%s from%d\n",armci_me,str,c); 
 
796
        fflush(stdout);
 
797
     }
 
798
  }
 
799
 
 
800
  free(AR_port); /* we do not need the port numbers anymore */
 
801
}
 
802
 
 
803
 
 
804
/*\ establish connections with compute processes
 
805
\*/
 
806
void armci_server_initial_connection()
 
807
{
 
808
 
 
809
#ifdef SERVER_THREAD
 
810
     if(armci_clus_first>0)
 
811
        armci_AcceptSockAll(CLN_sock, armci_clus_first);
 
812
     if(armci_clus_last< armci_nproc-1)
 
813
        armci_AcceptSockAll(CLN_sock + armci_clus_last+1, 
 
814
                            armci_nproc-armci_clus_last-1);
 
815
#else
 
816
     armci_AcceptSockAll(CLN_sock, armci_nproc);
 
817
#endif
 
818
 
 
819
     if(DEBUG_){
 
820
       int stat, p;
 
821
       printf("%d: server connected to all clients\n",armci_me); fflush(stdout);
 
822
       sleep(1);
 
823
       for(p=0; p<armci_nproc; p++)if(CLN_sock[p]!=-1){
 
824
         stat = armci_WriteToSocket(CLN_sock[p], msg, sizeof(msg)+1);
 
825
         if(stat<0)armci_die("write failed",stat);
 
826
       }
 
827
       sleep(5);
 
828
     }
 
829
 
 
830
#ifndef SERVER_THREAD
 
831
     /* we do not need the port numbers anymore */
 
832
     free(AR_port);
 
833
#endif
 
834
}