~ubuntu-branches/ubuntu/utopic/adios/utopic

« back to all changes in this revision

Viewing changes to .pc/mpi_in_place.patch/src/adios_mpi_lustre.c

  • Committer: Package Import Robot
  • Author(s): Alastair McKinstry
  • Date: 2013-12-09 15:21:31 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20131209152131-jtd4fpmdv3xnunnm
Tags: 1.5.0-1
* New upstream.
* Standards-Version: 3.9.5
* Include latest config.{sub,guess} 
* New watch file.
* Create libadios-bin for binaries.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* 
2
 
 * ADIOS is freely available under the terms of the BSD license described
3
 
 * in the COPYING file in the top level directory of this source distribution.
4
 
 *
5
 
 * Copyright (c) 2008 - 2009.  UT-BATTELLE, LLC. All rights reserved.
6
 
 */
7
 
#include <unistd.h>
8
 
#include <fcntl.h>
9
 
#include <stdlib.h>
10
 
#include <math.h>
11
 
#include <string.h>
12
 
#include <errno.h>
13
 
 
14
 
// mpi
15
 
#include "mpi.h"
16
 
 
17
 
// xml parser
18
 
#include <mxml.h>
19
 
 
20
 
#include "adios.h"
21
 
#include "adios_transport_hooks.h"
22
 
#include "adios_bp_v1.h"
23
 
#include "adios_internals.h"
24
 
#include "buffer.h"
25
 
 
26
 
static int adios_mpi_lustre_initialized = 0;
27
 
 
28
 
#define COLLECT_METRICS 0
29
 
 
30
 
struct adios_MPI_data_struct
31
 
{
32
 
    MPI_File fh;
33
 
    MPI_Request req;
34
 
    MPI_Status status;
35
 
    MPI_Comm group_comm;
36
 
    int rank;
37
 
    int size;
38
 
 
39
 
    void * comm; // temporary until moved from should_buffer to open
40
 
 
41
 
    struct adios_bp_buffer_struct_v1 b;
42
 
 
43
 
    struct adios_index_process_group_struct_v1 * old_pg_root;
44
 
    struct adios_index_var_struct_v1 * old_vars_root;
45
 
    struct adios_index_attribute_struct_v1 * old_attrs_root;
46
 
 
47
 
    uint64_t vars_start;
48
 
    uint64_t vars_header_size;
49
 
 
50
 
    uint64_t striping_unit;  // file system stripe size
51
 
    uint64_t block_unit;
52
 
};
53
 
 
54
 
#if COLLECT_METRICS
55
 
// see adios_adaptive_finalize for what each represents
56
 
struct timeval t0, t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13, t14, t15, t16, t17, t18, t19, t20, t21, t22, t23, t24, t25;
57
 
 
58
 
// Subtract the `struct timeval' values X and Y,
59
 
// storing the result in RESULT.
60
 
// Return 1 if the difference is negative, otherwise 0.
61
 
static int timeval_subtract (struct timeval * result
62
 
                            ,struct timeval * x, struct timeval * y
63
 
                            )
64
 
{
65
 
  // Perform the carry for the later subtraction by updating y.
66
 
  if (x->tv_usec < y->tv_usec)
67
 
  {
68
 
    int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
69
 
    y->tv_usec -= 1000000 * nsec;
70
 
    y->tv_sec += nsec;
71
 
  }
72
 
  if (x->tv_usec - y->tv_usec > 1000000)
73
 
  {
74
 
    int nsec = (x->tv_usec - y->tv_usec) / 1000000;
75
 
    y->tv_usec += 1000000 * nsec;
76
 
    y->tv_sec -= nsec;
77
 
  }
78
 
 
79
 
  // Compute the time remaining to wait.
80
 
  // tv_usec is certainly positive.
81
 
  result->tv_sec = x->tv_sec - y->tv_sec;
82
 
  result->tv_usec = x->tv_usec - y->tv_usec;
83
 
 
84
 
  // Return 1 if result is negative.
85
 
  return x->tv_sec < y->tv_sec;
86
 
}
87
 
 
88
 
static
89
 
void print_metrics (struct adios_MPI_data_struct * md, int iteration)
90
 
{
91
 
    struct timeval diff;
92
 
    if (md->rank == 0)
93
 
    {
94
 
        timeval_subtract (&diff, &t2, &t1);
95
 
        printf ("cc\t%2d\tFile create (stripe setup):\t%02d.%06d\n"
96
 
               ,iteration, diff.tv_sec, diff.tv_usec);
97
 
        
98
 
        timeval_subtract (&diff, &t6, &t5);
99
 
        printf ("dd\t%2d\tMass file open:\t%02d.%06d\n"
100
 
               ,iteration, diff.tv_sec, diff.tv_usec);
101
 
        
102
 
        timeval_subtract (&diff, &t17, &t16);
103
 
        printf ("ee\t%2d\tBuild file offsets:\t%02d.%06d\n"
104
 
               ,iteration, diff.tv_sec, diff.tv_usec);
105
 
    }
106
 
    if (md->rank == md->size - 1)
107
 
    {   
108
 
        timeval_subtract (&diff, &t10, &t9);
109
 
        printf ("ff\t%2d\tGlobal index creation:\t%02d.%06d\n"
110
 
               ,iteration, diff.tv_sec, diff.tv_usec);
111
 
        
112
 
        timeval_subtract (&diff, &t8, &t7);
113
 
        printf ("gg\t%2d\tAll writes complete (w/ local index):\t%02d.%06d\n"
114
 
               ,iteration, diff.tv_sec, diff.tv_usec);
115
 
        
116
 
        timeval_subtract (&diff, &t11, &t0);
117
 
        printf ("hh\t%2d\tTotal time:\t%02d.%06d\n"
118
 
               ,iteration, diff.tv_sec, diff.tv_usec);
119
 
    }
120
 
    
121
 
    timeval_subtract (&diff, &t13, &t12);
122
 
    printf ("ii\t%2d\tLocal index creation:\t%6d\t%02d.%06d\n"
123
 
           ,iteration, md->rank, diff.tv_sec, diff.tv_usec);
124
 
    
125
 
    timeval_subtract (&diff, &t22, &t21);
126
 
    printf ("kk\t%2d\tshould buffer time:\t%6d\t%02d.%06d\n"
127
 
           ,iteration, md->rank, diff.tv_sec, diff.tv_usec);
128
 
    
129
 
    timeval_subtract (&diff, &t19, &t23);
130
 
    printf ("ll\t%2d\tclose startup time:\t%6d\t%02d.%06d\n"
131
 
           ,iteration, md->rank, diff.tv_sec, diff.tv_usec);
132
 
    
133
 
    timeval_subtract (&diff, &t19, &t0);
134
 
    printf ("mm\t%2d\tsetup time:\t%6d\t%02d.%06d\n"
135
 
           ,iteration, md->rank, diff.tv_sec, diff.tv_usec);
136
 
    
137
 
    timeval_subtract (&diff, &t14, &t20);
138
 
    printf ("nn\t%2d\tcleanup time:\t%6d\t%02d.%06d\n"
139
 
           ,iteration, md->rank, diff.tv_sec, diff.tv_usec);
140
 
    
141
 
    timeval_subtract (&diff, &t21, &t0);
142
 
    printf ("oo\t%2d\topen->should_buffer time:\t%6d\t%02d.%06d\n"
143
 
           ,iteration, md->rank, diff.tv_sec, diff.tv_usec);
144
 
    
145
 
    timeval_subtract (&diff, &t24, &t21);
146
 
    printf ("pp\t%2d\tshould_buffer->write1 time:\t%6d\t%02d.%06d\n"
147
 
           ,iteration, md->rank, diff.tv_sec, diff.tv_usec);
148
 
    
149
 
    timeval_subtract (&diff, &t25, &t24);
150
 
    printf ("qq1\t%2d\twrite1->write2 time:\t%6d\t%02d.%06d\n"
151
 
           ,iteration, md->rank, diff.tv_sec, diff.tv_usec);
152
 
    
153
 
    timeval_subtract (&diff, &t23, &t25);
154
 
    printf ("qq2\t%2d\twrite2->close start time:\t%6d\t%02d.%06d\n"
155
 
           ,iteration, md->rank, diff.tv_sec, diff.tv_usec);
156
 
}
157
 
#endif
158
 
 
159
 
#if defined(__APPLE__)
160
 
#       include <sys/param.h>
161
 
#       include <sys/mount.h>
162
 
#else
163
 
#       include <sys/statfs.h>
164
 
#endif
165
 
 
166
 
// this should be determined at configure time
167
 
//#define ADIOS_LUSTRE
168
 
 
169
 
//#ifdef ADIOS_LUSTRE
170
 
#include <sys/ioctl.h>
171
 
//#include <lustre/lustre_user.h>
172
 
//#endif
173
 
// from /usr/include/lustre/lustre_user.h
174
 
#define LUSTRE_SUPER_MAGIC 0x0BD00BD0
175
 
#  define LOV_USER_MAGIC 0x0BD10BD0
176
 
#  define LL_IOC_LOV_SETSTRIPE  _IOW ('f', 154, long)
177
 
#  define LL_IOC_LOV_GETSTRIPE  _IOW ('f', 155, long)
178
 
#define O_LOV_DELAY_CREATE 0100000000
179
 
 
180
 
struct lov_user_ost_data {           // per-stripe data structure
181
 
        uint64_t l_object_id;        // OST object ID
182
 
        uint64_t l_object_gr;        // OST object group (creating MDS number)
183
 
        uint32_t l_ost_gen;          // generation of this OST index
184
 
        uint32_t l_ost_idx;          // OST index in LOV
185
 
} __attribute__((packed));
186
 
struct lov_user_md {                 // LOV EA user data (host-endian)
187
 
        uint32_t lmm_magic;          // magic number = LOV_USER_MAGIC_V1
188
 
        uint32_t lmm_pattern;        // LOV_PATTERN_RAID0, LOV_PATTERN_RAID1
189
 
        uint64_t lmm_object_id;      // LOV object ID
190
 
        uint64_t lmm_object_gr;      // LOV object group
191
 
        uint32_t lmm_stripe_size;    // size of stripe in bytes
192
 
        uint16_t lmm_stripe_count;   // num stripes in use for this object
193
 
        uint16_t lmm_stripe_offset;  // starting stripe offset in lmm_objects
194
 
        struct lov_user_ost_data  lmm_objects[0]; // per-stripe data
195
 
} __attribute__((packed));
196
 
struct obd_uuid {
197
 
        char uuid[40];
198
 
};
199
 
 
200
 
static void trim_spaces (char * str)
201
 
{
202
 
    char * t = str, * p = NULL;
203
 
    while (*t != '\0')
204
 
    {
205
 
        if (*t == ' ')
206
 
        {
207
 
            p = t + 1;
208
 
            strcpy (t, p);
209
 
        }
210
 
        else
211
 
            t++;
212
 
    }
213
 
 
214
 
}
215
 
 
216
 
static void
217
 
adios_mpi_lustre_set_striping_unit(char *filename, char *parameters, struct adios_MPI_data_struct * md)
218
 
