~ubuntu-branches/ubuntu/trusty/nwchem/trusty-proposed

« back to all changes in this revision

Viewing changes to src/tools/ga-5-2/armci/tcgmsg/ipcv4.0/snd.c

  • Committer: Package Import Robot
  • Author(s): Michael Banck, Daniel Leidert, Andreas Tille, Michael Banck
  • Date: 2013-07-04 12:14:55 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20130704121455-5tvsx2qabor3nrui
Tags: 6.3-1
* New upstream release.
* Fixes anisotropic properties (Closes: #696361).
* New features include:
  + Multi-reference coupled cluster (MRCC) approaches
  + Hybrid DFT calculations with short-range HF 
  + New density-functionals including Minnesota (M08, M11) and HSE hybrid
    functionals
  + X-ray absorption spectroscopy (XAS) with TDDFT
  + Analytical gradients for the COSMO solvation model
  + Transition densities from TDDFT 
  + DFT+U and Electron-Transfer (ET) methods for plane wave calculations
  + Exploitation of space group symmetry in plane wave geometry optimizations
  + Local density of states (LDOS) collective variable added to Metadynamics
  + Various new XC functionals added for plane wave calculations, including
    hybrid and range-corrected ones
  + Electric field gradients with relativistic corrections 
  + Nudged Elastic Band optimization method
  + Updated basis sets and ECPs 

[ Daniel Leidert ]
* debian/watch: Fixed.

[ Andreas Tille ]
* debian/upstream: References

[ Michael Banck ]
* debian/upstream (Name): New field.
* debian/patches/02_makefile_flags.patch: Refreshed.
* debian/patches/06_statfs_kfreebsd.patch: Likewise.
* debian/patches/07_ga_target_force_linux.patch: Likewise.
* debian/patches/05_avoid_inline_assembler.patch: Removed, no longer needed.
* debian/patches/09_backported_6.1.1_fixes.patch: Likewise.
* debian/control (Build-Depends): Added gfortran-4.7 and gcc-4.7.
* debian/patches/10_force_gcc-4.7.patch: New patch, explicitly sets
  gfortran-4.7 and gcc-4.7, fixes test suite hang with gcc-4.8 (Closes:
  #701328, #713262).
* debian/testsuite: Added tests for COSMO analytical gradients and MRCC.
* debian/rules (MRCC_METHODS): New variable, required to enable MRCC methods.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#if HAVE_CONFIG_H
 
2
#   include "config.h"
 
3
#endif
 
4
 
 
5
/* $Header: /tmp/hpctools/ga/tcgmsg/ipcv4.0/snd.c,v 1.21 2004-04-01 02:04:57 manoj Exp $ */
 
6
 
 
7
#include <stdio.h>
 
8
#include <stdlib.h>
 
9
#ifdef SEQUENT
 
10
#include <strings.h>
 
11
#else
 
12
#include <string.h>
 
13
#endif
 
14
 
 
15
#ifdef AIX
 
16
#include <sys/select.h>
 
17
#endif
 
18
 
 
19
#include <sys/types.h>
 
20
#include <sys/time.h>
 
21
 
 
22
#if defined(SHMEM) || defined(SYSV)
 
23
#   if (defined(SGI_N32) || defined(SGITFP))
 
24
#       define PARTIALSPIN
 
25
#   else
 
26
#       define NOSPIN
 
27
#   endif
 
28
#endif
 
29
 
 
30
#if (defined(SUN) && !defined(SOLARIS))
 
31
    extern char *sprintf();
 
32
#endif
 
33
 
 
34
extern void Error();
 
35
 
 
36
#include "sndrcv.h"
 
37
#include "sndrcvP.h"
 
38
#include "tcgsockets.h"
 
39
 
 
40
#ifdef GOTXDR
 
41
#include "xdrstuff.h"
 
42
#endif
 
43
 
 
44
#if defined(SHMEM) || defined(SYSV)
 
45
#if !defined(SEQUENT) && !defined(CONVEX)
 
46
#include <memory.h>
 
47
#endif
 
48
#include "sema.h"
 
49
#include "tcgshmem.h"
 
50
#if defined(USE_SRMOVER)
 
51
extern void SRmover();
 
52
#else
 
53
#define SRmover(a,b,n) memcpy(a,b,n)
 
54
#endif
 
55
#endif 
 
56
 
 
57
#ifdef EVENTLOG
 
58
#include "evlog.h"
 
59
#endif
 
60
 
 
61
extern void ListenOnSock(int sock);
 
62
extern int AcceptConnection(int sock);
 
63
 
 
64
void PrintProcInfo()
 
65
/*
 
66
  Print out the SR_proc_info structure array for this process
 
67
*/
 
68
{
 
69
  long i;
 
70
 
 
71
  (void) fprintf(stderr,"Process info for node %ld: \n",NODEID_());
 
72
 
 
73
  for (i=0; i<NNODES_(); i++)
 
74
    (void) fprintf(stderr,"[%ld] = {\n\
 
75
     clusid = %-8ld    slaveid = %-8ld      local = %-8ld\n\
 
76
       sock = %-8d      shmem = %-8p shmem_size = %-8ld\n\
 
77
   shmem_id = %-8ld     buffer = %-8p     buflen = %-8ld\n\
 
78
     header = %-8p      semid = %-8ld   sem_read = %-8ld\n\
 
79
sem_written = %-8ld      n_rcv = %-8ld     nb_rcv = %-8ld\n\
 
80
      t_rcv = %-8ld      n_snd = %-8ld     nb_snd = %-8ld\n\
 
81
      t_snd = %-8ld,     peeked = %-8ld}\n",
 
82
                   i,
 
83
                   SR_proc_info[i].clusid,
 
84
                   SR_proc_info[i].slaveid,
 
85
                   SR_proc_info[i].local,
 
86
                   SR_proc_info[i].sock,
 
87
                   SR_proc_info[i].shmem,
 
88
                   SR_proc_info[i].shmem_size,
 
89
                   SR_proc_info[i].shmem_id,
 
90
                   SR_proc_info[i].buffer,
 
91
                   SR_proc_info[i].buflen,
 
92
                   SR_proc_info[i].header,
 
93
                   SR_proc_info[i].semid,
 
94
                   SR_proc_info[i].sem_read,
 
95
                   SR_proc_info[i].sem_written,
 
96
                   SR_proc_info[i].n_rcv,
 
97
           (long)  SR_proc_info[i].nb_rcv,
 
98
           (long)  SR_proc_info[i].t_rcv,
 
99
                   SR_proc_info[i].n_snd,
 
100
           (long)  SR_proc_info[i].nb_snd,
 
101
           (long)  SR_proc_info[i].t_snd,
 
102
                   SR_proc_info[i].peeked);
 
103
                   
 
104
  (void) fflush(stderr);
 
105
}
 
106
 
 
107
static void PrintMessageHeader(info, header)
 
108
     char *info;
 
109
     MessageHeader *header;
 
110
/*
 
111
  Print out the contents of a message header along with info message
 
112
*/
 
113
{
 
114
  (void) printf("%2ld:%s: type=%ld, from=%ld, to=%ld, len=%ld, tag=%ld\n",
 
115
                NODEID_(),info, header->type, header->nodefrom, 
 
116
                header->nodeto, header->length, header->tag);
 
117
  (void) fflush(stdout);
 
118
}
 
119
 
 
120
 
 
121
#if defined(SHMEM) || defined(SYSV)
 
122
 
 
123
static int DummyRoutine()
 
124
{int i, sum=0; for(i=0; i<10; i++) sum += i; return sum;}
 
125
 
 
126
static long flag(p)
 
127
     long *p;
 
128
{
 
129
#if defined(CONVEX) && defined(HPUX)
 
130
  asm("sync");
 
131
#endif
 
132
 
 
133
  return *p;
 
134
}
 
135
 
 
136
static void Await(p, value)
 
137
     long *p;
 
138
     long value;
 
139
/*
 
140
  Wait until the value pointed to by p equals value.
 
141
  Since *ptr is volatile but cannot usually declare this
 
142
  include another level of procedure call to protect
 
143
  against compiler optimization.
 
144
*/
 
145
{
 
146
  int nspin = 0;
 
147
  if (DEBUG_) {
 
148
    printf("%2ld: Await p=%p, value=%ld\n", NODEID_(), p, value);
 
149
    fflush(stdout);
 
150
  }
 
151
 
 
152
  for (; flag(p) != value; nspin++) {
 
153
#if defined(NOSPIN) && !defined(PARTIALSPIN)
 
154
    if (nspin < 100)
 
155
      (void) DummyRoutine();
 
156
    else 
 
157
      USleep((long) 10000);
 
158
#else
 
159
    if (nspin < 10000000)
 
160
      (void) DummyRoutine();
 
161
    else {
 
162
/*      printf("%2ld: Await sleeping\n", NODEID_()); fflush(stdout); */
 
163
      USleep((long) 100000);
 
164
    }
 
165
#endif
 
166
  }
 
167
}
 
168
 
 
169
static void rcv_local(type, buf, lenbuf, lenmes, nodeselect, nodefrom)
 
170
     long *type;
 
171
     char *buf;
 
172
     long *lenbuf;
 
173
     long *lenmes;
 
174
     long *nodeselect;
 
175
     long *nodefrom;
 
176
{
 
177
  long me = NODEID_();
 
178
  long node = *nodeselect;
 
179
  MessageHeader *head = SR_proc_info[node].header;
 
180
  long buflen = SR_proc_info[node].buflen;
 
181
  char *buffer = SR_proc_info[node].buffer;
 
182
  long nodeto, len;
 
183
#ifdef NOSPIN
 
184
  long semid = SR_proc_info[node].semid;
 
185
  long sem_read = SR_proc_info[node].sem_read;
 
186
  long sem_written = SR_proc_info[node].sem_written;
 
187
  long semid_to = SR_proc_info[me].semid;
 
188
  long sem_pend = SR_proc_info[me].sem_pend;
 
189
#endif
 
190
#if !defined(NOSPIN) || defined(PARTIALSPIN)
 
191
  long *buffer_full = SR_proc_info[node].buffer_full;
 
192
#endif
 
193
  
 
194
  /* Error checking */
 
195
  
 
196
  if ( (buffer == (char *) NULL) || (head == (MessageHeader *) NULL) )
 
197
    Error("rcv_local: invalid shared memory", (long) node);
 
198
 
 
199
#ifdef NOSPIN
 
200
  if ( (semid < 0) || (sem_read < 0) || (sem_written < 0) || 
 
201
       (semid_to < 0) || (sem_pend < 0) )
 
202
    Error("rcv_local: invalid semaphore set", (long) node);
 
203
#endif
 
204
 
 
205
#ifdef NOSPIN
 
206
  SemWait(semid_to, sem_pend);
 
207
#endif
 
208
 
 
209
  Await(&head->nodeto, me);     /* Still have this possible spin */
 
210
 
 
211
#if defined(NOSPIN) && !defined(PARTIALSPIN)
 
212
  SemWait(semid, sem_written);
 
213
#else
 
214
  Await(buffer_full, (long) TRUE);
 
215
#endif
 
216
 
 
217
  /* Now have a message for me ... check the header info and
 
218
     copy the first block of the message. */
 
219
  
 
220
  if (DEBUG_)
 
221
    PrintMessageHeader("rcv_local ",head);
 
222
 
 
223
  nodeto = head->nodeto;        /* Always me ... history here */
 
224
  head->nodeto = -1;
 
225
 
 
226
  *nodefrom = head->nodefrom;
 
227
 
 
228
  if (head->type != *type) {
 
229
    PrintMessageHeader("rcv_local ",head);
 
230
/*    printf("rcv_local: type mismatch ... strong typing enforced\n"); */
 
231
/*    abort(); */
 
232
    Error("rcv_local: type mismatch ... strong typing enforced", (long) *type);
 
233
  }
 
234
 
 
235
  *lenmes = len = head->length;
 
236
 
 
237
   if ( *lenmes > *lenbuf )
 
238
     Error("rcv_local: message too long for buffer", (long) *lenmes);
 
239
   if (nodeto != me)
 
240
     Error("rcv_local: message meant for someone else?", (long) nodeto);
 
241
     
 
242
  if (len)
 
243
    (void) SRmover(buf, buffer, (len > buflen) ? buflen : len);
 
244
 
 
245
#if defined(NOSPIN) && !defined(PARTIALSPIN)
 
246
  SemPost(semid, sem_read);
 
247
#else
 
248
  *buffer_full = FALSE;
 
249
# if defined(CONVEX) && defined(HPUX)
 
250
     asm("sync");
 
251
# endif
 
252
 
 
253
#endif
 
254
 
 
255
  len -= buflen;
 
256
  buf += buflen;
 
257
 
 
258
  /* Copy the remainder of the message */
 
259
  
 
260
  while (len > 0) {
 
261
#if defined(NOSPIN) && !defined(PARTIALSPIN)
 
262
    SemWait(semid, sem_written);
 
263
#else    
 
264
    Await(buffer_full, (long) TRUE);
 
265
#endif
 
266
    (void) SRmover(buf, buffer, (len > buflen) ? buflen : len);
 
267
#if defined(NOSPIN) && !defined(PARTIALSPIN)
 
268
    SemPost(semid, sem_read);
 
269
#else
 
270
    *buffer_full = FALSE;
 
271
#endif
 
272
    len -= buflen;
 
273
    buf += buflen;
 
274
  }
 
275
}
 
276
 
 
277
static void snd_local(type, buf, lenbuf, node)
 
278
     long *type;
 
279
     char *buf;
 
280
     long *lenbuf;
 
281
     long *node;
 
282
{
 
283
  long me = NODEID_();
 
284
  MessageHeader *head = SR_proc_info[me].header;
 
285
  long buflen = SR_proc_info[me].buflen;
 
286
  long len = *lenbuf;
 
287
  char *buffer = SR_proc_info[me].buffer;
 
288
  long tag = SR_proc_info[*node].n_snd;
 
289
#ifdef NOSPIN
 
290
  long semid = SR_proc_info[me].semid;
 
291
  long sem_read = SR_proc_info[me].sem_read;
 
292
  long sem_written = SR_proc_info[me].sem_written;
 
293
  long semid_to = SR_proc_info[*node].semid;
 
294
  long sem_pend = SR_proc_info[*node].sem_pend;
 
295
#endif
 
296
#if !defined(NOSPIN) || defined(PARTIALSPIN)
 
297
  long *buffer_full = SR_proc_info[me].buffer_full;
 
298
#endif
 
299
 
 
300
  /* Error checking */
 
301
  
 
302
  if ( (buffer == (char *) NULL) || (head == (MessageHeader *) NULL) )
 
303
    Error("snd_local: invalid shared memory", (long) *node);
 
304
 
 
305
#ifdef NOSPIN
 
306
  if ( (semid < 0) || (semid_to < 0) || (sem_read < 0) || (sem_written < 0) )
 
307
    Error("snd_local: invalid semaphore set", (long) *node);
 
308
#endif
 
309
 
 
310
  /* Check that final segment of last message has been consumed */
 
311
 
 
312
#if defined(NOSPIN) && !defined(PARTIALSPIN)
 
313
  SemWait(semid, sem_read);
 
314
#else
 
315
  Await(buffer_full, (long) FALSE);
 
316
#endif
 
317
 
 
318
  /* Fill in message header */
 
319
  
 
320
  head->nodefrom = (char) me;
 
321
  head->type = *type;
 
322
  head->length = *lenbuf;
 
323
  head->tag = tag;
 
324
  head->nodeto = (char) *node;
 
325
#if defined(CONVEX) && defined(HPUX)
 
326
  asm("sync");
 
327
#endif
 
328
 
 
329
  if (DEBUG_) {
 
330
    PrintMessageHeader("snd_local ",head);
 
331
    (void) fflush(stdout);
 
332
  }
 
333
 
 
334
  /* Copy the first piece of the message so that send along with
 
335
     header to minimize use of semaphores. Also need to send header
 
336
     even for messages of zero length */
 
337
 
 
338
  if (len)
 
339
    (void) SRmover(buffer, buf, (len > buflen) ? buflen : len);
 
340
 
 
341
#if defined(NOSPIN) && !defined(PARTIALSPIN)
 
342
  SemPost(semid, sem_written);
 
343
#else
 
344
  *buffer_full = TRUE;
 
345
# if defined(CONVEX) && defined(HPUX)
 
346
   asm("sync");
 
347
# endif
 
348
#endif
 
349
#ifdef NOSPIN
 
350
  SemPost(semid_to, sem_pend);
 
351
#endif
 
352
 
 
353
  len -= buflen;
 
354
  buf += buflen;
 
355
 
 
356
  while (len > 0) {
 
357
#if defined(NOSPIN) && !defined(PARTIALSPIN)
 
358
    SemWait(semid, sem_read);
 
359
#else
 
360
    Await(buffer_full, (long) FALSE);
 
361
#endif
 
362
    (void) SRmover(buffer, buf, (len > buflen) ? buflen : len);
 
363
#if defined(NOSPIN) && !defined(PARTIALSPIN)
 
364
    SemPost(semid, sem_written);
 
365
#else
 
366
    *buffer_full = TRUE;
 
367
#   if defined(CONVEX) && defined(HPUX)
 
368
       asm("sync");
 
369
#   endif
 
370
#endif
 
371
    len -= buflen;
 
372
    buf += buflen;
 
373
  }
 
374
}    
 
375
#endif
 
376
 
 
377
static void snd_remote(type, buf, lenbuf, node)
 
378
     long *type;
 
379
     char *buf;
 
380
     long *lenbuf;
 
381
     long *node;
 
382
/*
 
383
  synchronous send to remote process
 
384
 
 
385
  long *type     = user defined integer message type (input)
 
386
  char *buf      = data buffer (input)
 
387
  long *lenbuf   = length of buffer in bytes (input)
 
388
  long *node     = node to send to (input)
 
389
 
 
390
  for zero length messages only the header is sent
 
391
*/
 
392
{
 
393
#define SHORT_MSG_BUF_SIZE (2048 + 40)
 
394
  static char fudge[SHORT_MSG_BUF_SIZE]; 
 
395
  MessageHeader header;
 
396
  long me=NODEID_();
 
397
  int sock=SR_proc_info[*node].sock;
 
398
  long len;
 
399
#ifdef SOCK_FULL_SYNC
 
400
  char sync=0;
 
401
#endif
 
402
 
 
403
  if ( sock < 0 )
 
404
    Error("snd_remote: sending to process without socket", (long) *node);
 
405
 
 
406
  header.nodefrom = me;
 
407
  header.nodeto   = *node;
 
408
  header.type     = *type;
 
409
  header.length   = *lenbuf;
 
410
  header.tag      = SR_proc_info[*node].n_snd;
 
411
 
 
412
  /* header.length is the no. of items if XDR is used or just the
 
413
     number of bytes */
 
414
 
 
415
#ifdef GOTXDR
 
416
  if ( *type & MSGDBL )
 
417
    header.length = *lenbuf / sizeof(double);
 
418
  else if ( *type & MSGINT ) 
 
419
    header.length = *lenbuf / sizeof(long);
 
420
  else if ( *type & MSGCHR )
 
421
    header.length = *lenbuf / sizeof(char);
 
422
  else
 
423
    header.length = *lenbuf;
 
424
#else
 
425
  header.length = *lenbuf;
 
426
#endif
 
427
 
 
428
  if (DEBUG_)
 
429
    PrintMessageHeader("snd_remote",&header);
 
430
 
 
431
#ifndef GOTXDR
 
432
  /* Combine header and messages less than a certain size to avoid
 
433
   * performance problem on (older?) linuxes */
 
434
  if ((*lenbuf + sizeof(header)) <= sizeof(fudge)) {
 
435
    memcpy(fudge,(char *) &header, sizeof(header));
 
436
    memcpy(fudge+sizeof(header), buf, *lenbuf);
 
437
    if ( (len = WriteToSocket(sock, fudge, sizeof(header)+*lenbuf)) != 
 
438
                                          ((long)sizeof(header)+*lenbuf))
 
439
      Error("snd_remote: writing message to socket",
 
440
            (long) (len+100000*(sock + 1000* *node)));
 
441
    return;
 
442
  }
 
443
#endif
 
444
 
 
445
#ifdef GOTXDR
 
446
  (void) WriteXdrLong(sock, (long *) &header, 
 
447
                      (long) (sizeof(header)/sizeof(long)));
 
448
#else
 
449
  if ( (len = WriteToSocket(sock, (char *) &header, (long) sizeof(header)))
 
450
                                                        != sizeof(header) )
 
451
      Error("snd_remote: writing header to socket", len);
 
452
#endif
 
453
 
 
454
  if (*lenbuf)  {
 
455
#ifdef GOTXDR
 
456
    if ( *type & MSGDBL )
 
457
      (void) WriteXdrDouble(sock, (double *) buf, header.length);
 
458
    else if ( *type & MSGINT )
 
459
      (void) WriteXdrLong(sock, (long *) buf, header.length);
 
460
    else if ( *type & MSGCHR )
 
461
      (void) WriteXdrChar(sock, (char *) buf, header.length);
 
462
    else if ( (len = WriteToSocket(sock, buf, header.length)) != 
 
463
                                                      header.length)
 
464
      Error("snd_remote: writing message to socket",
 
465
            (long) (len+100000*(sock + 1000* *node)));
 
466
#else
 
467
    if ( (len = WriteToSocket(sock, buf, header.length)) != 
 
468
                                         header.length)
 
469
      Error("snd_remote: writing message to socket",
 
470
            (long) (len+100000*(sock + 1000* *node)));
 
471
#endif
 
472
  }
 
473
 
 
474
#ifdef SOCK_FULL_SYNC
 
475
  /* this read (and write in rcv_remote) of an acknowledgment 
 
476
     forces synchronous */
 
477
 
 
478
  if ( ReadFromSocket(sock, &sync, (long) 1) != 1)
 
479
    Error("snd_remote: reading acknowledgement",
 
480
          (long) (len+100000*(sock + 1000* *node)));
 
481
#endif
 
482
}
 
483
 
 
484
/*ARGSUSED*/
 
485
void SND_(type, buf, lenbuf, node, sync)
 
486
     long *type;
 
487
     void *buf;
 
488
     long *lenbuf;
 
489
     long *node;
 
490
     long *sync;
 
491
/*
 
492
  mostly syncrhonous send
 
493
 
 
494
  long *type     = user defined integer message type (input)
 
495
  void *buf     = data buffer (input)
 
496
  long *lenbuf   = length of buffer in bytes (input)
 
497
  long *node     = node to send to (input)
 
498
  long *sync    = flag for sync/async ... IGNORED
 
499
 
 
500
  for zero length messages only the header is sent
 
501
*/
 
502
{
 
503
  long me=NODEID_();
 
504
  long nproc=NNODES_();
 
505
#ifdef TIMINGS
 
506
  double start;
 
507
#endif
 
508
 
 
509
  /* Error checking */
 
510
 
 
511
  if (*node == me)
 
512
    Error("SND_: cannot send message to self", (long) me);
 
513
 
 
514
  if ( (*node < 0) || (*node > nproc) )
 
515
    Error("SND_: out of range node requested", (long) *node);
 
516
 
 
517
  if ( (*lenbuf < 0) || (*lenbuf > (long)BIG_MESSAGE) )
 
518
    Error("SND_: message length out of range", (long) *lenbuf);
 
519
 
 
520
#ifdef EVENTLOG
 
521
  evlog(EVKEY_BEGIN,     EVENT_SND,
 
522
        EVKEY_MSG_LEN,  (int) *lenbuf,
 
523
        EVKEY_MSG_FROM, (int)  me,
 
524
        EVKEY_MSG_TO,   (int) *node,
 
525
        EVKEY_MSG_TYPE, (int) *type,
 
526
        EVKEY_MSG_SYNC, (int) *sync,
 
527
        EVKEY_LAST_ARG);
 
528
#endif
 
529
 
 
530
  /* Send via shared memory or sockets */
 
531
 
 
532
#ifdef TIMINGS
 
533
  start = TCGTIME_();
 
534
#endif
 
535
 
 
536
#if defined(SHMEM) || defined(SYSV)
 
537
  if (SR_proc_info[*node].local){
 
538
#ifdef KSR_NATIVE
 
539
    KSR_snd_local(type, buf, lenbuf, node);
 
540
#else
 
541
    snd_local(type, buf, lenbuf, node);
 
542
#endif
 
543
  } else {
 
544
#endif
 
545
      snd_remote(type, buf, lenbuf, node);
 
546
#if defined(SHMEM) || defined(SYSV)
 
547
  }
 
548
#endif
 
549
 
 
550
  /* Collect statistics */
 
551
 
 
552
  SR_proc_info[*node].n_snd += 1;
 
553
  SR_proc_info[*node].nb_snd += *lenbuf;
 
554
 
 
555
#ifdef TIMINGS
 
556
  SR_proc_info[*node].t_snd += TCGTIME_() - start;
 
557
#endif
 
558
 
 
559
#ifdef EVENTLOG
 
560
  evlog(EVKEY_END, EVENT_SND, EVKEY_LAST_ARG);
 
561
#endif
 
562
}    
 
563
    
 
564
static long MatchMessage(header, me, type)
 
565
     MessageHeader *header;
 
566
     long me, type;
 
567
/*
 
568
  Wrapper round check on if header is to me and of required
 
569
  type so that compiler does not optimize out fetching
 
570
  header info from shared memory.
 
571
*/
 
572
{
 
573
  return (long) ((header->nodeto == me) && (header->type == type));
 
574
}
 
575
 
 
576
static long NextReadyNode(type)
 
577
     long type;
 
578
/*
 
579
  Select a node from which input is pending ... also match the
 
580
  desired type.
 
581
 
 
582
  next_node is maintained as the last node that NextReadyNode chose
 
583
  plus one modulo NNODES_(). This aids in ensuring fairness.
 
584
 
 
585
  First use select to get info about the sockets and then loop
 
586
  through processes looking either at the bit in the fd_set for
 
587
  the socket (remote process) or the message header in the shared
 
588
  memory buffer (local process).
 
589
 
 
590
  This may be an expensive operation but fairness seems important.
 
591
 
 
592
  If only sockets are in use, just block in select until data is
 
593
  available.  
 
594
*/
 
595
{
 
596
  static long  next_node = 0;
 
597
 
 
598
  long  nproc = NNODES_();
 
599
  long  me = NODEID_();
 
600
  int i, nspin = 0;
 
601
 
 
602
  if (!SR_using_shmem) {
 
603
    int list[MAX_PROCESS];
 
604
    int nready;
 
605
    nready = WaitForSockets(SR_nsock,SR_socks,list);
 
606
    if (nready == 0) 
 
607
      Error("NextReadyNode: nready = 0\n", 0);
 
608
 
 
609
    /* Insert here type checking logic ... not yet done */
 
610
 
 
611
    return SR_socks_proc[list[0]];
 
612
  }
 
613
 
 
614
  /* With both local and remote processes end up with a busy wait
 
615
     as no way to wait for both a semaphore and a socket.
 
616
     Moderate this slightly by having short timeout in select */
 
617
 
 
618
  while (1) {
 
619
    
 
620
    for(i=0; i<nproc; i++, next_node = (next_node + 1) % nproc) {
 
621
 
 
622
      if (next_node == me) {
 
623
        ;  /* can't receive from self */
 
624
      }
 
625
      else if (SR_proc_info[next_node].local) {
 
626
        /* Look for local message */
 
627
 
 
628
#ifdef KSR_NATIVE
 
629
        if (KSR_MatchMessage(next_node, me, type))
 
630
#else
 
631
        if (MatchMessage(SR_proc_info[next_node].header, me, type))
 
632
#endif
 
633
          break;
 
634
      }
 
635
      else if (SR_proc_info[next_node].sock >= 0) {
 
636
        /* Look for message over socket */
 
637
 
 
638
        int sock = SR_proc_info[next_node].sock;
 
639
 
 
640
        /* Have we already peeked at this socket? */
 
641
 
 
642
        if (SR_proc_info[next_node].peeked) {
 
643
          if (SR_proc_info[next_node].head_peek.type == type)
 
644
            break;
 
645
        }
 
646
        else if (PollSocket(sock)) {
 
647
          /* Data is available ... let's peek at it */
 
648
#ifdef GOTXDR
 
649
          (void) ReadXdrLong(sock, 
 
650
                             (long *) &SR_proc_info[next_node].head_peek,
 
651
                             (long) (sizeof(MessageHeader)/sizeof(long)));
 
652
#else
 
653
          if (ReadFromSocket(sock, 
 
654
                             (char *) &SR_proc_info[next_node].head_peek,
 
655
                             (long) sizeof(MessageHeader))
 
656
              != sizeof(MessageHeader) )
 
657
            Error("NextReadyNode: reading header from socket", next_node);
 
658
#endif
 
659
          SR_proc_info[next_node].peeked = TRUE;
 
660
          if (DEBUG_)
 
661
            PrintMessageHeader("peeked_at ",
 
662
                               &SR_proc_info[next_node].head_peek);
 
663
 
 
664
          if (SR_proc_info[next_node].head_peek.type == type)
 
665
            break;
 
666
        }
 
667
      }
 
668
    }
 
669
    if (i < nproc)       /* If found a node skip out of the while loop */
 
670
      break;
 
671
 
 
672
    nspin++;             /* Compromise between low latency and low cpu use */
 
673
    if (nspin < 10)
 
674
      continue;
 
675
    else if (nspin < 100)
 
676
      USleep((long) 1000);
 
677
    else if (nspin < 600)
 
678
      USleep((long) 10000);
 
679
    else
 
680
      USleep((long) 100000);
 
681
  }
 
682
 
 
683
  i = next_node;
 
684
  next_node = (next_node + 1) % nproc;
 
685
  
 
686
  return (long) i;
 
687
}
 
688
 
 
689
long PROBE_(type, node)
 
690
     long *type, *node;
 
691
     /*
 
692
       Return 1/0 (TRUE/FALSE) if a message of the given type is available
 
693
       from the given node.  If the node is specified as -1, then all nodes
 
694
       will be examined.  Some attempt is made at ensuring fairness.
 
695
       
 
696
       First use select to get info about the sockets and then loop
 
697
       through processes looking either at the bit in the fd_set for
 
698
       the socket (remote process) or the message header in the shared
 
699
       memory buffer (local process).
 
700
       
 
701
       This may be an expensive operation but fairness seems important.
 
702
       */
 
703
{
 
704
  long  nproc = NNODES_();
 
705
  long  me = NODEID_();
 
706
  int i, proclo, prochi;
 
707
 
 
708
  if (*node == me)
 
709
    Error("PROBE_ : cannot recv message from self, msgtype=", *type);
 
710
      
 
711
      if (*node == -1) {                /* match anyone */
 
712
        proclo = 0;
 
713
        prochi = nproc-1;
 
714
      }
 
715
      else
 
716
        proclo = prochi = *node;
 
717
  
 
718
  for(i=proclo; i<=prochi; i++) {
 
719
    
 
720
    if (i == me) {
 
721
      ;  /* can't receive from self */
 
722
    }
 
723
    else if (SR_proc_info[i].local) {
 
724
      /* Look for local message */
 
725
      
 
726
#ifdef KSR_NATIVE
 
727
      if (KSR_MatchMessage(i, me, type))
 
728
#else
 
729
      if (MatchMessage(SR_proc_info[i].header, me, *type))
 
730
#endif
 
731
        break;
 
732
    }
 
733
    else if (SR_proc_info[i].sock >= 0) {
 
734
      /* Look for message over socket */
 
735
      
 
736
      int sock = SR_proc_info[i].sock;
 
737
      
 
738
      /* Have we already peeked at this socket? */
 
739
      
 
740
      if (SR_proc_info[i].peeked) {
 
741
        if (SR_proc_info[i].head_peek.type == *type)
 
742
          break;
 
743
      }
 
744
      else if (PollSocket(sock)) {
 
745
        /* Data is available ... let's peek at it */
 
746
#ifdef GOTXDR
 
747
        (void) ReadXdrLong(sock, 
 
748
                           (long *) &SR_proc_info[i].head_peek,
 
749
                           (long) (sizeof(MessageHeader)/sizeof(long)));
 
750
#else
 
751
        if (ReadFromSocket(sock, 
 
752
                           (char *) &SR_proc_info[i].head_peek,
 
753
                           (long) sizeof(MessageHeader))
 
754
            != sizeof(MessageHeader) )
 
755
          Error("NextReadyNode: reading header from socket", (long) i);
 
756
#endif
 
757
        SR_proc_info[i].peeked = TRUE;
 
758
        if (DEBUG_)
 
759
          PrintMessageHeader("peeked_at ",
 
760
                             &SR_proc_info[i].head_peek);
 
761
        
 
762
        if (SR_proc_info[i].head_peek.type == *type)
 
763
          break;
 
764
      }
 
765
    }
 
766
  }
 
767
 
 
768
  if (i <= prochi)
 
769
    return 1;
 
770
  else
 
771
    return 0;
 
772
}
 
773
 
 
774
 
 
775
static void rcv_remote(type, buf, lenbuf, lenmes, nodeselect, nodefrom)
 
776
     long *type;
 
777
     char *buf;
 
778
     long *lenbuf;
 
779
     long *lenmes;
 
780
     long *nodeselect;
 
781
     long *nodefrom;
 
782
/*
 
783
  synchronous receive of data
 
784
 
 
785
  long *type        = user defined type of received message (input)
 
786
  char *buf        = data buffer (output)
 
787
  long *lenbuf      = length of buffer in bytes (input)
 
788
  long *lenmes      = length of received message in bytes (output)
 
789
                     (exceeding receive buffer is hard error)
 
790
  long *nodeselect  = node to receive from (input)
 
791
                     -1 implies that any pending message may be received
 
792
                       
 
793
  long *nodefrom    = node message is received from (output)
 
794
*/
 
795
{
 
796
  long me = NODEID_();
 
797
  long node = *nodeselect;
 
798
  int sock = SR_proc_info[node].sock;
 
799
  long len;
 
800
  MessageHeader header;
 
801
#ifdef SOCK_FULL_SYNC
 
802
  char sync = 0;
 
803
#endif
 
804
 
 
805
  if ( sock < 0 )
 
806
    Error("rcv_remote: receiving from process without socket", (long) node);
 
807
 
 
808
  /* read the message header and check contents */
 
809
 
 
810
  if (SR_proc_info[node].peeked) {
 
811
    /* Have peeked at this socket ... get message header from buffer */
 
812
 
 
813
    if (DEBUG_)
 
814
      printf("%2ld: rcv_remote message has been peeked at\n", me);
 
815
 
 
816
    (void) memcpy((char *) &header, (char *) &SR_proc_info[node].head_peek,
 
817
                  sizeof(MessageHeader));
 
818
    SR_proc_info[node].peeked = FALSE;
 
819
  }
 
820
  else {
 
821
#ifdef GOTXDR
 
822
    (void) ReadXdrLong(sock, (long *) &header,
 
823
                       (long) (sizeof(header)/sizeof(long)));
 
824
#else
 
825
    if ( (len = ReadFromSocket(sock, (char *) &header, (long) sizeof(header)))
 
826
        != sizeof(header) )
 
827
      Error("rcv_remote: reading header from socket", len);
 
828
#endif
 
829
  }
 
830
 
 
831
  if (DEBUG_)
 
832
    PrintMessageHeader("rcv_remote",&header);
 
833
 
 
834
  if (header.nodeto != me) {
 
835
    PrintMessageHeader("rcv_remote",&header);
 
836
    Error("rcv_remote: got message meant for someone else",
 
837
          (long) header.nodeto);
 
838
  }
 
839
 
 
840
  *nodefrom = header.nodefrom;
 
841
  if (*nodefrom != node)
 
842
    Error("rcv_remote: got message from someone on incorrect socket",
 
843
          (long) *nodefrom);
 
844
 
 
845
  if (header.type != *type) {
 
846
    PrintMessageHeader("rcv_remote",&header);
 
847
    printf("rcv_remote: type mismatch ... strong typing enforced\n");
 
848
    abort();
 
849
    Error("rcv_remote: type mismatch ... strong typing enforced", (long) *type);
 
850
  }
 
851
 
 
852
#ifdef GOTXDR
 
853
  if ( *type & MSGDBL )
 
854
    *lenmes = header.length * sizeof(double);
 
855
  else if ( *type & MSGINT )
 
856
    *lenmes = header.length * sizeof(long);
 
857
  else if ( *type & MSGCHR )
 
858
    *lenmes = header.length * sizeof(char);
 
859
  else
 
860
    *lenmes = header.length; 
 
861
#else
 
862
  *lenmes = header.length; 
 
863
#endif
 
864
  
 
865
  if ( (*lenmes < 0) || (*lenmes > (long)BIG_MESSAGE) || (*lenmes > *lenbuf) ) {
 
866
    PrintMessageHeader("rcv_remote",&header);
 
867
    (void) fprintf(stderr, "rcv_remote err: lenbuf=%ld\n",*lenbuf);
 
868
    Error("rcv_remote: message length out of range",(long) *lenmes);
 
869
  }
 
870
 
 
871
  if (*lenmes > 0) {
 
872
#ifdef GOTXDR
 
873
    if ( *type & MSGDBL )
 
874
      (void) ReadXdrDouble(sock, (double *) buf, header.length);
 
875
    else if ( *type & MSGINT ) 
 
876
      (void) ReadXdrLong(sock, (long *) buf, header.length);
 
877
    else if ( *type & MSGCHR )
 
878
      (void) ReadXdrChar(sock, (char *) buf, header.length);
 
879
    else if ( (len = ReadFromSocket(sock, buf, *lenmes)) != *lenmes)
 
880
      Error("rcv_remote: reading message from socket",
 
881
            (long) (len+100000*(sock+ 1000* *nodefrom)));
 
882
#else
 
883
    if ( (len = ReadFromSocket(sock, buf, *lenmes)) != *lenmes)
 
884
      Error("rcv_remote: reading message from socket",
 
885
            (long) (len+100000*(sock+ 1000* *nodefrom)));
 
886
#endif
 
887
  }
 
888
 
 
889
  /* this write (and read in snd_remote) makes the link synchronous */
 
890
 
 
891
#ifdef SOCK_FULL_SYNC
 
892
  if ( WriteToSocket(sock, &sync, (long) 1) != 1)
 
893
    Error("rcv_remote: writing sync to socket", (long) node);
 
894
#endif
 
895
 
 
896
}
 
897
 
 
898
/*ARGSUSED*/
 
899
void RCV_(type, buf, lenbuf, lenmes, nodeselect, nodefrom, sync)
 
900
     long *type;
 
901
     void *buf;
 
902
     long *lenbuf;
 
903
     long *lenmes;
 
904
     long *nodeselect;
 
905
     long *nodefrom;
 
906
     long *sync;
 
907
/*
 
908
  long *type        = user defined type of received message (input)
 
909
  void *buf        = data buffer (output)
 
910
  long *lenbuf      = length of buffer in bytes (input)
 
911
  long *lenmes      = length of received message in bytes (output)
 
912
                     (exceeding receive buffer is hard error)
 
913
  long *nodeselect  = node to receive from (input)
 
914
                     -1 implies that any pending message may be received
 
915
                       
 
916
  long *nodefrom    = node message is received from (output)
 
917
  long *sync        = 0 for asynchronous, 1 for synchronous (NOT USED)
 
918
*/
 
919
{
 
920
  long me = NODEID_();
 
921
  long nproc = NNODES_();
 
922
  long node;
 
923
#ifdef TIMINGS
 
924
  double start;
 
925
#endif
 
926
 
 
927
#ifdef EVENTLOG
 
928
  evlog(EVKEY_BEGIN,     EVENT_RCV,
 
929
        EVKEY_MSG_FROM, (int) *nodeselect,
 
930
        EVKEY_MSG_TO,   (int)  me,
 
931
        EVKEY_MSG_TYPE, (int) *type,
 
932
        EVKEY_MSG_SYNC, (int) *sync,
 
933
        EVKEY_LAST_ARG);
 
934
#endif
 
935
 
 
936
  /* Assign the desired node or the next ready node */
 
937
 
 
938
#ifdef TIMINGS
 
939
  start = TCGTIME_();
 
940
#endif
 
941
 
 
942
  if (*nodeselect == -1)
 
943
    node = NextReadyNode(*type);
 
944
  else
 
945
    node = *nodeselect;
 
946
 
 
947
  /* Check for some errors ... need more checking here ...
 
948
     note that the overall master process has id nproc */
 
949
 
 
950
  if (node == me)
 
951
    Error("RCV_: cannot receive message from self", (long) me);
 
952
 
 
953
  if ( (node < 0) || (node > nproc) )
 
954
    Error("RCV_: out of range node requested", (long) node);
 
955
 
 
956
  /* Receive the message ... use shared memory, switch or socket */
 
957
 
 
958
#if defined(SHMEM) || defined(SYSV)
 
959
  if (SR_proc_info[node].local){
 
960
#ifdef KSR_NATIVE
 
961
    KSR_rcv_local(type, buf, lenbuf, lenmes, &node, nodefrom);
 
962
#else
 
963
    rcv_local(type, buf, lenbuf, lenmes, &node, nodefrom);
 
964
#endif
 
965
  } else {
 
966
#endif
 
967
      rcv_remote(type, buf, lenbuf, lenmes, &node, nodefrom);
 
968
#if defined(SHMEM) || defined(SYSV)
 
969
  }
 
970
#endif
 
971
 
 
972
  /* Collect statistics */
 
973
 
 
974
  SR_proc_info[node].n_rcv += 1;
 
975
  SR_proc_info[node].nb_rcv += *lenmes;
 
976
 
 
977
#ifdef TIMINGS
 
978
  SR_proc_info[node].t_rcv += TCGTIME_() - start;
 
979
#endif
 
980
 
 
981
#ifdef EVENTLOG
 
982
  evlog(EVKEY_END, EVENT_RCV,
 
983
        EVKEY_MSG_FROM, (int) node,
 
984
        EVKEY_MSG_LEN, (int) *lenmes,
 
985
        EVKEY_LAST_ARG);
 
986
#endif
 
987
}    
 
988
  
 
989
void RemoteConnect(a, b, c)
 
990
     long a, b, c;
 
991
/*
 
992
  Make a socket connection between processes a and b via the
 
993
  process c to which both are already connected.
 
994
*/
 
995
{
 
996
  long me = NODEID_();
 
997
  long nproc = NNODES_();
 
998
  long type = TYPE_CONNECT;  /* Overriden below */
 
999
  char cport[8];
 
1000
  long tmp, lenmes, nodefrom, clusid, lenbuf, sync=1;
 
1001
  int sock, port;
 
1002
  long lport;
 
1003
 
 
1004
  if ((a == b) || (a == c) || (b == c) )
 
1005
    return;        /* Gracefully ignore redundant connections */
 
1006
 
 
1007
  if ( (me != a) && (me != b) && (me != c) )
 
1008
    return;        /* I'm not involved in this connection */
 
1009
    
 
1010
 
 
1011
  if (a < b) {
 
1012
    tmp = a; a = b; b = tmp;
 
1013
  }
 
1014
 
 
1015
  type = (a + nproc*b) | MSGINT;  /* Create a unique type */
 
1016
 
 
1017
  if (DEBUG_) {
 
1018
    (void) printf("RC a=%ld, b=%ld, c=%ld, me=%ld\n",a,b,c,me);
 
1019
    (void) fflush(stdout);
 
1020
  }
 
1021
 
 
1022
  if (a == me) {
 
1023
    CreateSocketAndBind(&sock, &port);  /* Create port */
 
1024
    if (DEBUG_) {
 
1025
      (void) printf("RC node=%ld, sock=%d, port=%d\n",me, sock, port);
 
1026
      (void) fflush(stdout);
 
1027
    }
 
1028
    lport = port;
 
1029
    lenbuf = sizeof lport;
 
1030
    ListenOnSock(sock);
 
1031
    SND_(&type, (char *) &lport, &lenbuf, &c, &sync); /* Port to intermediate */
 
1032
    SR_proc_info[b].sock = AcceptConnection(sock); /* Accept connection
 
1033
                                                     and save socket info */
 
1034
  }
 
1035
  else if (b == me) {
 
1036
    clusid = SR_proc_info[a].clusid;
 
1037
    lenbuf = sizeof lport;
 
1038
    RCV_(&type, (char *) &lport, &lenbuf, &lenmes, &c, &nodefrom, &sync);
 
1039
    port = lport;
 
1040
    (void) sprintf(cport,"%d",port);
 
1041
    lenbuf = strlen(cport) + 1;
 
1042
    if (lenbuf > (long)sizeof(cport))
 
1043
      Error("RemoteConnect: cport too small", (long) lenbuf);
 
1044
    SR_proc_info[a].sock = 
 
1045
      CreateSocketAndConnect(SR_clus_info[clusid].hostname, cport); 
 
1046
  }
 
1047
  else if (c == me) {
 
1048
    lenbuf = sizeof lport;
 
1049
    RCV_(&type, (char *) &lport, &lenbuf, &lenmes, &a, &nodefrom, &sync);
 
1050
    SND_(&type, (char *) &lport, &lenbuf, &b, &sync);
 
1051
  }
 
1052
}