~ubuntu-branches/ubuntu/saucy/nwchem/saucy

« back to all changes in this revision

Viewing changes to src/tools/ga-5-1/armci/tcgmsg/ipcv5.0/async_send.c

  • Committer: Package Import Robot
  • Author(s): Michael Banck, Michael Banck, Daniel Leidert
  • Date: 2012-02-09 20:02:41 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20120209200241-jgk03qfsphal4ug2
Tags: 6.1-1
* New upstream release.

[ Michael Banck ]
* debian/patches/02_makefile_flags.patch: Updated.
* debian/patches/02_makefile_flags.patch: Use internal blas and lapack code.
* debian/patches/02_makefile_flags.patch: Define GCC4 for LINUX and LINUX64
  (Closes: #632611 and LP: #791308).
* debian/control (Build-Depends): Added openssh-client.
* debian/rules (USE_SCALAPACK, SCALAPACK): Removed variables (Closes:
  #654658).
* debian/rules (LIBDIR, USE_MPIF4, ARMCI_NETWORK): New variables.
* debian/TODO: New file.
* debian/control (Build-Depends): Removed libblas-dev, liblapack-dev and
  libscalapack-mpi-dev.
* debian/patches/04_show_testsuite_diff_output.patch: New patch, shows the
  diff output for failed tests.
* debian/patches/series: Adjusted.
* debian/testsuite: Optionally run all tests if "all" is passed as option.
* debian/rules: Run debian/testsuite with "all" if DEB_BUILD_OPTIONS
  contains "checkall".

[ Daniel Leidert ]
* debian/control: Used wrap-and-sort. Added Vcs-Svn and Vcs-Browser fields.
  (Priority): Moved to extra according to policy section 2.5.
  (Standards-Version): Bumped to 3.9.2.
  (Description): Fixed a typo.
* debian/watch: Added.
* debian/patches/03_hurd-i386_define_path_max.patch: Added.
  - Define MAX_PATH if not defines to fix FTBFS on hurd.
* debian/patches/series: Adjusted.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#if HAVE_CONFIG_H
 
2
#   include "config.h"
 
3
#endif
 
4
 
 
5
#if HAVE_STDIO_H
 
6
#   include <stdio.h>
 
7
#endif
 
8
 
 
9
extern void exit(int status);
 
10
 
 
11
#include "tcgmsgP.h"
 
12
 
 
13
static const Integer false = 0;
 
14
static const Integer true  = 1;
 
15
 
 
16
extern void Busy(int);
 
17
extern void flush_send_q(void);
 
18
 
 
19
/* All data movement to/from shared memory is done using the
 
20
   COPY_TO/FROM_SHMEM macros */
 
21
#define COPY_TO_LOCAL(src, dest, n, destnode) (void) memcpy(dest, src, n)
 
22
#define COPY_FROM_LOCAL(src, dest, n) (void)memcpy(dest, src, n)
 
23
#define COPY_FROM_REMOTE(src,dest,n,p) (void)memcpy(dest, src, n) 
 
24
#define COPY_TO_REMOTE(src,dest,n,p) (void)memcpy(dest, src, n)
 
25
#ifndef FLUSH_CACHE 
 
26
#    define FLUSH_CACHE          
 
27
#endif
 
28
#ifndef FLUSH_CACHE_LINE
 
29
#    define FLUSH_CACHE_LINE(x) 
 
30
#endif
 
31
 
 
32
/* #define TCG_ABS(a) (((a) >= 0) ? (a) : (-(a))) */
 
33
 
 
34
 
 
35
/**
 
36
 * Return the value of a volatile variable in shared memory
 
37
 * that is REMOTE to this processor
 
38
 */
 
39
static Integer remote_flag(Integer *p, Integer node)
 
40
{
 
41
    Integer tmp;
 
42
 
 
43
    /*  FLUSH_CACHE;*/ /* no need to flush for one word only*/
 
44
    COPY_FROM_REMOTE(p, &tmp, sizeof(tmp), node);
 
45
    return tmp;
 
46
}
 
47
 
 
48
 
 
49
/**
 
50
 * Return the value of a volatile variable in shared memory
 
51
 * that is LOCAL to this processor
 
52
 */
 
53
static Integer local_flag(Integer *p)
 
54
{
 
55
    FLUSH_CACHE_LINE(p);  
 
56
    return(*p);
 
57
}
 
58
 
 
59
 
 
60
/**
 
61
 * Wait for (*p == value)
 
62
 */
 
63
static void local_await(Integer *p, Integer value)
 
64
{
 
65
    Integer pval;
 
66
    Integer nspin = 0;
 
67
    Integer spinlim = 100000000;
 
68
 
 
69
    while ((pval = local_flag(p)) != value) {
 
70
 
 
71
        if (pval && (pval != value)) {
 
72
            fprintf(stdout,"%2ld: invalid value=%ld, local_flag=%p %ld\n", 
 
73
                    TCGMSG_nodeid, (long)value, p, (long)pval);
 
74
            fflush(stdout);
 
75
            exit(1);
 
76
        }
 
77
        nspin++;
 
78
        if((nspin&7)==0)flush_send_q();
 
79
        if (nspin < spinlim)
 
80
            Busy(100);
 
81
        else 
 
82
            usleep(1);
 
83
    }
 
84
}
 
85
 
 
86
 
 
87
/**
 
88
 * Entry points to info about a message ... determine which
 
89
 * transport mechanism is appropriate and send as much as
 
90
 * possible without blocking.
 
91
 * 
 
92
 * Right now just shared memory ... when sockets are working this
 
93
 * routine will become async_shmem_send.
 
94
 * 
 
95
 * Shared-memory protocol aims for low latency. Each process has
 
96
 * one buffer for every other process.  Thus, to send a message U
 
97
 * merely have to determine if the receivers buffer for you is empty
 
98
 * and copy directly into the receivers buffer.
 
99
 * 
 
100
 * Return 0 if more data is to be sent, 1 if the send is complete.
 
101
 */
 
102
Integer async_send(SendQEntry *entry)
 
103
{
 
104
    Integer node = entry->node;
 
105
    ShmemBuf *sendbuf= TCGMSG_proc_info[node].sendbuf;
 
106
    Integer nleft, ncopy;
 
107
    Integer pval;
 
108
    Integer info[4];
 
109
 
 
110
#ifdef DEBUG
 
111
    (void) fprintf(stdout,"%2ld: sending to %ld buf=%lx len=%ld\n",
 
112
                   TCGMSG_nodeid, node, entry->buf, entry->lenbuf); 
 
113
    (void) fprintf(stdout,"%2ld: sendbuf=%lx\n", TCGMSG_nodeid, sendbuf);
 
114
    (void) fflush(stdout);
 
115
#endif
 
116
 
 
117
    if ((pval = remote_flag(&sendbuf->info[3], node))) {
 
118
#ifdef DEBUG
 
119
        {
 
120
            Integer info[4];
 
121
            FLUSH_CACHE;
 
122
            COPY_FROM_REMOTE(sendbuf->info, info, sizeof(info), node);
 
123
            fprintf(stdout,"%2ld: snd info after full = %ld %ld %ld\n",
 
124
                    TCGMSG_nodeid, info[0], info[1], info[2]);
 
125
            fflush(stdout);
 
126
        }
 
127
        sleep(1);
 
128
#endif
 
129
 
 
130
        return 0;
 
131
    }
 
132
 
 
133
    info[0] = entry->type; info[1] = entry->lenbuf; info[2] = entry->tag;
 
134
 
 
135
    /* Copy over the first buffer load of the message */
 
136
 
 
137
    nleft = entry->lenbuf - entry->written;
 
138
    ncopy = (Integer) ((nleft <= SHMEM_BUF_SIZE) ? nleft : SHMEM_BUF_SIZE);
 
139
 
 
140
    if (ncopy&7) {
 
141
#ifdef DEBUG
 
142
        printf("%2ld: rounding buffer up %ld->%ld\n",
 
143
                TCGMSG_nodeid, ncopy, ncopy + 8 - (ncopy&7));
 
144
        fflush(stdout);
 
145
#endif
 
146
        ncopy = ncopy + 8 - (ncopy&7); 
 
147
    }
 
148
 
 
149
    if (ncopy) {
 
150
        COPY_TO_REMOTE(entry->buf+entry->written, sendbuf->buf, ncopy, node);
 
151
    }
 
152
 
 
153
 
 
154
    /* NOTE that SHMEM_BUF_SIZE is a multiple of 8 by construction so that
 
155
       this ncopy is only rounded up on the last write */
 
156
 
 
157
    ncopy = (Integer) ((nleft <= SHMEM_BUF_SIZE) ? nleft : SHMEM_BUF_SIZE);
 
158
    entry->written += ncopy;
 
159
    entry->buffer_number++;
 
160
 
 
161
    /* Copy over the header information include buffer full flag */
 
162
 
 
163
    info[3] = entry->buffer_number;
 
164
    COPY_TO_REMOTE(info, sendbuf->info, sizeof(info), node);
 
165
 
 
166
    return (Integer) (entry->written == entry->lenbuf);
 
167
}
 
168
 
 
169
 
 
170
/**
 
171
 * Receive a message of given type from the specified node, returning
 
172
 * the message and length of the message.
 
173
 * 
 
174
 * Right now just shared memory ... when sockets are working this
 
175
 * routine will become msg_shmem_rcv
 
176
 *
 
177
 * Shared-memory protocol aims for low latency. Each process has
 
178
 * one buffer for every other process.  Thus, to send a message U
 
179
 * merely have to determine if the receivers buffer for you is empty
 
180
 * and copy directly into the receivers buffer.
 
181
 *
 
182
 * Return 0 if more data is to be sent, 1 if the send is complete.
 
183
 */
 
184
void msg_rcv(Integer type, char *buf, Integer lenbuf, Integer *lenmes, Integer node)
 
185
{
 
186
    Integer me = TCGMSG_nodeid;
 
187
    ShmemBuf *recvbuf;        /* Points to receving buffer */
 
188
    Integer nleft;
 
189
    Integer msg_type, msg_tag, msg_len;
 
190
    Integer buffer_number = 1;
 
191
    Integer expected_tag = TCGMSG_proc_info[node].tag_rcv++;
 
192
 
 
193
    if (node<0 || node>=TCGMSG_nnodes)
 
194
        Error("msg_rcv: node is out of range", node);
 
195
 
 
196
    recvbuf = TCGMSG_proc_info[node].recvbuf;  
 
197
 
 
198
    /* Wait for first part message to be written */
 
199
 
 
200
#ifdef DEBUG
 
201
    (void) fprintf(stdout,"%2ld: receiving from %ld buf=%lx len=%ld\n",
 
202
                   me, node, buf, lenbuf); 
 
203
 
 
204
    (void) fprintf(stdout,"%2ld: recvbuf=%lx\n", me, recvbuf);
 
205
    (void) fflush(stdout);
 
206
#endif
 
207
 
 
208
    local_await(&recvbuf->info[3], buffer_number);
 
209
 
 
210
    /* Copy over the header information */
 
211
 
 
212
    /*  FLUSH_CACHE;*/
 
213
    msg_type = recvbuf->info[0]; 
 
214
    msg_len  = recvbuf->info[1];
 
215
    msg_tag  = recvbuf->info[2];
 
216
 
 
217
    /* Check type and size information */
 
218
 
 
219
    if (msg_tag != expected_tag) {
 
220
        (void) fprintf(stdout,
 
221
                       "rcv: me=%ld from=%ld type=%ld, tag=%ld, expectedtag=%ld\n",
 
222
                       (long)me, (long)node, (long)type, (long)msg_tag, (long)expected_tag);
 
223
        fflush(stdout);
 
224
        Error("msg_rcv: tag mismatch ... transport layer failed????", 0L);
 
225
    }
 
226
 
 
227
    if (msg_type != type) {
 
228
        (void) fprintf(stdout,
 
229
                       "rcv: me=%ld from=%ld type=(%ld != %ld) tag=%ld len=%ld\n",
 
230
                       (long)me, (long)node, (long)type, (long)msg_type, (long)msg_tag, (long)msg_len);
 
231
        fflush(stdout);
 
232
        Error("msg_rcv: type mismatch ... strong typing enforced\n", 0L);
 
233
    }
 
234
 
 
235
    if (msg_len > lenbuf) {
 
236
        (void) fprintf(stderr,
 
237
                       "rcv: me=%ld from=%ld type=%ld tag=%ld len=(%ld > %ld)\n",
 
238
                       (long)me, (long)node, (long)type, (long)msg_tag, (long)msg_len, (long)lenbuf);
 
239
        Error("msg_rcv: message too Integer for buffer\n", 0L);
 
240
    }
 
241
 
 
242
    nleft = *lenmes = msg_len;
 
243
    if (nleft == 0) {
 
244
        recvbuf->info[3] = false;
 
245
    }
 
246
 
 
247
    while (nleft) {
 
248
        Integer ncopy = (Integer) ((nleft <= SHMEM_BUF_SIZE) ? nleft : SHMEM_BUF_SIZE);
 
249
        { 
 
250
            Integer line;
 
251
            if(ncopy < 321) 
 
252
                for(line = 0; line < ncopy; line+=32) 
 
253
                    FLUSH_CACHE_LINE(recvbuf->buf+line);
 
254
            else 
 
255
                FLUSH_CACHE;
 
256
        }
 
257
 
 
258
        /*    if (buffer_number > 1) FLUSH_CACHE;*/
 
259
        COPY_FROM_LOCAL(recvbuf->buf, buf, ncopy);
 
260
 
 
261
        recvbuf->info[3] = false;
 
262
 
 
263
        nleft -= ncopy;
 
264
        buf   += ncopy;
 
265
 
 
266
        if (nleft) {
 
267
            buffer_number++;
 
268
            local_await(&recvbuf->info[3], buffer_number);
 
269
        }
 
270
    }
 
271
}
 
272
 
 
273
 
 
274
Integer MatchShmMessage(Integer node, Integer type)
 
275
{
 
276
    ShmemBuf *recvbuf;
 
277
    Integer  msg_type;
 
278
 
 
279
    recvbuf = TCGMSG_proc_info[node].recvbuf;
 
280
 
 
281
    if(recvbuf->info[3] == false) return (0); /* no message to receive */
 
282
 
 
283
    /* we have a message but let's see if want it */
 
284
 
 
285
    FLUSH_CACHE_LINE(recvbuf->info);
 
286
    COPY_FROM_LOCAL(recvbuf->info, &msg_type, sizeof(Integer));
 
287
    if(type == msg_type) return (1);
 
288
    return (0);
 
289
}