{
219
 
    MPI_File fh = md->fh;
220
 
    int nproc = md->size;
221
 
    struct statfs fsbuf;
222
 
    int err = 0, flag;
223
 
//    uint64_t striping_unit = 0;
224
 
    uint64_t block_unit = 0;
225
 
    uint16_t striping_count = 0;
226
 
    uint16_t stripe_offset = -1;
227
 
    char     value[64], *temp_string, *p_count,*p_size;
228
 
    MPI_Info info_used;
229
 
 
230
 
    int fd, old_mask, perm, num_ost, rc;
231
 
    struct lov_user_md lum;
232
 
    struct obd_uuid uuids[1024], * uuidp;
233
 
 
234
 
    old_mask = umask(022);
235
 
    umask(old_mask);
236
 
    perm = old_mask ^ 0666;
237
 
 
238
 
    fd =  open(filename, O_RDONLY | O_CREAT | O_LOV_DELAY_CREATE, perm);
239
 
    if (fd == -1)
240
 
        return;
241
 
 
242
 
#ifdef HAVE_LUSTRE
243
 
    // To get the number of ost's in the system
244
 
    num_ost = 1024;
245
 
    rc = llapi_lov_get_uuids(fd, uuids, &num_ost);
246
 
    if (rc != 0)
247
 
    {   
248
 
        fprintf (stderr, "get uuids failed: %s\n"
249
 
                       ,strerror(errno)
250
 
                );
251
 
    }
252
 
#else
253
 
    num_ost = 0;
254
 
#endif
255
 
 
256
 
    temp_string = (char *) malloc (strlen (parameters) + 1);
257
 
    strcpy (temp_string, parameters);
258
 
    trim_spaces (temp_string);
259
 
 
260
 
    if (p_count = strstr (temp_string, "stripe_count"))
261
 
    {
262
 
        char * p = strchr (p_count, '=');
263
 
        char * q = strtok (p, ",");
264
 
        if (!q)
265
 
            striping_count = atoi (q + 1);
266
 
        else
267
 
            striping_count = atoi (p + 1);
268
 
    }
269
 
    else
270
 
    {
271
 
#ifdef HAVE_LUSTRE
272
 
        striping_count = (nproc > num_ost ? -1 : nproc);
273
 
#else
274
 
        striping_count = 4;
275
 
#endif
276
 
    }
277
 
 
278
 
    strcpy (temp_string, parameters);
279
 
    trim_spaces (temp_string);
280
 
 
281
 
    if (p_size = strstr (temp_string, "stripe_size"))
282
 
    {
283
 
        char * p = strchr (p_size, '=');
284
 
        char * q = strtok (p, ",");
285
 
        if (!q)
286
 
            md->striping_unit = atoi(q + 1);
287
 
        else
288
 
            md->striping_unit = atoi(p + 1);
289
 
    }
290
 
    else
291
 
    {
292
 
        if (md->striping_unit <= 0)
293
 
            md->striping_unit = 1048576;
294
 
    }
295
 
 
296
 
    strcpy (temp_string, parameters);
297
 
    trim_spaces (temp_string);
298
 
 
299
 
    if (p_size = strstr (temp_string, "stripe_offset"))
300
 
    {
301
 
        char * p = strchr (p_size, '=');
302
 
        char * q = strtok (p, ",");
303
 
        if (!q)
304
 
            stripe_offset = atoi (q + 1);
305
 
        else
306
 
            stripe_offset = atoi (p + 1);
307
 
    }
308
 
    else
309
 
    {
310
 
        // let Lustre manage stripe offset
311
 
        stripe_offset = -1;
312
 
    }
313
 
 
314
 
    strcpy (temp_string, parameters);
315
 
    trim_spaces (temp_string);
316
 
 
317
 
    if (p_size = strstr (temp_string, "block_size"))
318
 
    {
319
 
        char * p = strchr (p_size, '=');
320
 
        char * q = strtok (p, ",");
321
 
        if (!q)
322
 
            block_unit = atoi(q + 1);
323
 
        else
324
 
            block_unit = atoi(p + 1);
325
 
    }
326
 
    else
327
 
    {
328
 
        // set block_unit to 0 to make one large write
329
 
        block_unit = 0;
330
 
    }
331
 
 
332
 
    free (temp_string);
333
 
 
334
 
    if (fd != -1) {
335
 
        struct lov_user_md lum;
336
 
        lum.lmm_magic = LOV_USER_MAGIC;
337
 
        lum.lmm_pattern = 0;
338
 
        lum.lmm_stripe_size = md->striping_unit;
339
 
        lum.lmm_stripe_count = striping_count;
340
 
        lum.lmm_stripe_offset = stripe_offset;
341
 
        ioctl (fd, LL_IOC_LOV_SETSTRIPE
342
 
              ,(void *) &lum
343
 
              );
344
 
 
345
 
        if (err == 0 && lum.lmm_stripe_size > 0) {
346
 
            md->striping_unit = lum.lmm_stripe_size;
347
 
        }
348
 
        close(fd);
349
 
    }
350
 
    else
351
 
        printf("Warning: open failed on file %s %s.\n",filename,strerror(errno));
352
 
//#endif
353
 
}
354
 
 
355
 
static void
356
 
adios_mpi_lustre_set_block_unit(uint64_t *block_unit, char *parameters)
357
 
{
358
 
    char *temp_string, *p_count,*p_size;
359
 
 
360
 
    temp_string = (char *) malloc (strlen (parameters) + 1);
361
 
    strcpy (temp_string, parameters);
362
 
    trim_spaces (temp_string);
363
 
 
364
 
    if (p_size = strstr (temp_string, "block_size"))
365
 
    {
366
 
        char * p = strchr (p_size, '=');
367
 
        char * q = strtok (p, ",");
368
 
        if (!q)
369
 
            *block_unit = atoi(q + 1);
370
 
        else
371
 
            *block_unit = atoi(p + 1);
372
 
    }
373
 
 
374
 
    if (*block_unit <= 0)
375
 
        *block_unit = 1048576;
376
 
 
377
 
    free (temp_string);
378
 
}
379
 
 
380
 
static int
381
 
adios_mpi_lustre_get_striping_unit(MPI_File fh, char *filename)
382
 
{
383
 
    struct statfs fsbuf;
384
 
    int err, flag;
385
 
    uint64_t striping_unit = 1048576;
386
 
    char     value[64];
387
 
    MPI_Info info_used;
388
 
 
389
 
#if COLLECT_METRICS
390
 
    gettimeofday (&t1, NULL);
391
 
#endif
392
 
 
393
 
    // get striping_unit from MPI hint if it has
394
 
    MPI_File_get_info(fh, &info_used);
395
 
    MPI_Info_get(info_used, "striping_unit", 63, value, &flag);
396
 
    MPI_Info_free(&info_used);
397
 
 
398
 
    if (flag) return atoi(value);
399
 
 
400
 
    // if striping_unit is not set in MPI file info, get it from system
401
 
    err = statfs(filename, &fsbuf);
402
 
    if (err == -1) {
403
 
        printf("Warning: statfs failed %s %s.\n",filename,strerror(errno));
404
 
        return striping_unit;
405
 
    }
406
 
 
407
 
    if (!err && fsbuf.f_type == LUSTRE_SUPER_MAGIC) {
408
 
        int fd, old_mask, perm;
409
 
 
410
 
        old_mask = umask(022);
411
 
        umask(old_mask);
412
 
        perm = old_mask ^ 0666;
413
 
 
414
 
        fd =  open(filename, O_RDONLY, perm);
415
 
        if (fd != -1) {
416
 
            struct lov_user_md lum;
417
 
            memset (&lum, 0, sizeof(struct lov_user_md));
418
 
            lum.lmm_magic = LOV_USER_MAGIC;
419
 
            err = ioctl(fd, LL_IOC_LOV_GETSTRIPE, (void *) &lum);
420
 
            if (err == 0 && lum.lmm_stripe_size > 0) {
421
 
                striping_unit = lum.lmm_stripe_size;
422
 
            }
423
 
            close(fd);
424
 
        }
425
 
        else
426
 
            printf("Warning: open failed on file %s %s.\n",filename,strerror(errno));
427
 
    }
428
 
 
429
 
#if COLLECT_METRICS         
430
 
    gettimeofday (&t2, NULL);
431
 
#endif
432
 
    // set the file striping size
433
 
    return striping_unit;
434
 
}
435
 
 
436
 
static uint64_t
437
 
adios_mpi_lustre_striping_unit_write(MPI_File    fh,
438
 
                              MPI_Offset  offset,
439
 
                              void        *buf,
440
 
                              uint64_t    len,
441
 
                              uint64_t    block_unit 
442
 
                              )
443
 
{
444
 
    uint64_t err = -1;
445
 
    MPI_Status status;
446
 
 
447
 
    if (len == 0) return 0;
448
 
 
449
 
    if (offset == -1) // use current position
450
 
        MPI_File_get_position(fh, &offset);
451
 
    else
452
 
        MPI_File_seek (fh, offset, MPI_SEEK_SET);
453
 
 
454
 
    if (block_unit > 0) {
455
 
        MPI_Offset  rem_off = offset;
456
 
        uint64_t    rem_size = len;
457
 
        char       *buf_ptr = buf;
458
 
 
459
 
        err = 0;
460
 
        while (rem_size > 0) {
461
 
            uint64_t rem_unit  = block_unit - rem_off % block_unit;
462
 
            int write_len = (rem_unit < rem_size) ? rem_unit : rem_size;
463
 
            int ret_len;
464
 
 
465
 
#ifdef _WKL_CHECK_STRIPE_IO
466
 
printf("adios_mpi_lustre_striping_unit_write offset=%12lld len=%12d\n",offset,write_len);offset+=write_len;
467
 
#endif
468
 
            MPI_File_write (fh, buf_ptr, write_len, MPI_BYTE, &status);
469
 
            MPI_Get_count(&status, MPI_BYTE, &ret_len);
470
 
            if (ret_len < 0) {err = ret_len; break;}
471
 
            err += ret_len;
472
 
            if (ret_len != write_len) break;
473
 
            buf_ptr  += write_len;
474
 
            rem_off  += write_len;
475
 
            rem_size -= write_len;
476
 
        }
477
 
    }
478
 
    else {
479
 
        uint64_t total_written = 0;
480
 
        uint64_t to_write = len;
481
 
        int write_len = 0;
482
 
        int count;
483
 
        char * buf_ptr = buf;
484
 
        while (total_written < len)
485
 
        {
486
 
            write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
487
 
            MPI_File_write (fh, buf_ptr, write_len, MPI_BYTE, &status);
488
 
            MPI_Get_count(&status, MPI_BYTE, &count);
489
 
            if (count != write_len)
490
 
            {
491
 
                err = count;
492
 
                break;
493
 
            }
494
 
            total_written += count;
495
 
            buf_ptr += count;
496
 
            to_write -= count;
497
 
            err = total_written;
498
 
        }
499
 
    }
500
 
    return err;
501
 
}
502
 
 
503
 
static void adios_var_to_comm (const char * comm_name
504
 
                              ,enum ADIOS_FLAG host_language_fortran
505
 
                              ,void * data
506
 
                              ,MPI_Comm * comm
507
 
                              )
508
 
{
509
 
    if (data)
510
 
    {
511
 
        int t = *(int *) data;
512
 
 
513
 
        if (!comm_name)
514
 
        {
515
 
            if (!t)
516
 
            {
517
 
                fprintf (stderr, "communicator not provided and none "
518
 
                                 "listed in XML.  Defaulting to "
519
 
                                 "MPI_COMM_SELF\n"
520
 
                        );
521
 
 
522
 
                *comm = MPI_COMM_SELF;
523
 
            }
524
 
            else
525
 
            {
526
 
                if (host_language_fortran == adios_flag_yes)
527
 
                {
528
 
                    *comm = MPI_Comm_f2c (t);
529
 
                }
530
 
                else
531
 
                {
532
 
                    *comm = *(MPI_Comm *) data;
533
 
                }
534
 
            }
535
 
        }
536
 
        else
537
 
        {
538
 
            if (!strcmp (comm_name, ""))
539
 
            {
540
 
                if (!t)
541
 
                {
542
 
                    fprintf (stderr, "communicator not provided and none "
543
 
                                     "listed in XML.  Defaulting to "
544
 
                                     "MPI_COMM_SELF\n"
545
 
                            );
546
 
 
547
 
                    *comm = MPI_COMM_SELF;
548
 
                }
549
 
                else
550
 
                {
551
 
                    if (host_language_fortran == adios_flag_yes)
552
 
                    {
553
 
                        *comm = MPI_Comm_f2c (t);
554
 
                    }
555
 
                    else
556
 
                    {
557
 
                        *comm = *(MPI_Comm *) data;
558
 
                    }
559
 
                }
560
 
            }
561
 
            else
562
 
            {
563
 
                if (!t)
564
 
                {
565
 
                    fprintf (stderr, "communicator not provided but one "
566
 
                                     "listed in XML.  Defaulting to "
567
 
                                     "MPI_COMM_WORLD\n"
568
 
                            );
569
 
 
570
 
                    *comm = MPI_COMM_WORLD;
571
 
                }
572
 
                else
573
 
                {
574
 
                    if (host_language_fortran == adios_flag_yes)
575
 
                    {
576
 
                        *comm = MPI_Comm_f2c (t);
577
 
                    }
578
 
                    else
579
 
                    {
580
 
                        *comm = *(MPI_Comm *) data;
581
 
                    }
582
 
                }
583
 
            }
584
 
        }
585
 
    }
586
 
    else
587
 
    {
588
 
        fprintf (stderr, "coordination-communication not provided. "
589
 
                         "Using MPI_COMM_WORLD instead\n"
590
 
                );
591
 
 
592
 
        *comm = MPI_COMM_WORLD;
593
 
    }
594
 
}
595
 
 
596
 
void adios_mpi_lustre_init (const char * parameters
597
 
                    ,struct adios_method_struct * method
598
 
                    )
599
 
{
600
 
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
601
 
                                                    method->method_data;
602
 
    if (!adios_mpi_lustre_initialized)
603
 
    {
604
 
        adios_mpi_lustre_initialized = 1;
605
 
    }
606
 
    method->method_data = malloc (sizeof (struct adios_MPI_data_struct));
607
 
    md = (struct adios_MPI_data_struct *) method->method_data;
608
 
    md->fh = 0;
609
 
    md->req = 0;
610
 
    memset (&md->status, 0, sizeof (MPI_Status));
611
 
    md->rank = 0;
612
 
    md->size = 0;
613
 
    md->group_comm = MPI_COMM_NULL;
614
 
    md->old_pg_root = 0;
615
 
    md->old_vars_root = 0;
616
 
    md->old_attrs_root = 0;
617
 
    md->vars_start = 0;
618
 
    md->vars_header_size = 0;
619
 
    md->striping_unit = 0;
620
 
    md->block_unit = 0;
621
 
 
622
 
    adios_buffer_struct_init (&md->b);
623
 
}
624
 
 
625
 
int adios_mpi_lustre_open (struct adios_file_struct * fd
626
 
                   ,struct adios_method_struct * method, void * comm
627
 
                   )
628
 
{
629
 
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
630
 
                                                    method->method_data;
631
 
 
632
 
#if COLLECT_METRICS
633
 
    gettimeofday (&t0, NULL); // only used on rank == size - 1, but we don't
634
 
                              // have the comm yet to get the rank/size
635
 
#endif
636
 
    // we have to wait for the group_size (should_buffer) to get the comm
637
 
    // before we can do an open for any of the modes
638
 
    md->comm = comm;
639
 
 
640
 
    return 1;
641
 
}
642
 
 
643
 
static
644
 
void build_offsets (struct adios_bp_buffer_struct_v1 * b
645
 
                   ,MPI_Offset * offsets, uint64_t size, char * group_name
646
 
                   ,struct adios_index_process_group_struct_v1 * pg_root
647
 
                   )
648
 
{
649
 
    while (pg_root)
650
 
    {
651
 
        if (!strcasecmp (pg_root->group_name, group_name))
652
 
        {
653
 
            MPI_Offset size = 0;
654
 
 
655
 
            if (pg_root->next)
656
 
            {
657
 
                size = pg_root->next->offset_in_file - pg_root->offset_in_file;
658
 
            }
659
 
            else
660
 
            {
661
 
                size = b->pg_index_offset - pg_root->offset_in_file;
662
 
            }
663
 
 
664
 
            offsets [pg_root->process_id * 3] = pg_root->offset_in_file;
665
 
            offsets [pg_root->process_id * 3 + 1] = size;
666
 
            offsets [pg_root->process_id * 3 + 2] = b->version;
667
 
        }
668
 
 
669
 
        pg_root = pg_root->next;
670
 
    }
671
 
}
672
 
 
673
 
enum ADIOS_FLAG adios_mpi_lustre_should_buffer (struct adios_file_struct * fd
674
 
                                        ,struct adios_method_struct * method
675
 
                                        )
676
 
{
677
 
    int i;
678
 
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
679
 
                                                      method->method_data;
680
 
    char * name;
681
 
    int err;
682
 
    int flag;    // used for coordinating the MPI_File_open
683
 
 
684
 
    int previous;
685
 
    int current;
686
 
    int next;
687
 
 
688
 
#if COLLECT_METRICS
689
 
    gettimeofday (&t21, NULL);
690
 
#endif
691
 
 
692
 
    name = malloc (strlen (method->base_path) + strlen (fd->name) + 1);
693
 
    sprintf (name, "%s%s", method->base_path, fd->name);
694
 
 
695
 
    adios_var_to_comm (fd->group->group_comm
696
 
                      ,fd->group->adios_host_language_fortran
697
 
                      ,md->comm
698
 
                      ,&md->group_comm
699
 
                      );
700
 
    if (md->group_comm != MPI_COMM_NULL)
701
 
    {
702
 
        MPI_Comm_rank (md->group_comm, &md->rank);
703
 
        MPI_Comm_size (md->group_comm, &md->size);
704
 
    }
705
 
    fd->group->process_id = md->rank;
706
 
 
707
 
    if (md->rank == md->size - 1)
708
 
        next = -1;
709
 
    else
710
 
        next = md->rank + 1;
711
 
    previous = md->rank - 1;
712
 
    current = md->rank;
713
 
 
714
 
    fd->base_offset = 0;
715
 
 
716
 
#define LUSTRE_STRIPE_UNIT 65536
717
 
 
718
 
    switch (fd->mode)
719
 
    {
720
 
        case adios_mode_read:
721
 
        {
722
 
            if (md->group_comm == MPI_COMM_NULL || md->rank == 0)
723
 
            {
724
 
                err = MPI_File_open (MPI_COMM_SELF, name, MPI_MODE_RDONLY
725
 
                                    ,MPI_INFO_NULL, &md->fh
726
 
                                    );
727
 
                if (err != MPI_SUCCESS)
728
 
                {
729
 
                    char e [MPI_MAX_ERROR_STRING];
730
 
                    int len = 0;
731
 
                    memset (e, 0, MPI_MAX_ERROR_STRING);
732
 
                    MPI_Error_string (err, e, &len);
733
 
                    fprintf (stderr, "MPI open read failed for %s: '%s'\n"
734
 
                            ,name, e
735
 
                            );
736
 
                    free (name);
737
 
 
738
 
                    return adios_flag_no;
739
 
                }
740
 
 
741
 
                MPI_Offset file_size;
742
 
                MPI_File_get_size (md->fh, &file_size);
743
 
                md->b.file_size = file_size;
744
 
 
745
 
                adios_init_buffer_read_version (&md->b);
746
 
                MPI_File_seek (md->fh, md->b.file_size - md->b.length
747
 
                              ,MPI_SEEK_SET
748
 
                              );
749
 
                MPI_File_read (md->fh, md->b.buff, md->b.length, MPI_BYTE
750
 
                              ,&md->status
751
 
                              );
752
 
                adios_parse_version (&md->b, &md->b.version);
753
 
 
754
 
                adios_init_buffer_read_index_offsets (&md->b);
755
 
                // already in the buffer
756
 
                adios_parse_index_offsets_v1 (&md->b);
757
 
 
758
 
                adios_init_buffer_read_process_group_index (&md->b);
759
 
                MPI_File_seek (md->fh, md->b.pg_index_offset
760
 
                              ,MPI_SEEK_SET
761
 
                              );
762
 
                MPI_File_read (md->fh, md->b.buff, md->b.pg_size, MPI_BYTE
763
 
                              ,&md->status
764
 
                              );
765
 
                adios_parse_process_group_index_v1 (&md->b
766
 
                                                   ,&md->old_pg_root
767
 
                                                   );
768
 
 
769
 
#if 1
770
 
                adios_init_buffer_read_vars_index (&md->b);
771
 
                MPI_File_seek (md->fh, md->b.vars_index_offset
772
 
                              ,MPI_SEEK_SET
773
 
                              );
774
 
                MPI_File_read (md->fh, md->b.buff, md->b.vars_size, MPI_BYTE
775
 
                              ,&md->status
776
 
                              );
777
 
                adios_parse_vars_index_v1 (&md->b, &md->old_vars_root);
778
 
 
779
 
                adios_init_buffer_read_attributes_index (&md->b);
780
 
                MPI_File_seek (md->fh, md->b.attrs_index_offset
781
 
                              ,MPI_SEEK_SET
782
 
                              );
783
 
                MPI_File_read (md->fh, md->b.buff, md->b.attrs_size, MPI_BYTE
784
 
                              ,&md->status
785
 
                              );
786
 
                adios_parse_attributes_index_v1 (&md->b, &md->old_attrs_root);
787
 
#endif
788
 
 
789
 
                fd->base_offset = md->b.end_of_pgs;
790
 
            }
791
 
 
792
 
            if (   md->group_comm != MPI_COMM_NULL
793
 
                && md->group_comm != MPI_COMM_SELF
794
 
               )
795
 
            {
796
 
                if (md->rank == 0)
797
 
                {
798
 
                    MPI_Offset * offsets = malloc (  sizeof (MPI_Offset)
799
 
                                                   * md->size * 3
800
 
                                                  );
801
 
                    memset (offsets, 0, sizeof (MPI_Offset) * md->size * 3);
802
 
 
803
 
                    // go through the pg index to build the offsets array
804
 
                    build_offsets (&md->b, offsets, md->size
805
 
                                  ,fd->group->name, md->old_pg_root
806
 
                                  );
807
 
                    MPI_Scatter (offsets, 3, MPI_LONG_LONG
808
 
                                ,MPI_IN_PLACE, 3, MPI_LONG_LONG
809
 
                                ,0, md->group_comm
810
 
                                );
811
 
                    md->b.read_pg_offset = offsets [0];
812
 
                    md->b.read_pg_size = offsets [1];
813
 
                    free (offsets);
814
 
                }
815
 
                else
816
 
                {
817
 
                    MPI_Offset offset [3];
818
 
                    offset [0] = offset [1] = offset [2] = 0;
819
 
 
820
 
                    MPI_Scatter (0, 0, 0
821
 
                                ,offset, 3, MPI_LONG_LONG
822
 
                                ,0, md->group_comm
823
 
                                );
824
 
 
825
 
                    md->b.read_pg_offset = offset [0];
826
 
                    md->b.read_pg_size = offset [1];
827
 
                    md->b.version = offset [2];
828
 
                }
829
 
            }
830
 
 
831
 
            // cascade the opens to avoid trashing the metadata server
832
 
            if (previous == -1)
833
 
            {
834
 
                // note rank 0 is already open
835
 
                // don't open it again here
836
 
 
837
 
                if (next != -1)
838
 
                {
839
 
                    MPI_Isend (&flag, 1, MPI_INT, next, current
840
 
                              ,md->group_comm, &md->req
841
 
                              );
842
 
                }
843
 
            }
844
 
            else
845
 
            {
846
 
                MPI_Recv (&flag, 1, MPI_INT, previous, previous
847
 
                         ,md->group_comm, &md->status
848
 
                         );
849
 
                if (next != -1)
850
 
                {
851
 
                    MPI_Isend (&flag, 1, MPI_INT, next, current
852
 
                              ,md->group_comm, &md->req
853
 
                              );
854
 
                }
855
 
                err = MPI_File_open (MPI_COMM_SELF, name
856
 
                                    ,MPI_MODE_RDONLY
857
 
                                    ,MPI_INFO_NULL
858
 
                                    ,&md->fh
859
 
                                    );
860
 
            }
861
 
 
862
 
            if (err != MPI_SUCCESS)
863
 
            {
864
 
                char e [MPI_MAX_ERROR_STRING];
865
 
                int len = 0;
866
 
                memset (e, 0, MPI_MAX_ERROR_STRING);
867
 
                MPI_Error_string (err, e, &len);
868
 
                fprintf (stderr, "MPI open write failed for %s: '%s'\n"
869
 
                        ,name, e
870
 
                        );
871
 
                free (name);
872
 
 
873
 
                return adios_flag_no;
874
 
            }
875
 
 
876
 
            break;
877
 
        }
878
 
 
879
 
        case adios_mode_write:
880
 
        {
881
 
            fd->base_offset = 0;
882
 
            fd->pg_start_in_file = 0;
883
 
#if COLLECT_METRICS                     
884
 
            gettimeofday (&t16, NULL);
885
 
#endif
886
 
 
887
 
            if (md->group_comm != MPI_COMM_NULL)
888
 
            {
889
 
                if (md->rank == 0)
890
 
                {
891
 
                    MPI_Offset * offsets = malloc (  sizeof (MPI_Offset)
892
 
                                                   * md->size
893
 
                                                  );
894
 
 
895
 
                    // round up to LUSTRE_STRIPE_UNIT (64KB)
896
 
                    if (fd->write_size_bytes % LUSTRE_STRIPE_UNIT)
897
 
                        offsets [0] =  (fd->write_size_bytes / LUSTRE_STRIPE_UNIT + 1)
898
 
                                     * LUSTRE_STRIPE_UNIT;
899
 
                    else
900
 
                        offsets [0] = fd->write_size_bytes;
901
 
 
902
 
                    MPI_Gather (MPI_IN_PLACE, 1, MPI_LONG_LONG
903
 
                               ,offsets, 1, MPI_LONG_LONG
904
 
                               ,0, md->group_comm
905
 
                               );
906
 
 
907
 
                    uint64_t last_offset = offsets [0];
908
 
                    offsets [0] = fd->base_offset;
909
 
                    for (i = 1; i < md->size; i++)
910
 
                    {
911
 
                        uint64_t this_offset = offsets [i];
912
 
                        offsets [i] = offsets [i - 1] + last_offset;
913
 
                        last_offset = this_offset;
914
 
                    }
915
 
                    // How to handle that each processor has varying amount of data??
916
 
                    md->striping_unit = offsets[1] - offsets[0];
917
 
                    if (md->striping_unit > 4 * 1024 * 1024 * 1024L)
918
 
                    {
919
 
                        md->striping_unit = 4 * 1024 * 1024 * 1024L;
920
 
                    }
921
 
 
922
 
                    md->b.pg_index_offset =   offsets [md->size - 1]
923
 
                                            + last_offset;
924
 
                    MPI_Scatter (offsets, 1, MPI_LONG_LONG
925
 
                                ,MPI_IN_PLACE, 1, MPI_LONG_LONG
926
 
                                ,0, md->group_comm
927
 
                                );
928
 
                    fd->base_offset = offsets [0];
929
 
                    fd->pg_start_in_file = fd->base_offset;
930
 
                    free (offsets);
931
 
                }
932
 
                else
933
 
                {
934
 
                    MPI_Offset offset;
935
 
                    if (fd->write_size_bytes % LUSTRE_STRIPE_UNIT)
936
 
                        offset =  (fd->write_size_bytes / LUSTRE_STRIPE_UNIT + 1)
937
 
                                  * LUSTRE_STRIPE_UNIT;
938
 
                    else
939
 
                        offset = fd->write_size_bytes;
940
 
 
941
 
                    MPI_Gather (&offset, 1, MPI_LONG_LONG
942
 
                               ,0, 0, 0
943
 
                               ,0, md->group_comm
944
 
                               );
945
 
 
946
 
                    MPI_Scatter (0, 0, 0
947
 
                                ,&offset, 1, MPI_LONG_LONG
948
 
                                ,0, md->group_comm
949
 
                                );
950
 
                    fd->base_offset = offset;
951
 
                    fd->pg_start_in_file = fd->base_offset;
952
 
                }
953
 
            }
954
 
            else
955
 
            {
956
 
                md->b.pg_index_offset = fd->write_size_bytes;
957
 
            }
958
 
 
959
 
#if COLLECT_METRICS
960
 
            gettimeofday (&t6, NULL);
961
 
#endif
962
 
            // cascade the opens to avoid trashing the metadata server
963
 
            if (previous == -1)
964
 
            {
965
 
                unlink (name);  // make sure clean
966
 
 
967
 
                if (method->parameters)
968
 
                {
969
 
                    adios_mpi_lustre_set_striping_unit (name
970
 
                                                       ,method->parameters
971
 
                                                       ,md);
972
 
                }
973
 
                adios_mpi_lustre_set_block_unit (&md->block_unit, method->parameters);
974
 
 
975
 
                err = MPI_File_open (MPI_COMM_SELF, name
976
 
                                    ,MPI_MODE_WRONLY | MPI_MODE_CREATE
977
 
                                    ,MPI_INFO_NULL
978
 
                                    ,&md->fh
979
 
                                    );
980
 
 
981
 
                md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
982
 
 
983
 
                if (next != -1)
984
 
                {
985
 
                    MPI_Isend (&flag, 1, MPI_INT, next, current
986
 
                              ,md->group_comm, &md->req
987
 
                              );
988
 
                }
989
 
            }
990
 
            else
991
 
            {
992
 
                MPI_Recv (&flag, 1, MPI_INT, previous, previous
993
 
                         ,md->group_comm, &md->status
994
 
                         );
995
 
                if (next != -1)
996
 
                {
997
 
                    MPI_Isend (&flag, 1, MPI_INT, next, current
998
 
                              ,md->group_comm, &md->req
999
 
                              );
1000
 
                }
1001
 
 
1002
 
                adios_mpi_lustre_set_block_unit (&md->block_unit, method->parameters);
1003
 
                err = MPI_File_open (MPI_COMM_SELF, name
1004
 
                                    ,MPI_MODE_WRONLY
1005
 
                                    ,MPI_INFO_NULL
1006
 
                                    ,&md->fh
1007
 
                                    );
1008
 
                md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1009
 
            }
1010
 
 
1011
 
            if (err != MPI_SUCCESS)
1012
 
            {
1013
 
                char e [MPI_MAX_ERROR_STRING];
1014
 
                int len = 0;
1015
 
                memset (e, 0, MPI_MAX_ERROR_STRING);
1016
 
                MPI_Error_string (err, e, &len);
1017
 
                fprintf (stderr, "MPI open write failed for %s: '%s'\n"
1018
 
                        ,name, e
1019
 
                        );
1020
 
                free (name);
1021
 
 
1022
 
                return adios_flag_no;
1023
 
            }
1024
 
 
1025
 
            break;
1026
 
        }
1027
 
 
1028
 
        case adios_mode_append:
1029
 
        {
1030
 
            int old_file = 1;
1031
 
            adios_buffer_struct_clear (&md->b);
1032
 
 
1033
 
            err = MPI_File_open (MPI_COMM_SELF, name, MPI_MODE_RDONLY
1034
 
                                ,MPI_INFO_NULL, &md->fh
1035
 
                                );
1036
 
 
1037
 
            if (err != MPI_SUCCESS)
1038
 
            {
1039
 
                old_file = 0;
1040
 
                err = MPI_File_open (MPI_COMM_SELF, name
1041
 
                                    ,MPI_MODE_WRONLY | MPI_MODE_CREATE
1042
 
                                    ,MPI_INFO_NULL, &md->fh
1043
 
                                    );
1044
 
 
1045
 
                if (err != MPI_SUCCESS)
1046
 
                {
1047
 
                    char e [MPI_MAX_ERROR_STRING];
1048
 
                    int len = 0;
1049
 
                    memset (e, 0, MPI_MAX_ERROR_STRING);
1050
 
                    MPI_Error_string (err, e, &len);
1051
 
                    fprintf (stderr, "MPI open write failed for %s: '%s'\n"
1052
 
                            ,name, e
1053
 
                            );
1054
 
                    free (name);
1055
 
 
1056
 
                    return adios_flag_no;
1057
 
                }
1058
 
                md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1059
 
            }
1060
 
 
1061
 
            if (old_file)
1062
 
            {
1063
 
                if (md->group_comm == MPI_COMM_NULL || md->rank == 0)
1064
 
                {
1065
 
                    if (err != MPI_SUCCESS)
1066
 
                    {
1067
 
                        md->b.file_size = 0;
1068
 
                    }
1069
 
                    else
1070
 
                    {
1071
 
                        MPI_Offset file_size;
1072
 
                        MPI_File_get_size (md->fh, &file_size);
1073
 
                        md->b.file_size = file_size;
1074
 
                    }
1075
 
 
1076
 
                    adios_init_buffer_read_version (&md->b);
1077
 
                    MPI_File_seek (md->fh, md->b.file_size - md->b.length
1078
 
                                  ,MPI_SEEK_SET
1079
 
                                  );
1080
 
                    MPI_File_read (md->fh, md->b.buff, md->b.length, MPI_BYTE
1081
 
                                  ,&md->status
1082
 
                                  );
1083
 
                    adios_parse_version (&md->b, &md->b.version);
1084
 
 
1085
 
                    adios_init_buffer_read_index_offsets (&md->b);
1086
 
                    // already in the buffer
1087
 
                    adios_parse_index_offsets_v1 (&md->b);
1088
 
 
1089
 
                    adios_init_buffer_read_process_group_index (&md->b);
1090
 
                    MPI_File_seek (md->fh, md->b.pg_index_offset
1091
 
                                  ,MPI_SEEK_SET
1092
 
                                  );
1093
 
                    MPI_File_read (md->fh, md->b.buff, md->b.pg_size, MPI_BYTE
1094
 
                                  ,&md->status
1095
 
                                  );
1096
 
                    adios_parse_process_group_index_v1 (&md->b
1097
 
                                                       ,&md->old_pg_root
1098
 
                                                       );
1099
 
 
1100
 
                    adios_init_buffer_read_vars_index (&md->b);
1101
 
                    MPI_File_seek (md->fh, md->b.vars_index_offset
1102
 
                                  ,MPI_SEEK_SET
1103
 
                                  );
1104
 
                    MPI_File_read (md->fh, md->b.buff, md->b.vars_size, MPI_BYTE
1105
 
                                  ,&md->status
1106
 
                                  );
1107
 
                    adios_parse_vars_index_v1 (&md->b, &md->old_vars_root);
1108
 
 
1109
 
                    adios_init_buffer_read_attributes_index (&md->b);
1110
 
                    MPI_File_seek (md->fh, md->b.attrs_index_offset
1111
 
                                  ,MPI_SEEK_SET
1112
 
                                  );
1113
 
                    MPI_File_read (md->fh, md->b.buff, md->b.attrs_size
1114
 
                                  ,MPI_BYTE, &md->status
1115
 
                                  );
1116
 
                    adios_parse_attributes_index_v1 (&md->b
1117
 
                                                    ,&md->old_attrs_root
1118
 
                                                    );
1119
 
 
1120
 
                    fd->base_offset = md->b.end_of_pgs;
1121
 
                    fd->pg_start_in_file = fd->base_offset;
1122
 
                }
1123
 
                else
1124
 
                {
1125
 
                    fd->base_offset = 0;
1126
 
                    fd->pg_start_in_file = 0;
1127
 
                }
1128
 
 
1129
 
                MPI_File_close (&md->fh);
1130
 
            }
1131
 
            else
1132
 
            {
1133
 
                fd->base_offset = 0;
1134
 
                fd->pg_start_in_file = 0;
1135
 
            }
1136
 
#if 0
1137
 
            // cascade the opens to avoid trashing the metadata server
1138
 
            if (previous == -1)
1139
 
            {
1140
 
                // we know it exists, because we created it if it didn't
1141
 
                // when reading the old file so can just open wronly
1142
 
                // but adding the create for consistency with write mode
1143
 
                // so it is easier to merge write/append later
1144
 
                err = MPI_File_open (MPI_COMM_SELF, name
1145
 
                                    ,MPI_MODE_WRONLY | MPI_MODE_CREATE
1146
 
                                    ,MPI_INFO_NULL
1147
 
                                    ,&md->fh
1148
 
                                    );
1149
 
                md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1150
 
                if (next != -1)
1151
 
                {
1152
 
                    MPI_Isend (&flag, 1, MPI_INT, next, current
1153
 
                              ,md->group_comm, &md->req
1154
 
                              );
1155
 
                }
1156
 
            }
1157
 
            else
1158
 
            {
1159
 
                MPI_Recv (&flag, 1, MPI_INT, previous, previous
1160
 
                         ,md->group_comm, &md->status
1161
 
                         );
1162
 
                if (next != -1)
1163
 
                {
1164
 
                    MPI_Isend (&flag, 1, MPI_INT, next, current
1165
 
                              ,md->group_comm, &md->req
1166
 
                              );
1167
 
                }
1168
 
                err = MPI_File_open (MPI_COMM_SELF, name
1169
 
                                    ,MPI_MODE_WRONLY
1170
 
                                    ,MPI_INFO_NULL
1171
 
                                    ,&md->fh
1172
 
                                    );
1173
 
                md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1174
 
            }
1175
 
 
1176
 
            if (err != MPI_SUCCESS)
1177
 
            {
1178
 
                char e [MPI_MAX_ERROR_STRING];
1179
 
                int len = 0;
1180
 
                memset (e, 0, MPI_MAX_ERROR_STRING);
1181
 
                MPI_Error_string (err, e, &len);
1182
 
                fprintf (stderr, "MPI open write failed for %s: '%s'\n"
1183
 
                        ,name, e
1184
 
                        );
1185
 
                free (name);
1186
 
 
1187
 
                return adios_flag_no;
1188
 
            }
1189
 
#endif
1190
 
            if (md->group_comm != MPI_COMM_NULL)
1191
 
            {
1192
 
                if (md->rank == 0)
1193
 
                {
1194
 
                    MPI_Offset * offsets = malloc (  sizeof (MPI_Offset)
1195
 
                                                   * md->size
1196
 
                                                  );
1197
 
 
1198
 
                    if (fd->write_size_bytes % LUSTRE_STRIPE_UNIT)
1199
 
                        offsets [0] =  (fd->write_size_bytes / LUSTRE_STRIPE_UNIT + 1)
1200
 
                                     * LUSTRE_STRIPE_UNIT;
1201
 
                    else
1202
 
                        offsets [0] = fd->write_size_bytes;
1203
 
 
1204
 
                    MPI_Gather (MPI_IN_PLACE, 1, MPI_LONG_LONG
1205
 
                               ,offsets, 1, MPI_LONG_LONG
1206
 
                               ,0, md->group_comm
1207
 
                               );
1208
 
 
1209
 
                    uint64_t last_offset = offsets [0];
1210
 
                    offsets [0] = fd->base_offset;
1211
 
                    for (i = 1; i < md->size; i++)
1212
 
                    {
1213
 
                        uint64_t this_offset = offsets [i];
1214
 
                        offsets [i] = offsets [i - 1] + last_offset;
1215
 
                        last_offset = this_offset;
1216
 
                    }
1217
 
                    md->b.pg_index_offset =   offsets [md->size - 1]
1218
 
                                            + last_offset;
1219
 
                    MPI_Scatter (offsets, 1, MPI_LONG_LONG
1220
 
                                ,MPI_IN_PLACE, 1, MPI_LONG_LONG
1221
 
                                ,0, md->group_comm
1222
 
                                );
1223
 
                    fd->base_offset = offsets [0];
1224
 
                    fd->pg_start_in_file = fd->base_offset;
1225
 
                    free (offsets);
1226
 
                }
1227
 
                else
1228
 
                {
1229
 
                    MPI_Offset offset;
1230
 
                    if (fd->write_size_bytes % LUSTRE_STRIPE_UNIT)
1231
 
                        offset =  (fd->write_size_bytes / LUSTRE_STRIPE_UNIT + 1)
1232
 
                                     * LUSTRE_STRIPE_UNIT;
1233
 
                    else
1234
 
                        offset = fd->write_size_bytes;
1235
 
 
1236
 
 
1237
 
                    MPI_Gather (&offset, 1, MPI_LONG_LONG
1238
 
                               ,0, 0, 0
1239
 
                               ,0, md->group_comm
1240
 
                               );
1241
 
 
1242
 
                    MPI_Scatter (0, 0, 0
1243
 
                                ,&offset, 1, MPI_LONG_LONG
1244
 
                                ,0, md->group_comm
1245
 
                                );
1246
 
                    fd->base_offset = offset;
1247
 
                    fd->pg_start_in_file = fd->base_offset;
1248
 
                }
1249
 
            }
1250
 
            else
1251
 
            {
1252
 
                md->b.pg_index_offset = fd->write_size_bytes;
1253
 
            }
1254
 
 
1255
 
            // cascade the opens to avoid trashing the metadata server
1256
 
            if (previous == -1)
1257
 
            {   
1258
 
                // we know it exists, because we created it if it didn't
1259
 
                // when reading the old file so can just open wronly
1260
 
                // but adding the create for consistency with write mode
1261
 
                // so it is easier to merge write/append later
1262
 
                err = MPI_File_open (MPI_COMM_SELF, name
1263
 
                                    ,MPI_MODE_WRONLY | MPI_MODE_CREATE
1264
 
                                    ,MPI_INFO_NULL
1265
 
                                    ,&md->fh
1266
 
                                    );
1267
 
                md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1268
 
                if (next != -1)
1269
 
                {   
1270
 
                    MPI_Isend (&flag, 1, MPI_INT, next, current
1271
 
                              ,md->group_comm, &md->req
1272
 
                              );
1273
 
                }
1274
 
            }
1275
 
            else
1276
 
            {   
1277
 
                MPI_Recv (&flag, 1, MPI_INT, previous, previous
1278
 
                         ,md->group_comm, &md->status
1279
 
                         );
1280
 
                if (next != -1)
1281
 
                {   
1282
 
                    MPI_Isend (&flag, 1, MPI_INT, next, current
1283
 
                              ,md->group_comm, &md->req
1284
 
                              );
1285
 
                }
1286
 
                err = MPI_File_open (MPI_COMM_SELF, name
1287
 
                                    ,MPI_MODE_WRONLY
1288
 
                                    ,MPI_INFO_NULL
1289
 
                                    ,&md->fh
1290
 
                                    );
1291
 
                md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1292
 
            }
1293
 
 
1294
 
            if (err != MPI_SUCCESS)
1295
 
            {
1296
 
                char e [MPI_MAX_ERROR_STRING];
1297
 
                int len = 0;
1298
 
                memset (e, 0, MPI_MAX_ERROR_STRING);
1299
 
                MPI_Error_string (err, e, &len);
1300
 
                fprintf (stderr, "MPI open write failed for %s: '%s'\n"
1301
 
                        ,name, e
1302
 
                        );
1303
 
                free (name);
1304
 
 
1305
 
                return adios_flag_no;
1306
 
            }
1307
 
 
1308
 
            break;
1309
 
        }
1310
 
 
1311
 
        default:
1312
 
        {
1313
 
            fprintf (stderr, "Unknown file mode: %d\n", fd->mode);
1314
 
 
1315
 
            free (name);
1316
 
 
1317
 
            return adios_flag_no;
1318
 
        }
1319
 
    }
1320
 
 
1321
 
    free (name);
1322
 
 
1323
 
    if (fd->shared_buffer == adios_flag_no && fd->mode != adios_mode_read)
1324
 
    {
1325
 
        // write the process group header
1326
 
        adios_write_process_group_header_v1 (fd, fd->write_size_bytes);
1327
 
 
1328
 
        uint64_t count;
1329
 
        count = adios_mpi_lustre_striping_unit_write(
1330
 
                          md->fh,
1331
 
                          fd->base_offset,
1332
 
                          fd->buffer,
1333
 
                          fd->bytes_written,
1334
 
                          md->block_unit);
1335
 
        if (count != fd->bytes_written)
1336
 
        {
1337
 
            fprintf (stderr, "a:MPI method tried to write %llu, "
1338
 
                             "only wrote %llu\n"
1339
 
                    ,fd->bytes_written
1340
 
                    ,count
1341
 
                    );
1342
 
        }
1343
 
        fd->base_offset += count;
1344
 
        fd->offset = 0;
1345
 
        fd->bytes_written = 0;
1346
 
        adios_shared_buffer_free (&md->b);
1347
 
 
1348
 
        // setup for writing vars
1349
 
        adios_write_open_vars_v1 (fd);
1350
 
        md->vars_start = fd->base_offset;
1351
 
        md->vars_header_size = fd->offset;
1352
 
        fd->base_offset += fd->offset;
1353
 
        MPI_File_seek (md->fh, md->vars_header_size, MPI_SEEK_CUR);
1354
 
        fd->offset = 0;
1355
 
        fd->bytes_written = 0;
1356
 
        adios_shared_buffer_free (&md->b);
1357
 
    }
1358
 
 
1359
 
#if COLLECT_METRICS
1360
 
    gettimeofday (&t22, NULL);
1361
 
#endif
1362
 
    return fd->shared_buffer;
1363
 
}
1364
 
 
1365
 
void adios_mpi_lustre_write (struct adios_file_struct * fd
1366
 
                     ,struct adios_var_struct * v
1367
 
                     ,void * data
1368
 
                     ,struct adios_method_struct * method
1369
 
                     )
1370
 
{
1371
 
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
1372
 
                                                      method->method_data;
1373
 
 
1374
 
    if (v->got_buffer == adios_flag_yes)
1375
 
    {
1376
 
        if (data != v->data)  // if the user didn't give back the same thing
1377
 
        {
1378
 
            if (v->free_data == adios_flag_yes)
1379
 
            {
1380
 
                free (v->data);
1381
 
                adios_method_buffer_free (v->data_size);
1382
 
            }
1383
 
        }
1384
 
        else
1385
 
        {
1386
 
            // we already saved all of the info, so we're ok.
1387
 
            return;
1388
 
        }
1389
 
    }
1390
 
 
1391
 
    if (fd->shared_buffer == adios_flag_no)
1392
 
    {
1393
 
        // var payload sent for sizing information
1394
 
        adios_write_var_header_v1 (fd, v);
1395
 
 
1396
 
        uint64_t count;
1397
 
        count = adios_mpi_lustre_striping_unit_write(
1398
 
                          md->fh,
1399
 
                          -1,
1400
 
                          fd->buffer,
1401
 
                          fd->bytes_written,
1402
 
                          md->block_unit);
1403
 
        if (count != fd->bytes_written)
1404
 
        {
1405
 
            fprintf (stderr, "b:MPI method tried to write %llu, "
1406
 
                             "only wrote %llu\n"
1407
 
                    ,fd->bytes_written
1408
 
                    ,count
1409
 
                    );
1410
 
        }
1411
 
        fd->base_offset += count;
1412
 
        fd->offset = 0;
1413
 
        fd->bytes_written = 0;
1414
 
        adios_shared_buffer_free (&md->b);
1415
 
 
1416
 
        // write payload
1417
 
        // adios_write_var_payload_v1 (fd, v);
1418
 
        uint64_t var_size = adios_get_var_size (v, fd->group, v->data);
1419
 
        if (fd->base_offset + var_size > fd->pg_start_in_file + fd->write_size_bytes) 
1420
 
            fprintf (stderr, "adios_mpi_write exceeds pg bound. File is corrupted. "
1421
 
                             "Need to enlarge group size. \n");
1422
 
        count = adios_mpi_lustre_striping_unit_write(
1423
 
                          md->fh,
1424
 
                          -1,
1425
 
                          v->data,
1426
 
                          var_size,
1427
 
                          md->block_unit);
1428
 
        if (count != var_size)
1429
 
        {
1430
 
            fprintf (stderr, "c:MPI method tried to write %llu, "
1431
 
                             "only wrote %llu\n"
1432
 
                    ,var_size
1433
 
                    ,count
1434
 
                    );
1435
 
        }
1436
 
        fd->base_offset += count;
1437
 
        fd->offset = 0;
1438
 
        fd->bytes_written = 0;
1439
 
        adios_shared_buffer_free (&md->b);
1440
 
    }
1441
 
#if COLLECT_METRICS
1442
 
    static int writes_seen = 0;
1443
 
 
1444
 
    if (writes_seen == 0) gettimeofday (&t24, NULL);
1445
 
    else if (writes_seen == 1) gettimeofday (&t25, NULL);
1446
 
    writes_seen++;
1447
 
#endif
1448
 
}
1449
 
 
1450
 
void adios_mpi_lustre_get_write_buffer (struct adios_file_struct * fd
1451
 
                                ,struct adios_var_struct * v
1452
 
                                ,uint64_t * size
1453
 
                                ,void ** buffer
1454
 
                                ,struct adios_method_struct * method
1455
 
                                )
1456
 
{
1457
 
    uint64_t mem_allowed;
1458
 
 
1459
 
    if (*size == 0)
1460
 
    {
1461
 
        *buffer = 0;
1462
 
 
1463
 
        return;
1464
 
    }
1465
 
 
1466
 
    if (v->data && v->free_data)
1467
 
    {
1468
 
        adios_method_buffer_free (v->data_size);
1469
 
        free (v->data);
1470
 
    }
1471
 
 
1472
 
    mem_allowed = adios_method_buffer_alloc (*size);
1473
 
    if (mem_allowed == *size)
1474
 
    {
1475
 
        *buffer = malloc (*size);
1476
 
        if (!*buffer)
1477
 
        {
1478
 
            adios_method_buffer_free (mem_allowed);
1479
 
            fprintf (stderr, "Out of memory allocating %llu bytes for %s\n"
1480
 
                    ,*size, v->name
1481
 
                    );
1482
 
            v->got_buffer = adios_flag_no;
1483
 
            v->free_data = adios_flag_no;
1484
 
            v->data_size = 0;
1485
 
            v->data = 0;
1486
 
            *size = 0;
1487
 
            *buffer = 0;
1488
 
        }
1489
 
        else
1490
 
        {
1491
 
            v->got_buffer = adios_flag_yes;
1492
 
            v->free_data = adios_flag_yes;
1493
 
            v->data_size = mem_allowed;
1494
 
            v->data = *buffer;
1495
 
        }
1496
 
    }
1497
 
    else
1498
 
    {
1499
 
        adios_method_buffer_free (mem_allowed);
1500
 
        fprintf (stderr, "OVERFLOW: Cannot allocate requested buffer of %llu "
1501
 
                         "bytes for %s\n"
1502
 
                ,*size
1503
 
                ,v->name
1504
 
                );
1505
 
        *size = 0;
1506
 
        *buffer = 0;
1507
 
    }
1508
 
}
1509
 
 
1510
 
void adios_mpi_lustre_read (struct adios_file_struct * fd
1511
 
                    ,struct adios_var_struct * v, void * buffer
1512
 
                    ,uint64_t buffer_size
1513
 
                    ,struct adios_method_struct * method
1514
 
                    )
1515
 
{
1516
 
    v->data = buffer;
1517
 
    v->data_size = buffer_size;
1518
 
}
1519
 
 
1520
 
static void adios_mpi_lustre_do_read (struct adios_file_struct * fd
1521
 
                              ,struct adios_method_struct * method
1522
 
                              )
1523
 
{
1524
 
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
1525
 
                                                      method->method_data;
1526
 
    struct adios_var_struct * v = fd->group->vars;
1527
 
 
1528
 
    struct adios_parse_buffer_struct data;
1529
 
 
1530
 
    data.vars = v;
1531
 
    data.buffer = 0;
1532
 
    data.buffer_len = 0;
1533
 
 
1534
 
    switch (md->b.version & ADIOS_VERSION_NUM_MASK)
1535
 
    {
1536
 
        case 1:
1537
 
        {
1538
 
            // the three section headers
1539
 
            struct adios_process_group_header_struct_v1 pg_header;
1540
 
            struct adios_vars_header_struct_v1 vars_header;
1541
 
            struct adios_attributes_header_struct_v1 attrs_header;
1542
 
 
1543
 
            struct adios_var_header_struct_v1 var_header;
1544
 
            struct adios_var_payload_struct_v1 var_payload;
1545
 
            struct adios_attribute_struct_v1 attribute;
1546
 
 
1547
 
            uint64_t i;
1548
 
 
1549
 
            adios_init_buffer_read_process_group (&md->b);
1550
 
            MPI_File_seek (md->fh, md->b.read_pg_offset
1551
 
                          ,MPI_SEEK_SET
1552
 
                          );
1553
 
            MPI_File_read (md->fh, md->b.buff, md->b.read_pg_size, MPI_BYTE
1554
 
                          ,&md->status
1555
 
                          );
1556
 
            adios_parse_process_group_header_v1 (&md->b, &pg_header);
1557
 
 
1558
 
            adios_parse_vars_header_v1 (&md->b, &vars_header);
1559
 
 
1560
 
            for (i = 0; i < vars_header.count; i++)
1561
 
            {
1562
 
                memset (&var_payload, 0
1563
 
                       ,sizeof (struct adios_var_payload_struct_v1)
1564
 
                       );
1565
 
                adios_parse_var_data_header_v1 (&md->b, &var_header);
1566
 
 
1567
 
                struct adios_var_struct * v1 = v;
1568
 
                while (v1)
1569
 
                {
1570
 
                    if (   strcasecmp (var_header.name, v1->name)
1571
 
                        || strcasecmp (var_header.path, v1->path)
1572
 
                       )
1573
 
                    {
1574
 
                        v1 = v1->next;
1575
 
                    }
1576
 
                    else
1577
 
                    {
1578
 
                        break;
1579
 
                    }
1580
 
                }
1581
 
 
1582
 
                if (v1)
1583
 
                {
1584
 
                    var_payload.payload = v1->data;
1585
 
                    adios_parse_var_data_payload_v1 (&md->b, &var_header
1586
 
                                                    ,&var_payload
1587
 
                                                    ,v1->data_size
1588
 
                                                    );
1589
 
                }
1590
 
                else
1591
 
                {
1592
 
                    printf ("MPI read: skipping name: %s path: %s\n"
1593
 
                           ,var_header.name, var_header.path
1594
 
                           );
1595
 
                    adios_parse_var_data_payload_v1 (&md->b, &var_header
1596
 
                                                    ,NULL, 0
1597
 
                                                    );
1598
 
                }
1599
 
 
1600
 
                adios_clear_var_header_v1 (&var_header);
1601
 
            }
1602
 
 
1603
 
#if 1
1604
 
            adios_parse_attributes_header_v1 (&md->b, &attrs_header);
1605
 
 
1606
 
            for (i = 0; i < attrs_header.count; i++)
1607
 
            {
1608
 
                adios_parse_attribute_v1 (&md->b, &attribute);
1609
 
                adios_clear_attribute_v1 (&attribute);
1610
 
            }
1611
 
#endif
1612
 
            adios_clear_process_group_header_v1 (&pg_header);
1613
 
 
1614
 
            break;
1615
 
        }
1616
 
 
1617
 
        default:
1618
 
            fprintf (stderr, "MPI read: file version unknown: %u\n"
1619
 
                    ,md->b.version
1620
 
                    );
1621
 
            return;
1622
 
    }
1623
 
 
1624
 
    adios_buffer_struct_clear (&md->b);
1625
 
}
1626
 
 
1627
 
void adios_mpi_lustre_close (struct adios_file_struct * fd
1628
 
                     ,struct adios_method_struct * method
1629
 
                     )
1630
 
{
1631
 
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
1632
 
                                                 method->method_data;
1633
 
    struct adios_attribute_struct * a = fd->group->attributes;
1634
 
 
1635
 
    struct adios_index_process_group_struct_v1 * new_pg_root = 0;
1636
 
    struct adios_index_var_struct_v1 * new_vars_root = 0;
1637
 
    struct adios_index_attribute_struct_v1 * new_attrs_root = 0;
1638
 
#if COLLECT_METRICS
1639
 
    gettimeofday (&t23, NULL);
1640
 
    static int iteration = 0;
1641
 
#endif
1642
 
 
1643
 
    switch (fd->mode)
1644
 
    {
1645
 
        case adios_mode_read:
1646
 
        {
1647
 
            // read the index to find the place to start reading
1648
 
            adios_mpi_lustre_do_read (fd, method);
1649
 
            struct adios_var_struct * v = fd->group->vars;
1650
 
            while (v)
1651
 
            {
1652
 
                v->data = 0;
1653
 
                v = v->next;
1654
 
            }
1655
 
 
1656
 
            break;
1657
 
        }
1658
 
 
1659
 
        case adios_mode_write:
1660
 
        {
1661
 
            char * buffer = 0;
1662
 
            uint64_t buffer_size = 0;
1663
 
            uint64_t buffer_offset = 0;
1664
 
            uint64_t index_start = md->b.pg_index_offset;
1665
 
 
1666
 
            if (fd->shared_buffer == adios_flag_no)
1667
 
            {
1668
 
                MPI_Offset new_off;
1669
 
                // set it up so that it will start at 0, but have correct sizes
1670
 
                MPI_File_get_position (md->fh, &new_off);
1671
 
                fd->offset = fd->base_offset - md->vars_start;
1672
 
                fd->vars_start = 0;
1673
 
                fd->buffer_size = 0;
1674
 
                adios_write_close_vars_v1 (fd);
1675
 
                // fd->vars_start gets updated with the size written
1676
 
                uint64_t count;
1677
 
                int retlen;
1678
 
                count = adios_mpi_lustre_striping_unit_write(
1679
 
                                  md->fh,
1680
 
                                  md->vars_start,
1681
 
                                  fd->buffer,
1682
 
                                  md->vars_header_size,
1683
 
                                  md->block_unit);
1684
 
                if (count != md->vars_header_size)
1685
 
                {
1686
 
                    fprintf (stderr, "d:MPI method tried to write %llu, "
1687
 
                                     "only wrote %d\n"
1688
 
                            ,md->vars_header_size
1689
 
                            ,count
1690
 
                            );
1691
 
                }
1692
 
                fd->offset = 0;
1693
 
                fd->bytes_written = 0;
1694
 
                adios_shared_buffer_free (&md->b);
1695
 
 
1696
 
                adios_write_open_attributes_v1 (fd);
1697
 
                md->vars_start = new_off;
1698
 
                md->vars_header_size = fd->offset;
1699
 
                MPI_File_seek (md->fh, new_off + md->vars_header_size
1700
 
                              ,MPI_SEEK_SET
1701
 
                              ); // go back to end, but after attr header
1702
 
                fd->base_offset += fd->offset;  // add size of header
1703
 
                fd->offset = 0;
1704
 
                fd->bytes_written = 0;
1705
 
 
1706
 
                while (a)
1707
 
                {
1708
 
                    adios_write_attribute_v1 (fd, a);
1709
 
                    if (fd->base_offset + fd->bytes_written > fd->pg_start_in_file + fd->write_size_bytes)
1710
 
                        fprintf (stderr, "adios_mpi_write exceeds pg bound. File is corrupted. "
1711
 
                                         "Need to enlarge group size. \n");
1712
 
                    count = adios_mpi_lustre_striping_unit_write(
1713
 
                                      md->fh,
1714
 
                                      -1,
1715
 
                                      fd->buffer,
1716
 
                                      fd->bytes_written,
1717
 
                                      md->block_unit);
1718
 
                    if (count != fd->bytes_written)
1719
 
                    {
1720
 
                        fprintf (stderr, "e:MPI method tried to write %llu, "
1721
 
                                         "only wrote %llu\n"
1722
 
                                ,fd->bytes_written
1723
 
                                ,count
1724
 
                                );
1725
 
                    }
1726
 
                    fd->base_offset += count;
1727
 
                    fd->offset = 0;
1728
 
                    fd->bytes_written = 0;
1729
 
                    adios_shared_buffer_free (&md->b);
1730
 
 
1731
 
                    a = a->next;
1732
 
                }
1733
 
 
1734
 
                // set it up so that it will start at 0, but have correct sizes
1735
 
                fd->offset = fd->base_offset - md->vars_start;
1736
 
                fd->vars_start = 0;
1737
 
                fd->buffer_size = 0;
1738
 
                adios_write_close_attributes_v1 (fd);
1739
 
                // fd->vars_start gets updated with the size written
1740
 
                count = adios_mpi_lustre_striping_unit_write(
1741
 
                                  md->fh,
1742
 
                                  md->vars_start,
1743
 
                                  fd->buffer,
1744
 
                                  md->vars_header_size,
1745
 
                                  md->block_unit);
1746
 
                if (count != md->vars_header_size)
1747
 
                {
1748
 
                    fprintf (stderr, "f:MPI method tried to write %llu, "
1749
 
                                     "only wrote %llu\n"
1750
 
                            ,md->vars_header_size
1751
 
                            ,count
1752
 
                            );
1753
 
                }
1754
 
                fd->offset = 0;
1755
 
                fd->bytes_written = 0;
1756
 
            }
1757
 
 
1758
 
#if COLLECT_METRICS
1759
 
            gettimeofday (&t19, NULL);
1760
 
#endif
1761
 
#if COLLECT_METRICS
1762
 
            gettimeofday (&t7, NULL);
1763
 
#endif
1764
 
#if COLLECT_METRICS
1765
 
            gettimeofday (&t12, NULL);
1766
 
#endif
1767
 
            // build index appending to any existing index
1768
 
            adios_build_index_v1 (fd, &md->old_pg_root, &md->old_vars_root
1769
 
                                 ,&md->old_attrs_root
1770
 
                                 );
1771
 
            // if collective, gather the indexes from the rest and call
1772
 
            if (md->group_comm != MPI_COMM_NULL)
1773
 
            {
1774
 
                if (md->rank == 0)
1775
 
                {
1776
 
                    int * index_sizes = malloc (4 * md->size);
1777
 
                    int * index_offsets = malloc (4 * md->size);
1778
 
                    char * recv_buffer = 0;
1779
 
                    uint32_t size = 0;
1780
 
                    uint32_t total_size = 0;
1781
 
                    int i;
1782
 
 
1783
 
                    MPI_Gather (&size, 1, MPI_INT
1784
 
                               ,index_sizes, 1, MPI_INT
1785
 
                               ,0, md->group_comm
1786
 
                               );
1787
 
 
1788
 
                    for (i = 0; i < md->size; i++)
1789
 
                    {
1790
 
                        index_offsets [i] = total_size;
1791
 
                        total_size += index_sizes [i];
1792
 
                    } 
1793
 
 
1794
 
                    recv_buffer = malloc (total_size);
1795
 
 
1796
 
                    MPI_Gatherv (&size, 0, MPI_BYTE
1797
 
                                ,recv_buffer, index_sizes, index_offsets
1798
 
                                ,MPI_BYTE, 0, md->group_comm
1799
 
                                );
1800
 
 
1801
 
                    char * buffer_save = md->b.buff;
1802
 
                    uint64_t buffer_size_save = md->b.length;
1803
 
                    uint64_t offset_save = md->b.offset;
1804
 
 
1805
 
                    for (i = 1; i < md->size; i++)
1806
 
                    {
1807
 
                        md->b.buff = recv_buffer + index_offsets [i];
1808
 
                        md->b.length = index_sizes [i];
1809
 
                        md->b.offset = 0;
1810
 
 
1811
 
                        adios_parse_process_group_index_v1 (&md->b
1812
 
                                                           ,&new_pg_root
1813
 
                                                           );
1814
 
                        adios_parse_vars_index_v1 (&md->b, &new_vars_root);
1815
 
                        adios_parse_attributes_index_v1 (&md->b
1816
 
                                                        ,&new_attrs_root
1817
 
                                                        );
1818
 
                        adios_merge_index_v1 (&md->old_pg_root
1819
 
                                             ,&md->old_vars_root
1820
 
                                             ,&md->old_attrs_root
1821
 
                                             ,new_pg_root, new_vars_root
1822
 
                                             ,new_attrs_root
1823
 
                                             );
1824
 
                        new_pg_root = 0;
1825
 
                        new_vars_root = 0;
1826
 
                        new_attrs_root = 0;
1827
 
                    }
1828
 
                    md->b.buff = buffer_save;
1829
 
                    md->b.length = buffer_size_save;
1830
 
                    md->b.offset = offset_save;
1831
 
 
1832
 
                    free (recv_buffer);
1833
 
                    free (index_sizes);
1834
 
                    free (index_offsets);
1835
 
                }
1836
 
                else
1837
 
                {
1838
 
                    adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
1839
 
                                         ,0, md->old_pg_root
1840
 
                                         ,md->old_vars_root
1841
 
                                         ,md->old_attrs_root
1842
 
                                         );
1843
 
 
1844
 
                    MPI_Gather (&buffer_size, 1, MPI_INT, 0, 0, MPI_INT
1845
 
                               ,0, md->group_comm
1846
 
                               );
1847
 
                    MPI_Gatherv (buffer, buffer_size, MPI_BYTE
1848
 
                                ,0, 0, 0, MPI_BYTE
1849
 
                                ,0, md->group_comm
1850
 
                                );
1851
 
                }
1852
 
            }
1853
 
 
1854
 
#if COLLECT_METRICS
1855
 
            gettimeofday (&t13, NULL);
1856
 
#endif
1857
 
            if (fd->shared_buffer == adios_flag_yes)
1858
 
            {
1859
 
                // everyone writes their data
1860
 
                if (fd->base_offset + fd->bytes_written > fd->pg_start_in_file + fd->write_size_bytes)
1861
 
                    fprintf (stderr, "adios_mpi_write exceeds pg bound. File is corrupted. "
1862
 
                             "Need to enlarge group size. \n");
1863
 
 
1864
 
                adios_mpi_lustre_striping_unit_write(
1865
 
                                  md->fh,
1866
 
                                  fd->base_offset,
1867
 
                                  fd->buffer,
1868
 
                                  fd->bytes_written,
1869
 
                                  md->block_unit);
1870
 
            }
1871
 
 
1872
 
            if (md->rank == 0)
1873
 
            {
1874
 
                adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
1875
 
                                     ,index_start, md->old_pg_root
1876
 
                                     ,md->old_vars_root
1877
 
                                     ,md->old_attrs_root
1878
 
                                     );
1879
 
                adios_write_version_v1 (&buffer, &buffer_size, &buffer_offset);
1880
 
 
1881
 
                adios_mpi_lustre_striping_unit_write(
1882
 
                                  md->fh,
1883
 
                                  md->b.pg_index_offset,
1884
 
                                  buffer,
1885
 
                                  buffer_offset,
1886
 
                                  md->block_unit);
1887
 
            }
1888
 
#if COLLECT_METRICS
1889
 
            gettimeofday (&t8, NULL);
1890
 
#endif
1891
 
#if COLLECT_METRICS
1892
 
            gettimeofday (&t20, NULL);
1893
 
#endif
1894
 
#if COLLECT_METRICS
1895
 
            gettimeofday (&t14, NULL);
1896
 
#endif
1897
 
 
1898
 
            if (buffer)
1899
 
            {
1900
 
                free (buffer);
1901
 
                buffer = 0;
1902
 
                buffer_size = 0;
1903
 
                buffer_offset = 0;
1904
 
            }
1905
 
 
1906
 
            adios_clear_index_v1 (new_pg_root, new_vars_root, new_attrs_root);
1907
 
            adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
1908
 
                                 ,md->old_attrs_root
1909
 
                                 );
1910
 
            new_pg_root = 0;
1911
 
            new_vars_root = 0;
1912
 
            new_attrs_root = 0;
1913
 
            md->old_pg_root = 0;
1914
 
            md->old_vars_root = 0;
1915
 
            md->old_attrs_root = 0;
1916
 
#if COLLECT_METRICS
1917
 
            gettimeofday (&t11, NULL);
1918
 
            t15.tv_sec = t11.tv_sec;
1919
 
            t15.tv_usec = t11.tv_usec;
1920
 
#endif
1921
 
 
1922
 
            break;
1923
 
        }
1924
 
 
1925
 
        case adios_mode_append:
1926
 
        {
1927
 
            char * buffer = 0;
1928
 
            uint64_t buffer_size = 0;
1929
 
            uint64_t buffer_offset = 0;
1930
 
            uint64_t index_start = md->b.pg_index_offset;
1931
 
 
1932
 
            if (fd->shared_buffer == adios_flag_no)
1933
 
            {
1934
 
                MPI_Offset new_off;
1935
 
                // set it up so that it will start at 0, but have correct sizes
1936
 
                MPI_File_get_position (md->fh, &new_off);
1937
 
                fd->offset = fd->base_offset - md->vars_start;
1938
 
                fd->vars_start = 0;
1939
 
                fd->buffer_size = 0;
1940
 
                adios_write_close_vars_v1 (fd);
1941
 
                // fd->vars_start gets updated with the size written
1942
 
                uint64_t count;
1943
 
                count = adios_mpi_lustre_striping_unit_write(
1944
 
                                  md->fh,
1945
 
                                  md->vars_start,
1946
 
                                  fd->buffer,
1947
 
                                  md->vars_header_size,
1948
 
                                  md->block_unit);
1949
 
                if (count != md->vars_header_size)
1950
 
                {
1951
 
                    fprintf (stderr, "d:MPI method tried to write %llu, "
1952
 
                                     "only wrote %llu\n"
1953
 
                            ,md->vars_header_size
1954
 
                            ,count
1955
 
                            );
1956
 
                }
1957
 
                fd->offset = 0;
1958
 
                fd->bytes_written = 0;
1959
 
                adios_shared_buffer_free (&md->b);
1960
 
 
1961
 
                adios_write_open_attributes_v1 (fd);
1962
 
                md->vars_start = new_off;
1963
 
                md->vars_header_size = fd->offset;
1964
 
                MPI_File_seek (md->fh, new_off + md->vars_header_size
1965
 
                              ,MPI_SEEK_SET
1966
 
                              ); // go back to end, but after attr header
1967
 
                fd->base_offset += fd->offset;  // add size of header
1968
 
                fd->offset = 0;
1969
 
                fd->bytes_written = 0;
1970
 
 
1971
 
                while (a)
1972
 
                {
1973
 
                    adios_write_attribute_v1 (fd, a);
1974
 
                    count = adios_mpi_lustre_striping_unit_write(
1975
 
                                  md->fh,
1976
 
                                  -1,
1977
 
                                  fd->buffer,
1978
 
                                  fd->bytes_written,
1979
 
                                  md->block_unit);
1980
 
                    if (count != fd->bytes_written)
1981
 
                    {
1982
 
                        fprintf (stderr, "e:MPI method tried to write %llu, "
1983
 
                                         "only wrote %llu\n"
1984
 
                                ,fd->bytes_written
1985
 
                                ,count
1986
 
                                );
1987
 
                    }
1988
 
                    fd->base_offset += count;
1989
 
                    fd->offset = 0;
1990
 
                    fd->bytes_written = 0;
1991
 
                    adios_shared_buffer_free (&md->b);
1992
 
 
1993
 
                    a = a->next;
1994
 
                }
1995
 
 
1996
 
                // set it up so that it will start at 0, but have correct sizes
1997
 
                fd->offset = fd->base_offset - md->vars_start;
1998
 
                fd->vars_start = 0;
1999
 
                fd->buffer_size = 0;
2000
 
                adios_write_close_attributes_v1 (fd);
2001
 
                // fd->vars_start gets updated with the size written
2002
 
                count = adios_mpi_lustre_striping_unit_write(
2003
 
                                  md->fh,
2004
 
                                  md->vars_start,
2005
 
                                  fd->buffer,
2006
 
                                  md->vars_header_size,
2007
 
                                  md->block_unit);
2008
 
                if (count != md->vars_header_size)
2009
 
                {
2010
 
                    fprintf (stderr, "f:MPI method tried to write %llu, "
2011
 
                                     "only wrote %llu\n"
2012
 
                            ,md->vars_header_size
2013
 
                            ,count
2014
 
                            );
2015
 
                }
2016
 
                fd->offset = 0;
2017
 
                fd->bytes_written = 0;
2018
 
            }
2019
 
 
2020
 
            // build index appending to any existing index
2021
 
            adios_build_index_v1 (fd, &md->old_pg_root, &md->old_vars_root
2022
 
                                 ,&md->old_attrs_root
2023
 
                                 );
2024
 
            // if collective, gather the indexes from the rest and call
2025
 
            if (md->group_comm != MPI_COMM_NULL)
2026
 
            {
2027
 
                if (md->rank == 0)
2028
 
                {
2029
 
                    int * index_sizes = malloc (4 * md->size);
2030
 
                    int * index_offsets = malloc (4 * md->size);
2031
 
                    char * recv_buffer = 0;
2032
 
                    uint32_t size = 0;
2033
 
                    uint32_t total_size = 0;
2034
 
                    int i;
2035
 
 
2036
 
                    MPI_Gather (&size, 1, MPI_INT
2037
 
                               ,index_sizes, 1, MPI_INT
2038
 
                               ,0, md->group_comm
2039
 
                               );
2040
 
 
2041
 
                    for (i = 0; i < md->size; i++)
2042
 
                    {
2043
 
                        index_offsets [i] = total_size;
2044
 
                        total_size += index_sizes [i];
2045
 
                    }
2046
 
 
2047
 
                    recv_buffer = malloc (total_size);
2048
 
 
2049
 
                    MPI_Gatherv (&size, 0, MPI_BYTE
2050
 
                                ,recv_buffer, index_sizes, index_offsets
2051
 
                                ,MPI_BYTE, 0, md->group_comm
2052
 
                                );
2053
 
 
2054
 
                    char * buffer_save = md->b.buff;
2055
 
                    uint64_t buffer_size_save = md->b.length;
2056
 
                    uint64_t offset_save = md->b.offset;
2057
 
 
2058
 
                    for (i = 1; i < md->size; i++)
2059
 
                    {
2060
 
                        md->b.buff = recv_buffer + index_offsets [i];
2061
 
                        md->b.length = index_sizes [i];
2062
 
                        md->b.offset = 0;
2063
 
 
2064
 
                        adios_parse_process_group_index_v1 (&md->b
2065
 
                                                           ,&new_pg_root
2066
 
                                                           );
2067
 
                        adios_parse_vars_index_v1 (&md->b, &new_vars_root);
2068
 
                        adios_parse_attributes_index_v1 (&md->b
2069
 
                                                        ,&new_attrs_root
2070
 
                                                        );
2071
 
                        adios_merge_index_v1 (&md->old_pg_root
2072
 
                                             ,&md->old_vars_root
2073
 
                                             ,&md->old_attrs_root
2074
 
                                             ,new_pg_root, new_vars_root
2075
 
                                             ,new_attrs_root
2076
 
                                             );
2077
 
                        new_pg_root = 0;
2078
 
                        new_vars_root = 0;
2079
 
                        new_attrs_root = 0;
2080
 
                    }
2081
 
                    md->b.buff = buffer_save;
2082
 
                    md->b.length = buffer_size_save;
2083
 
                    md->b.offset = offset_save;
2084
 
 
2085
 
                    free (recv_buffer);
2086
 
                    free (index_sizes);
2087
 
                    free (index_offsets);
2088
 
                }
2089
 
                else
2090
 
                {
2091
 
                    adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
2092
 
                                         ,0, md->old_pg_root
2093
 
                                         ,md->old_vars_root
2094
 
                                         ,md->old_attrs_root
2095
 
                                         );
2096
 
 
2097
 
                    MPI_Gather (&buffer_size, 1, MPI_INT, 0, 0, MPI_INT
2098
 
                               ,0, md->group_comm
2099
 
                               );
2100
 
                    MPI_Gatherv (buffer, buffer_size, MPI_BYTE
2101
 
                                ,0, 0, 0, MPI_BYTE
2102
 
                                ,0, md->group_comm
2103
 
                                );
2104
 
                }
2105
 
            }
2106
 
 
2107
 
            if (fd->shared_buffer == adios_flag_yes)
2108
 
            {
2109
 
                // everyone writes their data
2110
 
                adios_mpi_lustre_striping_unit_write(
2111
 
                                  md->fh,
2112
 
                                  fd->base_offset,
2113
 
                                  fd->buffer,
2114
 
                                  fd->bytes_written,
2115
 
                                  md->block_unit);
2116
 
            }
2117
 
 
2118
 
            if (md->rank == 0)
2119
 
            {
2120
 
                adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
2121
 
                                     ,index_start, md->old_pg_root
2122
 
                                     ,md->old_vars_root
2123
 
                                     ,md->old_attrs_root
2124
 
                                     );
2125
 
                adios_write_version_v1 (&buffer, &buffer_size, &buffer_offset);
2126
 
 
2127
 
                adios_mpi_lustre_striping_unit_write(
2128
 
                                  md->fh,
2129
 
                                  md->b.pg_index_offset,
2130
 
                                  buffer,
2131
 
                                  buffer_offset,
2132
 
                                  md->block_unit);
2133
 
            }
2134
 
 
2135
 
            free (buffer);
2136
 
 
2137
 
            adios_clear_index_v1 (new_pg_root, new_vars_root, new_attrs_root);
2138
 
            adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
2139
 
                                 ,md->old_attrs_root
2140
 
                                 );
2141
 
            new_pg_root = 0;
2142
 
            new_vars_root = 0;
2143
 
            new_attrs_root = 0;
2144
 
            md->old_pg_root = 0;
2145
 
            md->old_vars_root = 0;
2146
 
            md->old_attrs_root = 0;
2147
 
 
2148
 
            break;
2149
 
        }
2150
 
 
2151
 
        default:
2152
 
        {
2153
 
            fprintf (stderr, "Unknown file mode: %d\n", fd->mode);
2154
 
        }
2155
 
    }
2156
 
 
2157
 
    if (md && md->fh)
2158
 
        MPI_File_close (&md->fh);
2159
 
 
2160
 
    if (   md->group_comm != MPI_COMM_WORLD
2161
 
        && md->group_comm != MPI_COMM_SELF
2162
 
        && md->group_comm != MPI_COMM_NULL
2163
 
       )
2164
 
    {
2165
 
        md->group_comm = MPI_COMM_NULL;
2166
 
    }
2167
 
 
2168
 
    md->fh = 0;
2169
 
    md->req = 0;
2170
 
    memset (&md->status, 0, sizeof (MPI_Status));
2171
 
    md->group_comm = MPI_COMM_NULL;
2172
 
 
2173
 
    adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
2174
 
                         ,md->old_attrs_root
2175
 
                         );
2176
 
    md->old_pg_root = 0;
2177
 
    md->old_vars_root = 0;
2178
 
    md->old_attrs_root = 0;
2179
 
#if COLLECT_METRICS
2180
 
    print_metrics (md, iteration++);
2181
 
#endif
2182
 
}
2183
 
 
2184
 
void adios_mpi_lustre_finalize (int mype, struct adios_method_struct * method)
2185
 
{
2186
 
// nothing to do here
2187
 
    if (adios_mpi_lustre_initialized)
2188
 
        adios_mpi_lustre_initialized = 0;
2189
 
}
2190
 
 
2191
 
void adios_mpi_lustre_end_iteration (struct adios_method_struct * method)
2192
 
{
2193
 
}
2194
 
 
2195
 
void adios_mpi_lustre_start_calculation (struct adios_method_struct * method)
2196
 
{
2197
 
}
2198
 
 
2199
 
void adios_mpi_lustre_stop_calculation (struct adios_method_struct * method)
2200
 
{
2201
 
}