~ubuntu-branches/ubuntu/utopic/adios/utopic

« back to all changes in this revision

Viewing changes to src/write/adios_mpi.c

  • Committer: Package Import Robot
  • Author(s): Alastair McKinstry
  • Date: 2013-12-09 15:21:31 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20131209152131-jtd4fpmdv3xnunnm
Tags: 1.5.0-1
* New upstream.
* Standards-Version: 3.9.5
* Include latest config.{sub,guess} 
* New watch file.
* Create libadios-bin for binaries.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* 
 
2
 * ADIOS is freely available under the terms of the BSD license described
 
3
 * in the COPYING file in the top level directory of this source distribution.
 
4
 *
 
5
 * Copyright (c) 2008 - 2009.  UT-BATTELLE, LLC. All rights reserved.
 
6
 */
 
7
 
 
8
#include <unistd.h>
 
9
#include <fcntl.h>
 
10
#include <stdlib.h>
 
11
#include <math.h>
 
12
#include <string.h>
 
13
#if defined(__APPLE__)
 
14
#       include <sys/param.h>
 
15
#       include <sys/mount.h>
 
16
#else
 
17
#       include <sys/vfs.h>
 
18
#endif
 
19
#include <sys/ioctl.h>
 
20
#include <assert.h>
 
21
 
 
22
// xml parser
 
23
#include <mxml.h>
 
24
 
 
25
#include "public/adios_mpi.h"
 
26
#include "public/adios_error.h"
 
27
#include "core/adios_transport_hooks.h"
 
28
#include "core/adios_bp_v1.h"
 
29
#include "core/adios_internals.h"
 
30
#include "core/buffer.h"
 
31
#include "core/util.h"
 
32
#include "core/adios_logger.h"
 
33
#ifdef DMALLOC
 
34
#include "dmalloc.h"
 
35
#endif
 
36
 
 
37
static int adios_mpi_initialized = 0;
 
38
 
 
39
#define COLLECT_METRICS 0
 
40
 
 
41
 
 
42
struct adios_MPI_data_struct
 
43
{
 
44
    MPI_File fh;
 
45
    MPI_Request req;
 
46
    MPI_Status status;
 
47
    MPI_Comm group_comm;
 
48
    MPI_Info info;      // set with base hints for Lustre
 
49
    int rank;
 
50
    int size;
 
51
 
 
52
    struct adios_bp_buffer_struct_v1 b;
 
53
 
 
54
    struct adios_index_process_group_struct_v1 * old_pg_root;
 
55
    struct adios_index_var_struct_v1 * old_vars_root;
 
56
    struct adios_index_attribute_struct_v1 * old_attrs_root;
 
57
 
 
58
    uint64_t vars_start;
 
59
    uint64_t vars_header_size;
 
60
    uint16_t storage_targets;  // number of storage targets being used
 
61
};
 
62
 
 
63
#if COLLECT_METRICS
 
64
// see adios_adaptive_finalize for what each represents
 
65
struct timeval_writer
 
66
{
 
67
    struct timeval t;  // time value
 
68
    int pid;            // process id
 
69
};
 
70
 
 
71
static
 
72
int timeval_writer_compare (const void * left, const void * right)
 
73
{
 
74
    struct timeval_writer * l = (struct timeval_writer *) left;
 
75
    struct timeval_writer * r = (struct timeval_writer *) right;
 
76
 
 
77
    if (l->pid < r->pid) return -1;
 
78
    if (l->pid == r->pid) return 0;
 
79
    if (l->pid > r->pid) return 1;
 
80
}
 
81
 
 
82
struct timing_metrics
 
83
{
 
84
    struct timeval t0, t5, t6, t7, t8, t11, t12, t13;
 
85
    struct timeval t14, t16, t19, t20, t21, t22, t23;
 
86
    struct timeval t27, t28;
 
87
 
 
88
    uint64_t write_count; // number used
 
89
    uint64_t write_size;  // number allocated
 
90
    struct timeval * t24;
 
91
};
 
92
 
 
93
static struct timing_metrics timing;
 
94
 
 
95
// Subtract the `struct timeval' values X and Y,
 
96
// storing the result in RESULT.
 
97
// Return 1 if the difference is negative, otherwise 0.
 
98
static int timeval_subtract (struct timeval * result
 
99
                            ,struct timeval * x, struct timeval * y
 
100
                            )
 
101
{
 
102
  // Perform the carry for the later subtraction by updating y.
 
103
  if (x->tv_usec < y->tv_usec)
 
104
  {
 
105
    int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
 
106
    y->tv_usec -= 1000000 * nsec;
 
107
    y->tv_sec += nsec;
 
108
  }
 
109
  if (x->tv_usec - y->tv_usec > 1000000)
 
110
  {
 
111
    int nsec = (x->tv_usec - y->tv_usec) / 1000000;
 
112
    y->tv_usec += 1000000 * nsec;
 
113
    y->tv_sec -= nsec;
 
114
  }
 
115
 
 
116
  // Compute the time remaining to wait.
 
117
  // tv_usec is certainly positive.
 
118
  result->tv_sec = x->tv_sec - y->tv_sec;
 
119
  result->tv_usec = x->tv_usec - y->tv_usec;
 
120
 
 
121
  // Return 1 if result is negative.
 
122
  return x->tv_sec < y->tv_sec;
 
123
}
 
124
 
 
125
static void print_metric (FILE * f, struct timing_metrics * t, int iteration, int rank, int size, int sub_coord_rank);
 
126
 
 
127
static
 
128
void print_metrics (struct adios_MPI_data_struct * md, int iteration)
 
129
{
 
130
    MPI_Barrier (md->group_comm);
 
131
    if (md->rank == 0)
 
132
    {
 
133
        int i;
 
134
        struct timing_metrics * t;
 
135
        int * sub_coord_ranks;
 
136
 
 
137
        t = malloc (sizeof (struct timing_metrics) * md->size);
 
138
        sub_coord_ranks = malloc (sizeof (int) * md->size);
 
139
 
 
140
        memcpy (&t [0], &timing, sizeof (struct timing_metrics));
 
141
 
 
142
        // get the bulk data
 
143
        MPI_Gather (&timing, sizeof (struct timing_metrics), MPI_BYTE
 
144
                   ,t, sizeof (struct timing_metrics), MPI_BYTE
 
145
                   ,0, md->group_comm
 
146
                   );
 
147
 
 
148
        // get the write timing
 
149
        int * index_sizes = malloc (4 * md->size);
 
150
        int * index_offsets = malloc (4 * md->size);
 
151
        uint32_t total_size = 0;
 
152
        char * recv_buffer = 0;
 
153
        char * recv_buffer1 = 0;
 
154
        char * recv_buffer2 = 0;
 
155
        char * recv_buffer3 = 0;
 
156
        for (i = 0; i < md->size; i++)
 
157
        {
 
158
            index_sizes [i] = t [i].write_count * sizeof (struct timeval);
 
159
            index_offsets [i] = total_size;
 
160
            total_size += t [i].write_count * sizeof (struct timeval);
 
161
        }
 
162
 
 
163
        recv_buffer = malloc (total_size + 1);
 
164
 
 
165
        MPI_Gatherv (t [0].t24, 0, MPI_BYTE
 
166
                    ,recv_buffer, index_sizes, index_offsets, MPI_BYTE
 
167
                    ,0, md->group_comm
 
168
                    );
 
169
 
 
170
        t [0].t24 = timing.t24;
 
171
        for (i = 1; i < md->size; i++)
 
172
        {
 
173
            t [i].t24 = (struct timeval *) (recv_buffer + index_offsets [i]);
 
174
        }
 
175
 
 
176
        // print the detailed metrics
 
177
        FILE * f = fopen ("adios_metrics", "a");
 
178
        for (i = 0; i < md->size; i++)
 
179
        {
 
180
            print_metric (f, &t [i], iteration, i, md->size
 
181
                         ,sub_coord_ranks [i]
 
182
                         );
 
183
        }
 
184
        fclose (f);
 
185
 
 
186
        free (sub_coord_ranks);
 
187
        free (t);
 
188
        free (recv_buffer);
 
189
        free (recv_buffer1);
 
190
        free (recv_buffer2);
 
191
        free (recv_buffer3);
 
192
        free (index_sizes);
 
193
        free (index_offsets);
 
194
    }
 
195
    else
 
196
    {
 
197
        // send the bulk data
 
198
        MPI_Gather (&timing, sizeof (struct timing_metrics), MPI_BYTE
 
199
                   ,&timing, sizeof (struct timing_metrics), MPI_BYTE
 
200
                   ,0, md->group_comm
 
201
                   );
 
202
 
 
203
        // send the write timing
 
204
        MPI_Gatherv (timing.t24, timing.write_count * sizeof (struct timeval)
 
205
                    ,MPI_BYTE
 
206
                    ,0, 0, 0, 0
 
207
                    ,0, md->group_comm
 
208
                    );
 
209
    }
 
210
    MPI_Barrier (md->group_comm);
 
211
}
 
212
 
 
213
static void print_metric (FILE * f, struct timing_metrics * t, int iteration, int rank, int size, int sub_coord_rank)
 
214
{
 
215
    struct timeval diff;
 
216
    if (rank == 0)
 
217
    {
 
218
        timeval_subtract (&diff, &t->t6, &t->t5);
 
219
        fprintf (f, "dd\t%2d\tMass file open:\t%02d.%06d\n"
 
220
                ,iteration, diff.tv_sec, diff.tv_usec);
 
221
 
 
222
        timeval_subtract (&diff, &t->t5, &t->t16);
 
223
        fprintf (f, "ee\t%2d\tBuild file offsets:\t%02d.%06d\n"
 
224
                ,iteration, diff.tv_sec, diff.tv_usec);
 
225
 
 
226
        timeval_subtract (&diff, &t->t11, &t->t0);
 
227
        fprintf (f, "hh\t%2d\tTotal time:\t%02d.%06d\n"
 
228
                ,iteration, diff.tv_sec, diff.tv_usec);
 
229
    }
 
230
    
 
231
    timeval_subtract (&diff, &t->t13, &t->t12);
 
232
    fprintf (f, "ii\t%2d\tLocal index creation:\t%6d\t%02d.%06d\n"
 
233
            ,iteration, rank, diff.tv_sec, diff.tv_usec);
 
234
    
 
235
    timeval_subtract (&diff, &t->t22, &t->t21);
 
236
    fprintf (f, "kk\t%2d\tshould buffer time:\t%6d\t%02d.%06d\n"
 
237
            ,iteration, rank, diff.tv_sec, diff.tv_usec);
 
238
    
 
239
    timeval_subtract (&diff, &t->t19, &t->t23);
 
240
    fprintf (f, "ll\t%2d\tclose startup time:\t%6d\t%02d.%06d\n"
 
241
            ,iteration, rank, diff.tv_sec, diff.tv_usec);
 
242
    
 
243
    timeval_subtract (&diff, &t->t19, &t->t0);
 
244
    fprintf (f, "mm\t%2d\tsetup time:\t%6d\t%02d.%06d\n"
 
245
            ,iteration, rank, diff.tv_sec, diff.tv_usec);
 
246
    
 
247
    timeval_subtract (&diff, &t->t14, &t->t20);
 
248
    fprintf (f, "nn\t%2d\tcleanup time:\t%6d\t%02d.%06d\n"
 
249
            ,iteration, rank, diff.tv_sec, diff.tv_usec);
 
250
    
 
251
    timeval_subtract (&diff, &t->t21, &t->t0);
 
252
    fprintf (f, "oo\t%2d\topen->should_buffer time:\t%6d\t%02d.%06d\n"
 
253
            ,iteration, rank, diff.tv_sec, diff.tv_usec);
 
254
    
 
255
    timeval_subtract (&diff, &(t->t24 [0]), &t->t21);
 
256
    fprintf (f, "pp\t%2d\tshould_buffer->write1 time:\t%6d\t%02d.%06d\n"
 
257
            ,iteration, rank, diff.tv_sec, diff.tv_usec);
 
258
 
 
259
    fprintf (f, "aa\t%2d\tprocess write time:\t%6d\t%02d.%06d\n"
 
260
            ,iteration, rank, t->t8.tv_sec, t->t8.tv_usec);
 
261
 
 
262
    timeval_subtract (&diff, &t->t11, &t->t23);
 
263
    fprintf (f, "vv\t%2d\tclose start to shutdown time:\t%6d\t%02d.%06d\n"
 
264
           ,iteration, rank, (int)diff.tv_sec, (int)diff.tv_usec);
 
265
 
 
266
    timeval_subtract (&diff, &t->t28, &t->t23);
 
267
    fprintf (f, "ww\t%2d\tclose total time:\t%6d\t%02d.%06d\n"
 
268
            ,iteration, rank, (int)diff.tv_sec, (int)diff.tv_usec);
 
269
}
 
270
#endif
 
271
 
 
272
 
 
273
void adios_mpi_init (const PairStruct * parameters
 
274
                    ,struct adios_method_struct * method
 
275
                    )
 
276
{
 
277
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
 
278
                                                    method->method_data;
 
279
    if (!adios_mpi_initialized)
 
280
    {
 
281
        adios_mpi_initialized = 1;
 
282
    }
 
283
    method->method_data = malloc (sizeof (struct adios_MPI_data_struct));
 
284
    md = (struct adios_MPI_data_struct *) method->method_data;
 
285
    md->fh = 0;
 
286
    md->req = 0;
 
287
    memset (&md->status, 0, sizeof (MPI_Status));
 
288
    MPI_Info_create (&md->info);
 
289
    MPI_Info_set (md->info, "romio_ds_read", "disable");
 
290
    MPI_Info_set (md->info, "romio_ds_write", "disable");
 
291
    MPI_Info_set (md->info, "ind_wr_buffer_size", "16777216");
 
292
    md->rank = 0;
 
293
    md->size = 0;
 
294
    md->group_comm = method->init_comm; // unused here, adios_open will set the current comm
 
295
    md->old_pg_root = 0;
 
296
    md->old_vars_root = 0;
 
297
    md->old_attrs_root = 0;
 
298
    md->vars_start = 0;
 
299
    md->vars_header_size = 0;
 
300
    md->storage_targets = 0;
 
301
 
 
302
    adios_buffer_struct_init (&md->b);
 
303
#if COLLECT_METRICS
 
304
    // init the pointer for the first go around avoiding the bad free in open
 
305
    timing.t24 = 0;
 
306
#endif
 
307
}
 
308
 
 
309
int adios_mpi_open (struct adios_file_struct * fd
 
310
                   ,struct adios_method_struct * method, MPI_Comm comm
 
311
                   )
 
312
{
 
313
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
 
314
                                                    method->method_data;
 
315
 
 
316
#if COLLECT_METRICS
 
317
    gettimeofday (&timing.t0, NULL); // only used on rank == size - 1, but we don't
 
318
                              // have the comm yet to get the rank/size
 
319
#endif
 
320
    adios_buffer_struct_clear (&md->b);
 
321
 
 
322
    md->group_comm = comm;
 
323
    if (md->group_comm != MPI_COMM_NULL)
 
324
    {
 
325
        MPI_Comm_rank (md->group_comm, &md->rank);
 
326
        MPI_Comm_size (md->group_comm, &md->size);
 
327
    }
 
328
    fd->group->process_id = md->rank;
 
329
 
 
330
#if COLLECT_METRICS
 
331
    timing.write_count = 0;
 
332
    timing.write_size = 0;
 
333
    if (timing.t24) free (timing.t24);
 
334
    timing.t24 = 0;
 
335
#endif
 
336
 
 
337
    // we have to wait for the group_size (should_buffer) 
 
338
    // to calculate stripe sizes from output sizes of the processes
 
339
    // before we can do an open for any of the modes
 
340
 
 
341
    return 1;
 
342
}
 
343
 
 
344
static
 
345
void build_offsets (struct adios_bp_buffer_struct_v1 * b
 
346
                   ,MPI_Offset * offsets, int size, char * group_name
 
347
                   ,struct adios_index_process_group_struct_v1 * pg_root
 
348
                   )
 
349
{
 
350
    while (pg_root)
 
351
    {
 
352
        if (!strcasecmp (pg_root->group_name, group_name))
 
353
        {
 
354
            MPI_Offset size = 0;
 
355
 
 
356
            if (pg_root->next)
 
357
            {
 
358
                size = pg_root->next->offset_in_file - pg_root->offset_in_file;
 
359
            }
 
360
            else
 
361
            {
 
362
                size = b->pg_index_offset - pg_root->offset_in_file;
 
363
            }
 
364
 
 
365
            offsets [pg_root->process_id * 3] = pg_root->offset_in_file;
 
366
            offsets [pg_root->process_id * 3 + 1] = size;
 
367
            offsets [pg_root->process_id * 3 + 2] = b->version;
 
368
        }
 
369
 
 
370
        pg_root = pg_root->next;
 
371
    }
 
372
}
 
373
 
 
374
static void
 
375
adios_mpi_build_file_offset(struct adios_MPI_data_struct *md,
 
376
                            struct adios_file_struct *fd, char *name)
 
377
{
 
378
    if (md->group_comm != MPI_COMM_NULL)
 
379
    {
 
380
        if (md->rank == 0)
 
381
        {
 
382
            // make one space for offset and one for size
 
383
            MPI_Offset * offsets = malloc(sizeof (MPI_Offset)
 
384
                                           * md->size);
 
385
            int i;
 
386
 
 
387
            offsets [0] = fd->write_size_bytes;
 
388
// mpixlc_r on Eugene doesn't support 64 bit mode. Therefore the following may have problem
 
389
// on Eugene for large data size since MPI_LONG_LONG is 32bit 
 
390
            MPI_Gather (&(fd->write_size_bytes), 1, MPI_LONG_LONG
 
391
                       ,offsets, 1, MPI_LONG_LONG
 
392
                       ,0, md->group_comm);
 
393
 
 
394
// top section: make things a consistent stripe size
 
395
// bottom section: just pack the file
 
396
#if 0
 
397
            // find the largest and use that as a basis for stripe
 
398
            // size for each process writing
 
399
            uint64_t biggest_size = 0;
 
400
            for (i = 0; i < md->size; i++)
 
401
            {
 
402
                if (offsets [i] > biggest_size)
 
403
                    biggest_size = offsets [i];
 
404
            }
 
405
            // now round up to the next stripe size increment (Lustre: 64 KB)
 
406
#define STRIPE_INCREMENT (64 * 1024)
 
407
            // (according to the Lustre reps, use 1 MB instead of 64 KB?)
 
408
//#define STRIPE_INCREMENT (1024 * 1024)
 
409
            if (biggest_size % (STRIPE_INCREMENT))
 
410
            {
 
411
                biggest_size = (  ((biggest_size / STRIPE_INCREMENT) + 1)
 
412
                                * STRIPE_INCREMENT
 
413
                               );
 
414
            }
 
415
            // also round up the base_offset, just in case
 
416
            if (fd->base_offset % (STRIPE_INCREMENT))
 
417
            {
 
418
                fd->base_offset = (  ((fd->base_offset / STRIPE_INCREMENT) + 1)
 
419
                                   * STRIPE_INCREMENT
 
420
                                  );
 
421
            }
 
422
#undef STRIPE_INCREMENT
 
423
            offsets [0 + 0] = fd->base_offset;
 
424
            offsets [0 + 1] = biggest_size;
 
425
 
 
426
            for (i = 1; i < md->size; i++)
 
427
            {
 
428
                offsets [i * 2 + 0] = offsets [(i - 1) * 2 + 0] + biggest_size;
 
429
                offsets [i * 2 + 1] = biggest_size;
 
430
            }
 
431
            md->b.pg_index_offset =   offsets [(md->size - 1) * 2 + 0]
 
432
                                    + biggest_size;
 
433
#else
 
434
 
 
435
            uint64_t last_offset = offsets [0];
 
436
            offsets [0] = fd->base_offset;
 
437
            for (i = 1; i < md->size; i++)
 
438
            {
 
439
                uint64_t this_offset = offsets [i];
 
440
                offsets [i] = offsets [i - 1] + last_offset;
 
441
                last_offset = this_offset;
 
442
            }
 
443
            md->b.pg_index_offset =   offsets [md->size - 1]
 
444
                                    + last_offset;
 
445
#endif
 
446
            MPI_Scatter (offsets, 1, MPI_LONG_LONG
 
447
                        ,MPI_IN_PLACE, 1, MPI_LONG_LONG
 
448
                        ,0, md->group_comm
 
449
                        );
 
450
            fd->base_offset = offsets [0];
 
451
            fd->pg_start_in_file = fd->base_offset;
 
452
            free (offsets);
 
453
        }
 
454
        else
 
455
        {
 
456
            MPI_Offset offset[1];
 
457
            offset[0] = fd->write_size_bytes;
 
458
 
 
459
            MPI_Gather (offset, 1, MPI_LONG_LONG
 
460
                       ,0, 1, MPI_LONG_LONG
 
461
                       ,0, md->group_comm
 
462
                       );
 
463
 
 
464
            MPI_Scatter (0, 1, MPI_LONG_LONG
 
465
                        ,offset, 1, MPI_LONG_LONG
 
466
                        ,0, md->group_comm
 
467
                        );
 
468
            fd->base_offset = offset [0];
 
469
            fd->pg_start_in_file = fd->base_offset;
 
470
        }
 
471
    }
 
472
    else
 
473
    {
 
474
        md->b.pg_index_offset = fd->write_size_bytes;
 
475
    }
 
476
}
 
477
 
 
478
#if 0
 
479
// LUSTRE Structure
 
480
// from /usr/include/lustre/lustre_user.h
 
481
#define LUSTRE_SUPER_MAGIC 0x0BD00BD0
 
482
#  define LOV_USER_MAGIC 0x0BD10BD0
 
483
#  define LL_IOC_LOV_SETSTRIPE  _IOW ('f', 154, long)
 
484
#  define LL_IOC_LOV_GETSTRIPE  _IOW ('f', 155, long)
 
485
struct lov_user_ost_data {           // per-stripe data structure
 
486
        uint64_t l_object_id;        // OST object ID
 
487
        uint64_t l_object_gr;        // OST object group (creating MDS number)
 
488
        uint32_t l_ost_gen;          // generation of this OST index
 
489
        uint32_t l_ost_idx;          // OST index in LOV
 
490
} __attribute__((packed));
 
491
struct lov_user_md {                 // LOV EA user data (host-endian)
 
492
        uint32_t lmm_magic;          // magic number = LOV_USER_MAGIC_V1
 
493
        uint32_t lmm_pattern;        // LOV_PATTERN_RAID0, LOV_PATTERN_RAID1
 
494
        uint64_t lmm_object_id;      // LOV object ID
 
495
        uint64_t lmm_object_gr;      // LOV object group
 
496
        uint32_t lmm_stripe_size;    // size of stripe in bytes
 
497
        uint16_t lmm_stripe_count;   // num stripes in use for this object
 
498
        uint16_t lmm_stripe_offset;  // starting stripe offset in lmm_objects
 
499
        struct lov_user_ost_data  lmm_objects[0]; // per-stripe data
 
500
} __attribute__((packed));
 
501
 
 
502
// do the magic ioctl calls to set Lustre's stripe size
 
503
static void set_stripe_size (struct adios_file_struct * fd
 
504
                            ,struct adios_MPI_data_struct * md
 
505
                            ,const char * filename
 
506
                            )
 
507
{
 
508
    struct statfs fsbuf;
 
509
    int err;
 
510
 
 
511
    // Note: Since each file might have different write_buffer,
 
512
    // So we will reset write_buffer even buffer_size != 0
 
513
    err = statfs (filename, &fsbuf);
 
514
    if (!err && fsbuf.f_type == LUSTRE_SUPER_MAGIC)
 
515
    {
 
516
        int f;
 
517
        int old_mask;
 
518
        int perm;
 
519
 
 
520
        old_mask = umask (022);
 
521
        umask (old_mask);
 
522
        perm = old_mask ^ 0666;
 
523
 
 
524
        f = open (filename, O_RDONLY | O_CREAT, perm);
 
525
        if (f != -1)
 
526
        {
 
527
            struct lov_user_md lum;
 
528
            lum.lmm_magic = LOV_USER_MAGIC;
 
529
            lum.lmm_stripe_size = md->biggest_size;
 
530
            err = ioctl (f, LL_IOC_LOV_SETSTRIPE, (void *) &lum);
 
531
            // if err != 0, the must not be Lustre
 
532
            lum.lmm_stripe_count = 0;
 
533
            err = ioctl (f, LL_IOC_LOV_GETSTRIPE, (void *) &lum);
 
534
            // if err != 0, the must not be Lustre
 
535
            if (err == 0)
 
536
            {
 
537
                md->storage_targets = lum.lmm_stripe_count;
 
538
            }
 
539
            close (f);
 
540
        }
 
541
    }
 
542
}
 
543
#endif
 
544
 
 
545
enum ADIOS_FLAG adios_mpi_should_buffer (struct adios_file_struct * fd
 
546
                                        ,struct adios_method_struct * method
 
547
                                        )
 
548
{
 
549
    int i;
 
550
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
 
551
                                                      method->method_data;
 
552
    char * name;
 
553
    int err;
 
554
    int flag;    // used for coordinating the MPI_File_open
 
555
 
 
556
    int previous;
 
557
    int current;
 
558
    int next;
 
559
 
 
560
#if COLLECT_METRICS
 
561
    gettimeofday (&timing.t21, NULL);
 
562
#endif
 
563
 
 
564
    name = malloc (strlen (method->base_path) + strlen (fd->name) + 1);
 
565
    sprintf (name, "%s%s", method->base_path, fd->name);
 
566
 
 
567
    if (md->rank == md->size - 1)
 
568
        next = -1;
 
569
    else
 
570
        next = md->rank + 1;
 
571
    previous = md->rank - 1;
 
572
    current = md->rank;
 
573
 
 
574
    fd->base_offset = 0;
 
575
 
 
576
    switch (fd->mode)
 
577
    {
 
578
        case adios_mode_read:
 
579
        {
 
580
            if (md->group_comm == MPI_COMM_NULL || md->rank == 0)
 
581
            {
 
582
                err = MPI_File_open (MPI_COMM_SELF, name, MPI_MODE_RDONLY
 
583
                                    ,md->info, &md->fh
 
584
                                    );
 
585
                if (err != MPI_SUCCESS)
 
586
                {
 
587
                    char e [MPI_MAX_ERROR_STRING];
 
588
                    int len = 0;
 
589
                    memset (e, 0, MPI_MAX_ERROR_STRING);
 
590
                    MPI_Error_string (err, e, &len);
 
591
                    adios_error (err_file_open_error, 
 
592
                                 "MPI method: open read failed for %s: '%s'\n",
 
593
                                 name, e);
 
594
                    free (name);
 
595
 
 
596
                    return adios_flag_no;
 
597
                }
 
598
 
 
599
                MPI_Offset file_size;
 
600
                MPI_File_get_size (md->fh, &file_size);
 
601
                md->b.file_size = file_size;
 
602
 
 
603
                adios_init_buffer_read_version (&md->b);
 
604
                MPI_File_seek (md->fh, md->b.file_size - md->b.length
 
605
                              ,MPI_SEEK_SET
 
606
                              );
 
607
                MPI_File_read (md->fh, md->b.buff, md->b.length, MPI_BYTE
 
608
                              ,&md->status
 
609
                              );
 
610
                adios_parse_version (&md->b, &md->b.version);
 
611
 
 
612
                adios_init_buffer_read_index_offsets (&md->b);
 
613
                // already in the buffer
 
614
                adios_parse_index_offsets_v1 (&md->b);
 
615
 
 
616
                adios_init_buffer_read_process_group_index (&md->b);
 
617
                MPI_File_seek (md->fh, md->b.pg_index_offset
 
618
                              ,MPI_SEEK_SET
 
619
                              );
 
620
                MPI_File_read (md->fh, md->b.buff, md->b.pg_size, MPI_BYTE
 
621
                              ,&md->status
 
622
                              );
 
623
                adios_parse_process_group_index_v1 (&md->b
 
624
                                                   ,&md->old_pg_root
 
625
                                                   );
 
626
 
 
627
#if 1
 
628
                adios_init_buffer_read_vars_index (&md->b);
 
629
                MPI_File_seek (md->fh, md->b.vars_index_offset
 
630
                              ,MPI_SEEK_SET
 
631
                              );
 
632
                MPI_File_read (md->fh, md->b.buff, md->b.vars_size, MPI_BYTE
 
633
                              ,&md->status
 
634
                              );
 
635
                adios_parse_vars_index_v1 (&md->b, &md->old_vars_root);
 
636
 
 
637
                adios_init_buffer_read_attributes_index (&md->b);
 
638
                MPI_File_seek (md->fh, md->b.attrs_index_offset
 
639
                              ,MPI_SEEK_SET
 
640
                              );
 
641
                MPI_File_read (md->fh, md->b.buff, md->b.attrs_size, MPI_BYTE
 
642
                              ,&md->status
 
643
                              );
 
644
                adios_parse_attributes_index_v1 (&md->b, &md->old_attrs_root);
 
645
#endif
 
646
 
 
647
                fd->base_offset = md->b.end_of_pgs;
 
648
            }
 
649
 
 
650
            if (   md->group_comm != MPI_COMM_NULL
 
651
                && md->group_comm != MPI_COMM_SELF
 
652
               )
 
653
            {
 
654
                if (md->rank == 0)
 
655
                {
 
656
                    MPI_Offset * offsets = malloc (  sizeof (MPI_Offset)
 
657
                                                   * md->size * 3
 
658
                                                  );
 
659
                    memset (offsets, 0, sizeof (MPI_Offset) * md->size * 3);
 
660
 
 
661
                    // go through the pg index to build the offsets array
 
662
                    build_offsets (&md->b, offsets, md->size
 
663
                                  ,fd->group->name, md->old_pg_root
 
664
                                  );
 
665
                    MPI_Scatter (offsets, 3, MPI_LONG_LONG
 
666
                                ,MPI_IN_PLACE, 3, MPI_LONG_LONG
 
667
                                ,0, md->group_comm
 
668
                                );
 
669
                    md->b.read_pg_offset = offsets [0];
 
670
                    md->b.read_pg_size = offsets [1];
 
671
                    free (offsets);
 
672
                }
 
673
                else
 
674
                {
 
675
                    MPI_Offset offset [3];
 
676
                    offset [0] = offset [1] = offset [2] = 0;
 
677
 
 
678
                    MPI_Scatter (0, 3, MPI_LONG_LONG
 
679
                                ,offset, 3, MPI_LONG_LONG
 
680
                                ,0, md->group_comm
 
681
                                );
 
682
 
 
683
                    md->b.read_pg_offset = offset [0];
 
684
                    md->b.read_pg_size = offset [1];
 
685
                    md->b.version = offset [2];
 
686
                }
 
687
            }
 
688
 
 
689
            // cascade the opens to avoid trashing the metadata server
 
690
            if (previous == -1)
 
691
            {
 
692
                // note rank 0 is already open
 
693
                // don't open it again here
 
694
 
 
695
                if (next != -1)
 
696
                {
 
697
                    MPI_Isend (&flag, 1, MPI_INT, next, current
 
698
                              ,md->group_comm, &md->req
 
699
                              );
 
700
                }
 
701
            }
 
702
            else
 
703
            {
 
704
                MPI_Recv (&flag, 1, MPI_INT, previous, previous
 
705
                         ,md->group_comm, &md->status
 
706
                         );
 
707
                if (next != -1)
 
708
                {
 
709
                    MPI_Isend (&flag, 1, MPI_INT, next, current
 
710
                              ,md->group_comm, &md->req
 
711
                              );
 
712
                }
 
713
                err = MPI_File_open (MPI_COMM_SELF, name
 
714
                                    ,MPI_MODE_RDONLY
 
715
                                    ,md->info
 
716
                                    ,&md->fh
 
717
                                    );
 
718
            }
 
719
 
 
720
            if (err != MPI_SUCCESS)
 
721
            {
 
722
                char e [MPI_MAX_ERROR_STRING];
 
723
                int len = 0;
 
724
                memset (e, 0, MPI_MAX_ERROR_STRING);
 
725
                MPI_Error_string (err, e, &len);
 
726
                adios_error (err_file_open_error, 
 
727
                        "MPI method, rank %d: open read failed for %s: '%s'\n", 
 
728
                        md->rank, name, e);
 
729
                free (name);
 
730
 
 
731
                return adios_flag_no;
 
732
            }
 
733
 
 
734
            break;
 
735
        }
 
736
 
 
737
        case adios_mode_write:
 
738
        {
 
739
            fd->base_offset = 0;
 
740
            fd->pg_start_in_file = 0;
 
741
#if COLLECT_METRICS                     
 
742
            gettimeofday (&timing.t16, NULL);
 
743
#endif
 
744
 
 
745
            // figure out the offsets and create the file with proper striping
 
746
            // before the MPI_File_open is called
 
747
            adios_mpi_build_file_offset (md, fd, name);
 
748
            //set_stripe_size (fd, md, name);
 
749
#if COLLECT_METRICS
 
750
            gettimeofday (&timing.t5, NULL);
 
751
#endif
 
752
 
 
753
            // cascade the opens to avoid trashing the metadata server
 
754
            if (previous == -1)
 
755
            {
 
756
                MPI_File_delete (name, MPI_INFO_NULL);  // make sure clean
 
757
 
 
758
                err = MPI_File_open (MPI_COMM_SELF, name
 
759
                                    ,MPI_MODE_WRONLY | MPI_MODE_CREATE
 
760
                                    ,md->info
 
761
                                    ,&md->fh
 
762
                                    );
 
763
                if (next != -1)
 
764
                {
 
765
                    MPI_Isend (&flag, 1, MPI_INT, next, current
 
766
                              ,md->group_comm, &md->req
 
767
                              );
 
768
                }
 
769
            }
 
770
            else
 
771
            {
 
772
                MPI_Recv (&flag, 1, MPI_INT, previous, previous
 
773
                         ,md->group_comm, &md->status
 
774
                         );
 
775
                if (next != -1)
 
776
                {
 
777
                    MPI_Isend (&flag, 1, MPI_INT, next, current
 
778
                              ,md->group_comm, &md->req
 
779
                              );
 
780
                }
 
781
                err = MPI_File_open (MPI_COMM_SELF, name
 
782
                                    ,MPI_MODE_WRONLY
 
783
                                    ,md->info
 
784
                                    ,&md->fh
 
785
                                    );
 
786
            }
 
787
 
 
788
            if (err != MPI_SUCCESS)
 
789
            {
 
790
                char e [MPI_MAX_ERROR_STRING];
 
791
                int len = 0;
 
792
                memset (e, 0, MPI_MAX_ERROR_STRING);
 
793
                MPI_Error_string (err, e, &len);
 
794
                adios_error (err_file_open_error, 
 
795
                        "MPI method, rank %d: open write failed for %s: '%s'\n", 
 
796
                        md->rank, name, e);
 
797
                free (name);
 
798
 
 
799
                return adios_flag_no;
 
800
            }
 
801
#if COLLECT_METRICS
 
802
            gettimeofday (&timing.t6, NULL);
 
803
#endif
 
804
 
 
805
            break;
 
806
        }
 
807
 
 
808
        case adios_mode_append:
 
809
        case adios_mode_update:
 
810
        {
 
811
            int old_file = 1;
 
812
 
 
813
            if (md->group_comm == MPI_COMM_NULL || md->rank == 0)
 
814
            {
 
815
                err = MPI_File_open (MPI_COMM_SELF, name, MPI_MODE_RDONLY
 
816
                                    ,md->info, &md->fh
 
817
                                    );
 
818
 
 
819
                if (err != MPI_SUCCESS)
 
820
                {
 
821
                    old_file = 0;
 
822
                    MPI_File_close (&md->fh);
 
823
                    err = MPI_File_open (MPI_COMM_SELF, name
 
824
                                        ,MPI_MODE_WRONLY | MPI_MODE_CREATE
 
825
                                        ,md->info, &md->fh
 
826
                                        );
 
827
                    if (err != MPI_SUCCESS)
 
828
                    {
 
829
                        char e [MPI_MAX_ERROR_STRING];
 
830
                        int len = 0;
 
831
                        memset (e, 0, MPI_MAX_ERROR_STRING);
 
832
                        MPI_Error_string (err, e, &len);
 
833
                        adios_error (err_file_open_error, 
 
834
                                "MPI method, rank %d: open for append failed for %s: '%s'\n", 
 
835
                                md->rank, name, e);
 
836
                        free (name);
 
837
 
 
838
                        return adios_flag_no;
 
839
                    }
 
840
                }
 
841
                MPI_Bcast (&old_file, 1, MPI_INT, 0, md->group_comm);
 
842
            }
 
843
            else
 
844
            {
 
845
                if (md->group_comm != MPI_COMM_NULL)
 
846
                    MPI_Bcast (&old_file, 1, MPI_INT, 0, md->group_comm);
 
847
            }
 
848
 
 
849
            if (old_file)
 
850
            {
 
851
                if (md->group_comm == MPI_COMM_NULL || md->rank == 0)
 
852
                {
 
853
                    if (err != MPI_SUCCESS)
 
854
                    {
 
855
                        md->b.file_size = 0;
 
856
                    }
 
857
                    else
 
858
                    {
 
859
                        MPI_Offset file_size;
 
860
                        MPI_File_get_size (md->fh, &file_size);
 
861
                        md->b.file_size = file_size;
 
862
                    }
 
863
 
 
864
                    adios_init_buffer_read_version (&md->b);
 
865
                    MPI_File_seek (md->fh, md->b.file_size - md->b.length
 
866
                                  ,MPI_SEEK_SET
 
867
                                  );
 
868
                    MPI_File_read (md->fh, md->b.buff, md->b.length, MPI_BYTE
 
869
                                  ,&md->status
 
870
                                  );
 
871
                    adios_parse_version (&md->b, &md->b.version);
 
872
 
 
873
                    adios_init_buffer_read_index_offsets (&md->b);
 
874
                    // already in the buffer
 
875
                    adios_parse_index_offsets_v1 (&md->b);
 
876
 
 
877
                    adios_init_buffer_read_process_group_index (&md->b);
 
878
                    MPI_File_seek (md->fh, md->b.pg_index_offset
 
879
                                  ,MPI_SEEK_SET
 
880
                                  );
 
881
                    MPI_File_read (md->fh, md->b.buff, md->b.pg_size, MPI_BYTE
 
882
                                  ,&md->status
 
883
                                  );
 
884
 
 
885
                    adios_parse_process_group_index_v1 (&md->b
 
886
                                                       ,&md->old_pg_root
 
887
                                                       );
 
888
 
 
889
                    // find the largest time index so we can append properly
 
890
                    struct adios_index_process_group_struct_v1 * p;
 
891
                    uint32_t max_time_index = 0;
 
892
                    p = md->old_pg_root;
 
893
                    while (p)
 
894
                    {
 
895
                        if (p->time_index > max_time_index)
 
896
                            max_time_index = p->time_index;
 
897
                        p = p->next;
 
898
                    }
 
899
                    fd->group->time_index = ++max_time_index;
 
900
                    MPI_Bcast (&fd->group->time_index, 1, MPI_INT, 0
 
901
                              ,md->group_comm
 
902
                              );
 
903
 
 
904
                    adios_init_buffer_read_vars_index (&md->b);
 
905
                    MPI_File_seek (md->fh, md->b.vars_index_offset
 
906
                                  ,MPI_SEEK_SET
 
907
                                  );
 
908
                    MPI_File_read (md->fh, md->b.buff, md->b.vars_size, MPI_BYTE
 
909
                                  ,&md->status
 
910
                                  );
 
911
                    adios_parse_vars_index_v1 (&md->b, &md->old_vars_root);
 
912
 
 
913
                    adios_init_buffer_read_attributes_index (&md->b);
 
914
                    MPI_File_seek (md->fh, md->b.attrs_index_offset
 
915
                                  ,MPI_SEEK_SET
 
916
                                  );
 
917
                    MPI_File_read (md->fh, md->b.buff, md->b.attrs_size
 
918
                                  ,MPI_BYTE, &md->status
 
919
                                  );
 
920
                    adios_parse_attributes_index_v1 (&md->b
 
921
                                                    ,&md->old_attrs_root
 
922
                                                    );
 
923
 
 
924
                    fd->base_offset = md->b.end_of_pgs;
 
925
                    fd->pg_start_in_file = fd->base_offset;
 
926
                }
 
927
                else
 
928
                {
 
929
                    fd->base_offset = 0;
 
930
                    fd->pg_start_in_file = 0;
 
931
                    MPI_Bcast (&fd->group->time_index, 1, MPI_INT, 0
 
932
                              ,md->group_comm
 
933
                              );
 
934
                }
 
935
 
 
936
                MPI_File_close (&md->fh);
 
937
            }
 
938
            else
 
939
            {
 
940
                fd->base_offset = 0;
 
941
                fd->pg_start_in_file = 0;
 
942
 
 
943
                if (md->rank == 0)
 
944
                    MPI_File_close (&md->fh);
 
945
            }
 
946
 
 
947
            // figure out the offsets and create the file with proper striping
 
948
            // before the MPI_File_open is called
 
949
            adios_mpi_build_file_offset (md, fd, name);
 
950
            //set_stripe_size (fd, md, name);
 
951
 
 
952
            // cascade the opens to avoid trashing the metadata server
 
953
            if (previous == -1)
 
954
            {
 
955
                // we know it exists, because we created it if it didn't
 
956
                // when reading the old file so can just open wronly
 
957
                // but adding the create for consistency with write mode
 
958
                // so it is easier to merge write/append later
 
959
                err = MPI_File_open (MPI_COMM_SELF, name
 
960
                                    ,MPI_MODE_WRONLY | MPI_MODE_CREATE
 
961
                                    ,md->info
 
962
                                    ,&md->fh
 
963
                                    );
 
964
                if (next != -1)
 
965
                {
 
966
                    MPI_Isend (&flag, 1, MPI_INT, next, current
 
967
                              ,md->group_comm, &md->req
 
968
                              );
 
969
                }
 
970
            }
 
971
            else
 
972
            {
 
973
                MPI_Recv (&flag, 1, MPI_INT, previous, previous
 
974
                         ,md->group_comm, &md->status
 
975
                         );
 
976
                if (next != -1)
 
977
                {
 
978
                    MPI_Isend (&flag, 1, MPI_INT, next, current
 
979
                              ,md->group_comm, &md->req
 
980
                              );
 
981
                }
 
982
                err = MPI_File_open (MPI_COMM_SELF, name
 
983
                                    ,MPI_MODE_WRONLY
 
984
                                    ,md->info
 
985
                                    ,&md->fh
 
986
                                    );
 
987
            }
 
988
 
 
989
            if (err != MPI_SUCCESS)
 
990
            {
 
991
                char e [MPI_MAX_ERROR_STRING];
 
992
                int len = 0;
 
993
                memset (e, 0, MPI_MAX_ERROR_STRING);
 
994
                MPI_Error_string (err, e, &len);
 
995
                adios_error (err_file_open_error, 
 
996
                        "MPI method, rank %d: open for append failed for %s: '%s'\n", 
 
997
                        md->rank, name, e);
 
998
                free (name);
 
999
 
 
1000
                return adios_flag_no;
 
1001
            }
 
1002
 
 
1003
            break;
 
1004
        }
 
1005
 
 
1006
        default:
 
1007
        {
 
1008
            adios_error (err_invalid_file_mode, 
 
1009
                         "MPI method: Unknown file mode requested: %d\n", 
 
1010
                         fd->mode);
 
1011
 
 
1012
            free (name);
 
1013
 
 
1014
            return adios_flag_no;
 
1015
        }
 
1016
    }
 
1017
 
 
1018
    free (name);
 
1019
 
 
1020
    if (fd->shared_buffer == adios_flag_no && fd->mode != adios_mode_read)
 
1021
    {
 
1022
        int err;
 
1023
        // write the process group header
 
1024
        adios_write_process_group_header_v1 (fd, fd->write_size_bytes);
 
1025
 
 
1026
        MPI_File_seek (md->fh, fd->base_offset, MPI_SEEK_SET);
 
1027
#if 0
 
1028
        err = MPI_File_write (md->fh, fd->buffer, fd->bytes_written, MPI_BYTE
 
1029
                             ,&md->status
 
1030
                             );
 
1031
#endif
 
1032
        {
 
1033
            uint64_t total_written = 0;
 
1034
            uint64_t to_write = fd->bytes_written;
 
1035
            int write_len = 0;
 
1036
            int count;
 
1037
            char * buf_ptr = fd->buffer;
 
1038
            while (total_written < fd->bytes_written)
 
1039
            {
 
1040
                write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
1041
                err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
1042
                MPI_Get_count(&md->status, MPI_BYTE, &count);
 
1043
                if (count != write_len)
 
1044
                {
 
1045
                    err = count;
 
1046
                    break;
 
1047
                }
 
1048
                total_written += count;
 
1049
                buf_ptr += count;
 
1050
                to_write -= count;
 
1051
                //err = total_written;
 
1052
            }
 
1053
        }
 
1054
        if (err != MPI_SUCCESS)
 
1055
        {
 
1056
            char e [MPI_MAX_ERROR_STRING];
 
1057
            int len = 0;
 
1058
            memset (e, 0, MPI_MAX_ERROR_STRING);
 
1059
            MPI_Error_string (err, e, &len);
 
1060
            adios_error (err_write_error, 
 
1061
                         "MPI method, rank %d: adios_group_size() failed to "
 
1062
                         "write header to %s: '%s'\n",
 
1063
                         md->rank, fd->name, e);
 
1064
            free (name);
 
1065
 
 
1066
            return adios_flag_no;
 
1067
        }
 
1068
 
 
1069
        int count;
 
1070
        MPI_Get_count (&md->status, MPI_BYTE, &count);
 
1071
        if (count != fd->bytes_written)
 
1072
        {
 
1073
            log_warn ("a:MPI method tried to write %llu, only wrote %llu\n",
 
1074
                      fd->bytes_written, count);
 
1075
        }
 
1076
        fd->base_offset += count;
 
1077
        fd->offset = 0;
 
1078
        fd->bytes_written = 0;
 
1079
        adios_shared_buffer_free (&md->b);
 
1080
 
 
1081
        // setup for writing vars
 
1082
        adios_write_open_vars_v1 (fd);
 
1083
        md->vars_start = fd->base_offset;
 
1084
        md->vars_header_size = fd->offset;
 
1085
        fd->base_offset += fd->offset;
 
1086
        MPI_File_seek (md->fh, md->vars_header_size, MPI_SEEK_CUR);
 
1087
        fd->offset = 0;
 
1088
        fd->bytes_written = 0;
 
1089
        adios_shared_buffer_free (&md->b);
 
1090
    }
 
1091
 
 
1092
#if COLLECT_METRICS
 
1093
    gettimeofday (&timing.t22, NULL);
 
1094
#endif
 
1095
    return fd->shared_buffer;
 
1096
}
 
1097
 
 
1098
void adios_mpi_write (struct adios_file_struct * fd
 
1099
                     ,struct adios_var_struct * v
 
1100
                     ,void * data
 
1101
                     ,struct adios_method_struct * method
 
1102
                     )
 
1103
{
 
1104
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
 
1105
                                                      method->method_data;
 
1106
 
 
1107
    if (v->got_buffer == adios_flag_yes)
 
1108
    {
 
1109
        if (data != v->data)  // if the user didn't give back the same thing
 
1110
        {
 
1111
            if (v->free_data == adios_flag_yes)
 
1112
            {
 
1113
                free (v->data);
 
1114
                adios_method_buffer_free (v->data_size);
 
1115
            }
 
1116
        }
 
1117
        else
 
1118
        {
 
1119
            // we already saved all of the info, so we're ok.
 
1120
            return;
 
1121
        }
 
1122
    }
 
1123
 
 
1124
    if (fd->shared_buffer == adios_flag_no)
 
1125
    {
 
1126
        int err;
 
1127
        // var payload sent for sizing information
 
1128
        adios_write_var_header_v1 (fd, v);
 
1129
 
 
1130
#if 0
 
1131
        err = MPI_File_write (md->fh, fd->buffer, fd->bytes_written
 
1132
                             ,MPI_BYTE, &md->status
 
1133
                             );
 
1134
#endif
 
1135
        {
 
1136
            uint64_t total_written = 0;
 
1137
            uint64_t to_write = fd->bytes_written;
 
1138
            int write_len = 0;
 
1139
            int count;
 
1140
            char * buf_ptr = fd->buffer;
 
1141
            while (total_written < fd->bytes_written)
 
1142
            {
 
1143
                write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
1144
                err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
1145
                MPI_Get_count(&md->status, MPI_BYTE, &count);
 
1146
                if (count != write_len)
 
1147
                {
 
1148
                    err = count;
 
1149
                    break;
 
1150
                }
 
1151
                total_written += count;
 
1152
                buf_ptr += count;
 
1153
                to_write -= count;
 
1154
                //err = total_written;
 
1155
            }
 
1156
        }
 
1157
        if (err != MPI_SUCCESS) 
 
1158
        {              
 
1159
            char e [MPI_MAX_ERROR_STRING];
 
1160
            int len = 0;
 
1161
            memset (e, 0, MPI_MAX_ERROR_STRING);
 
1162
            MPI_Error_string (err, e, &len);
 
1163
            adios_error (err_write_error, 
 
1164
                         "MPI method, rank %d: adios_write() of header of variable %s to "
 
1165
                         "file %s failed: '%s'\n ",
 
1166
                         md->rank, v->name, fd->name, e);       
 
1167
        }
 
1168
 
 
1169
        int count;
 
1170
        MPI_Get_count (&md->status, MPI_BYTE, &count);
 
1171
        if (count != fd->bytes_written)
 
1172
        {
 
1173
            log_warn ("MPI method, rank %d: tried to write %llu bytes, "
 
1174
                      "only wrote %d of header of variable %s\n",
 
1175
                      md->rank, fd->bytes_written, count, v->name);
 
1176
        }
 
1177
        fd->base_offset += count;
 
1178
        fd->offset = 0;
 
1179
        fd->bytes_written = 0;
 
1180
        adios_shared_buffer_free (&md->b);
 
1181
 
 
1182
        // write payload
 
1183
        // adios_write_var_payload_v1 (fd, v);
 
1184
        uint64_t var_size = adios_get_var_size (v, fd->group, v->data);
 
1185
 
 
1186
        if (fd->base_offset + var_size > fd->pg_start_in_file + fd->write_size_bytes)
 
1187
            adios_error (err_out_of_bound, 
 
1188
                         "MPI method, rank %d: adios_write() of variable %s exceeds pg bound.\n"
 
1189
                         "File is corrupted. Need to enlarge group size in adios_group_size().\n"
 
1190
                         "Group size=%llu, offset at end of this variable data=%llu\n",
 
1191
                         md->rank, v->name, 
 
1192
                         fd->write_size_bytes,
 
1193
                         fd->base_offset - fd->pg_start_in_file + var_size);
 
1194
#if 0
 
1195
        err = MPI_File_write (md->fh, v->data, var_size, MPI_BYTE, &md->status);
 
1196
#endif
 
1197
        {
 
1198
            uint64_t total_written = 0;
 
1199
            uint64_t to_write = var_size;
 
1200
            int write_len = 0;
 
1201
            int count;
 
1202
            char * buf_ptr = v->data;
 
1203
            while (total_written < var_size)
 
1204
            {
 
1205
                write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
1206
                err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
1207
                MPI_Get_count(&md->status, MPI_BYTE, &count);
 
1208
                if (count != write_len)
 
1209
                {
 
1210
                    err = count;
 
1211
                    break;
 
1212
                }
 
1213
                total_written += count;
 
1214
                buf_ptr += count;
 
1215
                to_write -= count;
 
1216
                //err = total_written;
 
1217
            }
 
1218
        }
 
1219
        if (err != MPI_SUCCESS) 
 
1220
        {              
 
1221
            char e [MPI_MAX_ERROR_STRING];
 
1222
            int len = 0;
 
1223
            memset (e, 0, MPI_MAX_ERROR_STRING);
 
1224
            MPI_Error_string (err, e, &len);
 
1225
            adios_error (err_write_error, 
 
1226
                         "MPI method, rank %d: adios_write() of variable %s to "
 
1227
                         "file %s failed: '%s'\n ",
 
1228
                         md->rank, v->name, fd->name, e);       
 
1229
        }
 
1230
 
 
1231
        MPI_Get_count (&md->status, MPI_BYTE, &count);
 
1232
        if (count != var_size)
 
1233
        {
 
1234
            log_warn ("MPI method, rank %d: tried to write %llu bytes, "
 
1235
                      "only wrote %d of variable %s\n",
 
1236
                      md->rank, var_size, count, v->name);
 
1237
        }
 
1238
        fd->base_offset += count;
 
1239
        fd->offset = 0;
 
1240
        fd->bytes_written = 0;
 
1241
        adios_shared_buffer_free (&md->b);
 
1242
    }
 
1243
#if COLLECT_METRICS
 
1244
    if (timing.write_count == timing.write_size)
 
1245
    {
 
1246
        timing.write_size += 10;
 
1247
        timing.t24 = realloc (timing.t24, sizeof (struct timeval)
 
1248
                                          * timing.write_size
 
1249
                             );
 
1250
        assert (timing.t24 != 0);
 
1251
    }
 
1252
    gettimeofday (&(timing.t24 [timing.write_count++]), NULL);
 
1253
#endif
 
1254
}
 
1255
 
 
1256
void adios_mpi_get_write_buffer (struct adios_file_struct * fd
 
1257
                                ,struct adios_var_struct * v
 
1258
                                ,uint64_t * size
 
1259
                                ,void ** buffer
 
1260
                                ,struct adios_method_struct * method
 
1261
                                )
 
1262
{
 
1263
    uint64_t mem_allowed;
 
1264
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
 
1265
                                                      method->method_data;
 
1266
 
 
1267
    if (*size == 0)
 
1268
    {
 
1269
        *buffer = 0;
 
1270
 
 
1271
        return;
 
1272
    }
 
1273
 
 
1274
    if (v->data && v->free_data)
 
1275
    {
 
1276
        adios_method_buffer_free (v->data_size);
 
1277
        free (v->data);
 
1278
    }
 
1279
 
 
1280
    mem_allowed = adios_method_buffer_alloc (*size);
 
1281
    if (mem_allowed == *size)
 
1282
    {
 
1283
        *buffer = malloc (*size);
 
1284
        if (!*buffer)
 
1285
        {
 
1286
            adios_method_buffer_free (mem_allowed);
 
1287
            adios_error (err_no_memory,
 
1288
                         "MPI method, rank %d: cannot allocate %llu bytes for variable %s\n",
 
1289
                         md->rank, *size, v->name);
 
1290
            v->got_buffer = adios_flag_no;
 
1291
            v->free_data = adios_flag_no;
 
1292
            v->data_size = 0;
 
1293
            v->data = 0;
 
1294
            *size = 0;
 
1295
            *buffer = 0;
 
1296
        }
 
1297
        else
 
1298
        {
 
1299
            v->got_buffer = adios_flag_yes;
 
1300
            v->free_data = adios_flag_yes;
 
1301
            v->data_size = mem_allowed;
 
1302
            v->data = *buffer;
 
1303
        }
 
1304
    }
 
1305
    else
 
1306
    {
 
1307
        adios_method_buffer_free (mem_allowed);
 
1308
 
 
1309
        adios_error (err_buffer_overflow,
 
1310
                "MPI method, rank %d: OVERFLOW: Cannot get requested ADIOS buffer of %llu "
 
1311
                "bytes for variable %s. Remaining buffer size was %llu\n",
 
1312
                md->rank, *size, v->name, mem_allowed);
 
1313
        *size = 0;
 
1314
        *buffer = 0;
 
1315
    }
 
1316
}
 
1317
 
 
1318
void adios_mpi_read (struct adios_file_struct * fd
 
1319
                    ,struct adios_var_struct * v, void * buffer
 
1320
                    ,uint64_t buffer_size
 
1321
                    ,struct adios_method_struct * method
 
1322
                    )
 
1323
{
 
1324
    v->data = buffer;
 
1325
    v->data_size = buffer_size;
 
1326
}
 
1327
 
 
1328
static void adios_mpi_do_read (struct adios_file_struct * fd
 
1329
                              ,struct adios_method_struct * method
 
1330
                              )
 
1331
{
 
1332
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
 
1333
                                                      method->method_data;
 
1334
    struct adios_var_struct * v = fd->group->vars;
 
1335
 
 
1336
    struct adios_parse_buffer_struct data;
 
1337
 
 
1338
    data.vars = v;
 
1339
    data.buffer = 0;
 
1340
    data.buffer_len = 0;
 
1341
 
 
1342
    switch (md->b.version & ADIOS_VERSION_NUM_MASK)
 
1343
    {
 
1344
        case 1:
 
1345
        {
 
1346
            // the three section headers
 
1347
            struct adios_process_group_header_struct_v1 pg_header;
 
1348
            struct adios_vars_header_struct_v1 vars_header;
 
1349
            struct adios_attributes_header_struct_v1 attrs_header;
 
1350
 
 
1351
            struct adios_var_header_struct_v1 var_header;
 
1352
            struct adios_var_payload_struct_v1 var_payload;
 
1353
            struct adios_attribute_struct_v1 attribute;
 
1354
 
 
1355
            int i;
 
1356
 
 
1357
            adios_init_buffer_read_process_group (&md->b);
 
1358
            MPI_File_seek (md->fh, md->b.read_pg_offset
 
1359
                          ,MPI_SEEK_SET
 
1360
                          );
 
1361
            MPI_File_read (md->fh, md->b.buff, md->b.read_pg_size, MPI_BYTE
 
1362
                          ,&md->status
 
1363
                          );
 
1364
            adios_parse_process_group_header_v1 (&md->b, &pg_header);
 
1365
 
 
1366
            adios_parse_vars_header_v1 (&md->b, &vars_header);
 
1367
 
 
1368
            for (i = 0; i < vars_header.count; i++)
 
1369
            {
 
1370
                memset (&var_payload, 0
 
1371
                       ,sizeof (struct adios_var_payload_struct_v1)
 
1372
                       );
 
1373
                adios_parse_var_data_header_v1 (&md->b, &var_header);
 
1374
 
 
1375
                struct adios_var_struct * v1 = v;
 
1376
                while (v1)
 
1377
                {
 
1378
                    if (   strcasecmp (var_header.name, v1->name)
 
1379
                        || strcasecmp (var_header.path, v1->path)
 
1380
                       )
 
1381
                    {
 
1382
                        v1 = v1->next;
 
1383
                    }
 
1384
                    else
 
1385
                    {
 
1386
                        break;
 
1387
                    }
 
1388
                }
 
1389
 
 
1390
                if (v1)
 
1391
                {
 
1392
                    var_payload.payload = v1->data;
 
1393
                    adios_parse_var_data_payload_v1 (&md->b, &var_header
 
1394
                                                    ,&var_payload
 
1395
                                                    ,v1->data_size
 
1396
                                                    );
 
1397
                }
 
1398
                else
 
1399
                {
 
1400
                    log_warn ("MPI method, rank %d: read: skipping name: %s path: %s\n",
 
1401
                           md->rank, var_header.name, var_header.path);
 
1402
                    adios_parse_var_data_payload_v1 (&md->b, &var_header
 
1403
                                                    ,NULL, 0
 
1404
                                                    );
 
1405
                }
 
1406
 
 
1407
                adios_clear_var_header_v1 (&var_header);
 
1408
            }
 
1409
 
 
1410
#if 1
 
1411
            adios_parse_attributes_header_v1 (&md->b, &attrs_header);
 
1412
 
 
1413
            for (i = 0; i < attrs_header.count; i++)
 
1414
            {
 
1415
                adios_parse_attribute_v1 (&md->b, &attribute);
 
1416
                adios_clear_attribute_v1 (&attribute);
 
1417
            }
 
1418
#endif
 
1419
            adios_clear_process_group_header_v1 (&pg_header);
 
1420
 
 
1421
            break;
 
1422
        }
 
1423
 
 
1424
        default:
 
1425
            adios_error (err_invalid_file_version, 
 
1426
                         "MPI method read: ADIOS file version unknown: %u\n",
 
1427
                         md->b.version);
 
1428
            return;
 
1429
    }
 
1430
 
 
1431
    adios_buffer_struct_clear (&md->b);
 
1432
}
 
1433
 
 
1434
void adios_mpi_close (struct adios_file_struct * fd
 
1435
                     ,struct adios_method_struct * method
 
1436
                     )
 
1437
{
 
1438
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
 
1439
                                                 method->method_data;
 
1440
    struct adios_attribute_struct * a = fd->group->attributes;
 
1441
 
 
1442
    struct adios_index_process_group_struct_v1 * new_pg_root = 0;
 
1443
    struct adios_index_var_struct_v1 * new_vars_root = 0;
 
1444
    struct adios_index_attribute_struct_v1 * new_attrs_root = 0;
 
1445
#if COLLECT_METRICS
 
1446
    gettimeofday (&timing.t23, NULL);
 
1447
    timing.t19.tv_sec = timing.t23.tv_sec;
 
1448
    timing.t19.tv_usec = timing.t23.tv_usec;
 
1449
    static int iteration = 0;
 
1450
#endif
 
1451
 
 
1452
    switch (fd->mode)
 
1453
    {
 
1454
        case adios_mode_read:
 
1455
        {
 
1456
            // read the index to find the place to start reading
 
1457
            adios_mpi_do_read (fd, method);
 
1458
            struct adios_var_struct * v = fd->group->vars;
 
1459
            while (v)
 
1460
            {
 
1461
                v->data = 0;
 
1462
                v = v->next;
 
1463
            }
 
1464
 
 
1465
            break;
 
1466
        }
 
1467
 
 
1468
        case adios_mode_write:
 
1469
        {
 
1470
            char * buffer = 0;
 
1471
            uint64_t buffer_size = 0;
 
1472
            uint64_t buffer_offset = 0;
 
1473
            uint64_t index_start = md->b.pg_index_offset;
 
1474
            int err;
 
1475
 
 
1476
            if (fd->shared_buffer == adios_flag_no)
 
1477
            {
 
1478
                MPI_Offset new_off;
 
1479
                // set it up so that it will start at 0, but have correct sizes
 
1480
                MPI_File_get_position (md->fh, &new_off);
 
1481
                fd->offset = fd->base_offset - md->vars_start;
 
1482
                fd->vars_start = 0;
 
1483
                fd->buffer_size = 0;
 
1484
                adios_write_close_vars_v1 (fd);
 
1485
                // fd->vars_start gets updated with the size written
 
1486
                MPI_File_seek (md->fh, md->vars_start, MPI_SEEK_SET);
 
1487
#if 0
 
1488
                err = MPI_File_write (md->fh, fd->buffer, md->vars_header_size
 
1489
                                     ,MPI_BYTE, &md->status
 
1490
                                     );
 
1491
#endif
 
1492
                {
 
1493
                    uint64_t total_written = 0;
 
1494
                    uint64_t to_write = md->vars_header_size;
 
1495
                    int write_len = 0;
 
1496
                    int count;
 
1497
                    char * buf_ptr = fd->buffer;
 
1498
                    while (total_written < md->vars_header_size)
 
1499
                    {
 
1500
                        write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
1501
                        err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
1502
                        MPI_Get_count(&md->status, MPI_BYTE, &count);
 
1503
                        if (count != write_len)
 
1504
                        {
 
1505
                            err = count;
 
1506
                            break;
 
1507
                        }
 
1508
                        total_written += count;
 
1509
                        buf_ptr += count;
 
1510
                        to_write -= count;
 
1511
                        //err = total_written;
 
1512
                    }
 
1513
                }
 
1514
                if (err != MPI_SUCCESS) 
 
1515
                {              
 
1516
                    char e [MPI_MAX_ERROR_STRING];
 
1517
                    int len = 0;
 
1518
                    memset (e, 0, MPI_MAX_ERROR_STRING);
 
1519
                    MPI_Error_string (err, e, &len);
 
1520
                    adios_error (err_write_error, 
 
1521
                         "MPI method, rank %d: adios_close(): writing of variable header "
 
1522
                         "of %llu bytes to file %s failed: '%s'\n",
 
1523
                         md->rank, md->vars_header_size, fd->name, e);       
 
1524
                }
 
1525
 
 
1526
                int count;
 
1527
                MPI_Get_count (&md->status, MPI_BYTE, &count);
 
1528
                if (count != md->vars_header_size)
 
1529
                {
 
1530
                    log_warn ("MPI method, rank %d: adios_close() tried to write %llu bytes "
 
1531
                              "of variable header but only wrote %d\n",
 
1532
                              md->rank, md->vars_header_size, count);
 
1533
                }
 
1534
                fd->offset = 0;
 
1535
                fd->bytes_written = 0;
 
1536
                adios_shared_buffer_free (&md->b);
 
1537
 
 
1538
                adios_write_open_attributes_v1 (fd);
 
1539
                md->vars_start = new_off;
 
1540
                md->vars_header_size = fd->offset;
 
1541
                MPI_File_seek (md->fh, new_off + md->vars_header_size
 
1542
                              ,MPI_SEEK_SET
 
1543
                              ); // go back to end, but after attr header
 
1544
                fd->base_offset += fd->offset;  // add size of header
 
1545
                fd->offset = 0;
 
1546
                fd->bytes_written = 0;
 
1547
 
 
1548
                if (!fd->group->process_id) { // from ADIOS 1.4, only rank 0 writes attributes
 
1549
                    while (a)
 
1550
                    {
 
1551
                        adios_write_attribute_v1 (fd, a);
 
1552
                        if (fd->base_offset + fd->bytes_written > fd->pg_start_in_file + fd->write_size_bytes)
 
1553
                            adios_error (err_out_of_bound, 
 
1554
                                    "MPI method, rank %d: writing of the attributes exceeds pg bound.\n"
 
1555
                                    "File is corrupted. Need to enlarge group size in adios_group_size().\n"
 
1556
                                    "Group size=%llu, offset at end of this variable data=%llu\n",
 
1557
                                    md->rank, 
 
1558
                                    fd->write_size_bytes,
 
1559
                                    fd->base_offset - fd->pg_start_in_file + fd->bytes_written);
 
1560
#if 0
 
1561
                        err = MPI_File_write (md->fh, fd->buffer, fd->bytes_written
 
1562
                                ,MPI_BYTE, &md->status
 
1563
                                );
 
1564
#endif
 
1565
                        {
 
1566
                            uint64_t total_written = 0;
 
1567
                            uint64_t to_write = fd->bytes_written;
 
1568
                            int write_len = 0;
 
1569
                            int count;
 
1570
                            char * buf_ptr = fd->buffer;
 
1571
                            while (total_written < fd->bytes_written)
 
1572
                            {
 
1573
                                write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
1574
                                err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
1575
                                MPI_Get_count(&md->status, MPI_BYTE, &count);
 
1576
                                if (count != write_len)
 
1577
                                {
 
1578
                                    err = count;
 
1579
                                    break;
 
1580
                                }
 
1581
                                total_written += count;
 
1582
                                buf_ptr += count;
 
1583
                                to_write -= count;
 
1584
                                //err = total_written;
 
1585
                            }
 
1586
                        }
 
1587
                        if (err != MPI_SUCCESS) 
 
1588
                        {              
 
1589
                            char e [MPI_MAX_ERROR_STRING];
 
1590
                            int len = 0;
 
1591
                            memset (e, 0, MPI_MAX_ERROR_STRING);
 
1592
                            MPI_Error_string (err, e, &len);
 
1593
                            adios_error (err_write_error, 
 
1594
                                    "MPI method, rank %d: adios_close(): writing of attributes "
 
1595
                                    "of %llu bytes to file %s failed: '%s'\n",
 
1596
                                    md->rank, fd->bytes_written, fd->name, e);       
 
1597
                        }
 
1598
 
 
1599
                        MPI_Get_count (&md->status, MPI_BYTE, &count);
 
1600
                        if (count != fd->bytes_written)
 
1601
                        {
 
1602
                            log_warn ("MPI method, rank %d: adios_close() tried to write "
 
1603
                                      "%llu bytes of attributes but only wrote %d\n",
 
1604
                                      md->rank, fd->bytes_written, count);
 
1605
                        }
 
1606
                        fd->base_offset += count;
 
1607
                        fd->offset = 0;
 
1608
                        fd->bytes_written = 0;
 
1609
                        adios_shared_buffer_free (&md->b);
 
1610
 
 
1611
                        a = a->next;
 
1612
                    }
 
1613
                }
 
1614
 
 
1615
                // set it up so that it will start at 0, but have correct sizes
 
1616
                fd->offset = fd->base_offset - md->vars_start;
 
1617
                fd->vars_start = 0;
 
1618
                fd->buffer_size = 0;
 
1619
                adios_write_close_attributes_v1 (fd);
 
1620
                MPI_File_seek (md->fh, md->vars_start, MPI_SEEK_SET);
 
1621
                // fd->vars_start gets updated with the size written
 
1622
#if 0
 
1623
                err = MPI_File_write (md->fh, fd->buffer, md->vars_header_size
 
1624
                                     ,MPI_BYTE, &md->status
 
1625
                                     );
 
1626
#endif
 
1627
                {
 
1628
                    uint64_t total_written = 0;
 
1629
                    uint64_t to_write = md->vars_header_size;
 
1630
                    int write_len = 0;
 
1631
                    int count;
 
1632
                    char * buf_ptr = fd->buffer;
 
1633
                    while (total_written < md->vars_header_size)
 
1634
                    {
 
1635
                        write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
1636
                        err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
1637
                        MPI_Get_count(&md->status, MPI_BYTE, &count);
 
1638
                        if (count != write_len)
 
1639
                        {
 
1640
                            err = count;
 
1641
                            break;
 
1642
                        }
 
1643
                        total_written += count;
 
1644
                        buf_ptr += count;
 
1645
                        to_write -= count;
 
1646
                        //err = total_written;
 
1647
                    }
 
1648
                }
 
1649
                if (err != MPI_SUCCESS) 
 
1650
                {              
 
1651
                    char e [MPI_MAX_ERROR_STRING];
 
1652
                    int len = 0;
 
1653
                    memset (e, 0, MPI_MAX_ERROR_STRING);
 
1654
                    MPI_Error_string (err, e, &len);
 
1655
                    adios_error (err_write_error, 
 
1656
                            "MPI method, rank %d: adios_close(): writing of variable header "
 
1657
                            "of %llu bytes to file %s failed: '%s'\n",
 
1658
                            md->rank, md->vars_header_size, fd->name, e);       
 
1659
                }
 
1660
 
 
1661
                MPI_Get_count (&md->status, MPI_BYTE, &count);
 
1662
                if (count != md->vars_header_size)
 
1663
                {
 
1664
                    log_warn ("MPI method, rank %d: adios_close() tried to write %llu bytes "
 
1665
                              "of variable header but only wrote %d\n",
 
1666
                              md->rank, md->vars_header_size, count);
 
1667
                }
 
1668
                fd->offset = 0;
 
1669
                fd->bytes_written = 0;
 
1670
            }
 
1671
 
 
1672
#if COLLECT_METRICS
 
1673
            gettimeofday (&timing.t19, NULL);
 
1674
            gettimeofday (&timing.t12, NULL);
 
1675
#endif
 
1676
            // build index appending to any existing index
 
1677
            adios_build_index_v1 (fd, &md->old_pg_root, &md->old_vars_root
 
1678
                                 ,&md->old_attrs_root
 
1679
                                 );
 
1680
            // if collective, gather the indexes from the rest and call
 
1681
            if (md->group_comm != MPI_COMM_NULL)
 
1682
            {
 
1683
                if (md->rank == 0)
 
1684
                {
 
1685
                    int * index_sizes = malloc (4 * md->size);
 
1686
                    int * index_offsets = malloc (4 * md->size);
 
1687
                    char * recv_buffer = 0;
 
1688
                    uint32_t size = 0;
 
1689
                    uint32_t total_size = 0;
 
1690
                    int i;
 
1691
 
 
1692
                    MPI_Gather (&size, 1, MPI_INT
 
1693
                               ,index_sizes, 1, MPI_INT
 
1694
                               ,0, md->group_comm
 
1695
                               );
 
1696
 
 
1697
                    for (i = 0; i < md->size; i++)
 
1698
                    {
 
1699
                        index_offsets [i] = total_size;
 
1700
                        total_size += index_sizes [i];
 
1701
                    } 
 
1702
 
 
1703
                    recv_buffer = malloc (total_size);
 
1704
 
 
1705
                    MPI_Gatherv (&size, 0, MPI_BYTE
 
1706
                                ,recv_buffer, index_sizes, index_offsets
 
1707
                                ,MPI_BYTE, 0, md->group_comm
 
1708
                                );
 
1709
 
 
1710
                    char * buffer_save = md->b.buff;
 
1711
                    uint64_t buffer_size_save = md->b.length;
 
1712
                    uint64_t offset_save = md->b.offset;
 
1713
 
 
1714
                    for (i = 1; i < md->size; i++)
 
1715
                    {
 
1716
                        md->b.buff = recv_buffer + index_offsets [i];
 
1717
                        md->b.length = index_sizes [i];
 
1718
                        md->b.offset = 0;
 
1719
 
 
1720
                        adios_parse_process_group_index_v1 (&md->b
 
1721
                                                           ,&new_pg_root
 
1722
                                                           );
 
1723
                        adios_parse_vars_index_v1 (&md->b, &new_vars_root);
 
1724
                        // do not merge attributes from other processes from 1.4
 
1725
                        /*
 
1726
                        adios_parse_attributes_index_v1 (&md->b
 
1727
                                                        ,&new_attrs_root
 
1728
                                                        );
 
1729
                        */
 
1730
                        adios_merge_index_v1 (&md->old_pg_root
 
1731
                                             ,&md->old_vars_root
 
1732
                                             ,&md->old_attrs_root
 
1733
                                             ,new_pg_root, new_vars_root
 
1734
                                             ,new_attrs_root
 
1735
                                             );
 
1736
                        new_pg_root = 0;
 
1737
                        new_vars_root = 0;
 
1738
                        new_attrs_root = 0;
 
1739
                    }
 
1740
                    md->b.buff = buffer_save;
 
1741
                    md->b.length = buffer_size_save;
 
1742
                    md->b.offset = offset_save;
 
1743
 
 
1744
                    free (recv_buffer);
 
1745
                    free (index_sizes);
 
1746
                    free (index_offsets);
 
1747
                }
 
1748
                else
 
1749
                {
 
1750
                    adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
 
1751
                                         ,0, md->old_pg_root
 
1752
                                         ,md->old_vars_root
 
1753
                                         ,md->old_attrs_root
 
1754
                                         );
 
1755
                    uint32_t tmp_buffer_size = (uint32_t) buffer_size;
 
1756
                    MPI_Gather (&tmp_buffer_size, 1, MPI_INT, 0, 0, MPI_INT
 
1757
                               ,0, md->group_comm
 
1758
                               );
 
1759
                    MPI_Gatherv (buffer, buffer_size, MPI_BYTE
 
1760
                                ,0, 0, 0, MPI_BYTE
 
1761
                                ,0, md->group_comm
 
1762
                                );
 
1763
                }
 
1764
            }
 
1765
 
 
1766
#if COLLECT_METRICS
 
1767
            gettimeofday (&timing.t13, NULL);
 
1768
#endif
 
1769
            if (fd->shared_buffer == adios_flag_yes)
 
1770
            {
 
1771
                // if we need to write > 2 GB, need to do it in parts
 
1772
                // since count is limited to INT32_MAX (signed 32-bit max).
 
1773
                uint64_t bytes_written = 0;
 
1774
                int32_t to_write = 0;
 
1775
                if (fd->bytes_written > INT32_MAX)
 
1776
                {
 
1777
                    to_write = INT32_MAX;
 
1778
                }
 
1779
                else
 
1780
                {
 
1781
                    to_write = (int32_t) fd->bytes_written;
 
1782
                }
 
1783
 
 
1784
                if (fd->base_offset + fd->bytes_written > 
 
1785
                    fd->pg_start_in_file + fd->write_size_bytes) 
 
1786
                {
 
1787
                    adios_error (err_out_of_bound, 
 
1788
                            "MPI method, rank %d: size of buffered data exceeds pg bound.\n"
 
1789
                            "File is corrupted. Need to enlarge group size in adios_group_size().\n"
 
1790
                            "Group size=%llu, offset at end of variable buffer=%llu\n",
 
1791
                            md->rank, 
 
1792
                            fd->write_size_bytes,
 
1793
                            fd->base_offset - fd->pg_start_in_file + fd->bytes_written);
 
1794
                }
 
1795
 
 
1796
                while (bytes_written < fd->bytes_written)
 
1797
                {
 
1798
                    // everyone writes their data
 
1799
 
 
1800
                    MPI_File_seek (md->fh, fd->base_offset + bytes_written
 
1801
                                  ,MPI_SEEK_SET
 
1802
                                  );
 
1803
                    err = MPI_File_write (md->fh, fd->buffer + bytes_written
 
1804
                                         ,to_write, MPI_BYTE, &md->status
 
1805
                                         );
 
1806
                    if (err != MPI_SUCCESS) 
 
1807
                    {              
 
1808
                        char e [MPI_MAX_ERROR_STRING];
 
1809
                        int len = 0;
 
1810
                        memset (e, 0, MPI_MAX_ERROR_STRING);
 
1811
                        MPI_Error_string (err, e, &len);
 
1812
                        adios_error (err_write_error, 
 
1813
                                "MPI method, rank %d: adios_close(): writing of buffered data "
 
1814
                                "[%llu..%llu] to file %s failed: '%s'\n",
 
1815
                                md->rank, bytes_written, bytes_written+to_write-1, 
 
1816
                                fd->name, e);       
 
1817
                    }
 
1818
                    bytes_written += to_write;
 
1819
                    if (fd->bytes_written > bytes_written)
 
1820
                    {
 
1821
                        if (fd->bytes_written - bytes_written > INT32_MAX)
 
1822
                        {
 
1823
                            to_write = INT32_MAX;
 
1824
                        }
 
1825
                        else
 
1826
                        {
 
1827
                            to_write = fd->bytes_written - bytes_written;
 
1828
                        }
 
1829
                    }
 
1830
                }
 
1831
            }
 
1832
 
 
1833
            if (md->rank == 0)
 
1834
            {
 
1835
                adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
 
1836
                                     ,index_start, md->old_pg_root
 
1837
                                     ,md->old_vars_root
 
1838
                                     ,md->old_attrs_root
 
1839
                                     );
 
1840
                adios_write_version_v1 (&buffer, &buffer_size, &buffer_offset);
 
1841
 
 
1842
                MPI_File_seek (md->fh, md->b.pg_index_offset, MPI_SEEK_SET);
 
1843
#if 0
 
1844
                err = MPI_File_write (md->fh, buffer, buffer_offset, MPI_BYTE
 
1845
                                     ,&md->status
 
1846
                                     );
 
1847
#endif
 
1848
                {              
 
1849
                    uint64_t total_written = 0;
 
1850
                    uint64_t to_write = buffer_offset;
 
1851
                    int write_len = 0;
 
1852
                    int count;
 
1853
                    char * buf_ptr = buffer;
 
1854
                    while (total_written < buffer_offset)
 
1855
                    {
 
1856
                        write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
1857
#if COLLECT_METRICS
 
1858
struct timeval a, b;
 
1859
gettimeofday (&a, NULL);
 
1860
#endif
 
1861
                        err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
1862
#if COLLECT_METRICS
 
1863
gettimeofday (&b, NULL);
 
1864
timeval_subtract (&timing.t8, &b, &a);
 
1865
#endif
 
1866
                        MPI_Get_count(&md->status, MPI_BYTE, &count);
 
1867
                        if (count != write_len)
 
1868
                        {
 
1869
                            log_error ("MPI method, rank %d: Need to do multi-write 1 (tried: "
 
1870
                                    "%d wrote: %d) errno %d\n",
 
1871
                                    md->rank, write_len, count, errno);
 
1872
                            err = count;
 
1873
                            break;
 
1874
                        }
 
1875
                        total_written += count;
 
1876
                        buf_ptr += count;
 
1877
                        to_write -= count;
 
1878
                        //err = total_written;
 
1879
                    }
 
1880
                }              
 
1881
                if (err != MPI_SUCCESS) 
 
1882
                {              
 
1883
                    char e [MPI_MAX_ERROR_STRING];
 
1884
                    int len = 0;
 
1885
                    memset (e, 0, MPI_MAX_ERROR_STRING);
 
1886
                    MPI_Error_string (err, e, &len);
 
1887
                    adios_error (err_write_error, 
 
1888
                            "MPI method, rank %d: adios_close(): writing of index data "
 
1889
                            "of %llu bytes to file %s failed: '%s'\n",
 
1890
                            md->rank, buffer_offset, fd->name, e);       
 
1891
                }
 
1892
            }
 
1893
#if COLLECT_METRICS
 
1894
            gettimeofday (&timing.t20, NULL);
 
1895
            gettimeofday (&timing.t14, NULL);
 
1896
#endif
 
1897
 
 
1898
            if (buffer)
 
1899
            {
 
1900
                free (buffer);
 
1901
                buffer = 0;
 
1902
                buffer_size = 0;
 
1903
                buffer_offset = 0;
 
1904
            }
 
1905
 
 
1906
            adios_clear_index_v1 (new_pg_root, new_vars_root, new_attrs_root);
 
1907
            adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
 
1908
                                 ,md->old_attrs_root
 
1909
                                 );
 
1910
            new_pg_root = 0;
 
1911
            new_vars_root = 0;
 
1912
            new_attrs_root = 0;
 
1913
            md->old_pg_root = 0;
 
1914
            md->old_vars_root = 0;
 
1915
            md->old_attrs_root = 0;
 
1916
#if COLLECT_METRICS
 
1917
            gettimeofday (&timing.t11, NULL);
 
1918
#endif
 
1919
 
 
1920
            break;
 
1921
        }
 
1922
 
 
1923
        case adios_mode_append:
 
1924
        case adios_mode_update:
 
1925
        {
 
1926
            char * buffer = 0;
 
1927
            uint64_t buffer_size = 0;
 
1928
            uint64_t buffer_offset = 0;
 
1929
            uint64_t index_start = md->b.pg_index_offset;
 
1930
            int err;
 
1931
 
 
1932
            if (fd->shared_buffer == adios_flag_no)
 
1933
            {
 
1934
                MPI_Offset new_off;
 
1935
                // set it up so that it will start at 0, but have correct sizes
 
1936
                MPI_File_get_position (md->fh, &new_off);
 
1937
                fd->offset = fd->base_offset - md->vars_start;
 
1938
                fd->vars_start = 0;
 
1939
                fd->buffer_size = 0;
 
1940
                adios_write_close_vars_v1 (fd);
 
1941
                // fd->vars_start gets updated with the size written
 
1942
                MPI_File_seek (md->fh, md->vars_start, MPI_SEEK_SET);
 
1943
#if 0
 
1944
                err = MPI_File_write (md->fh, fd->buffer, md->vars_header_size
 
1945
                                     ,MPI_BYTE, &md->status
 
1946
                                     );
 
1947
#endif
 
1948
                {              
 
1949
                    uint64_t total_written = 0;
 
1950
                    uint64_t to_write = md->vars_header_size;
 
1951
                    int write_len = 0;
 
1952
                    int count;
 
1953
                    char * buf_ptr = fd->buffer;
 
1954
                    while (total_written < md->vars_header_size)
 
1955
                    {
 
1956
                        write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
1957
                        err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
1958
                        MPI_Get_count(&md->status, MPI_BYTE, &count);
 
1959
                        if (count != write_len)
 
1960
                        {
 
1961
                            err = count;
 
1962
                            break;
 
1963
                        }
 
1964
                        total_written += count;
 
1965
                        buf_ptr += count;
 
1966
                        to_write -= count;
 
1967
                        //err = total_written;
 
1968
                    }
 
1969
                }              
 
1970
                if (err != MPI_SUCCESS) 
 
1971
                {              
 
1972
                    char e [MPI_MAX_ERROR_STRING];
 
1973
                    int len = 0;
 
1974
                    memset (e, 0, MPI_MAX_ERROR_STRING);
 
1975
                    MPI_Error_string (err, e, &len);
 
1976
                    adios_error (err_write_error, 
 
1977
                         "MPI method, rank %d: adios_close(): writing of variable header "
 
1978
                         "of %llu bytes to file %s failed: '%s'\n",
 
1979
                         md->rank, md->vars_header_size, fd->name, e);       
 
1980
                }
 
1981
 
 
1982
                int count;
 
1983
                MPI_Get_count (&md->status, MPI_BYTE, &count);
 
1984
                if (count != md->vars_header_size)
 
1985
                {
 
1986
                    log_warn ("MPI method, rank %d: adios_close() tried to write %llu bytes "
 
1987
                              "of variable header but only wrote %d\n",
 
1988
                              md->rank, md->vars_header_size, count);
 
1989
                }
 
1990
                fd->offset = 0;
 
1991
                fd->bytes_written = 0;
 
1992
                adios_shared_buffer_free (&md->b);
 
1993
 
 
1994
                adios_write_open_attributes_v1 (fd);
 
1995
                md->vars_start = new_off;
 
1996
                md->vars_header_size = fd->offset;
 
1997
                MPI_File_seek (md->fh, new_off + md->vars_header_size
 
1998
                              ,MPI_SEEK_SET
 
1999
                              ); // go back to end, but after attr header
 
2000
                fd->base_offset += fd->offset;  // add size of header
 
2001
                fd->offset = 0;
 
2002
                fd->bytes_written = 0;
 
2003
 
 
2004
                if (!fd->group->process_id) { // from ADIOS 1.4, only rank 0 writes attributes
 
2005
                    while (a)
 
2006
                    {
 
2007
                        adios_write_attribute_v1 (fd, a);
 
2008
                        if (fd->base_offset + fd->bytes_written > fd->pg_start_in_file + fd->write_size_bytes)
 
2009
                            adios_error (err_out_of_bound, 
 
2010
                                    "MPI method, rank %d: writing of the attributes exceeds pg bound.\n"
 
2011
                                    "File is corrupted. Need to enlarge group size in adios_group_size().\n"
 
2012
                                    "Group size=%llu, offset at end of this variable data=%llu\n",
 
2013
                                    md->rank, 
 
2014
                                    fd->write_size_bytes,
 
2015
                                    fd->base_offset - fd->pg_start_in_file + fd->bytes_written);
 
2016
#if 0
 
2017
                        err = MPI_File_write (md->fh, fd->buffer, fd->bytes_written
 
2018
                                ,MPI_BYTE, &md->status
 
2019
                                );
 
2020
#endif
 
2021
                        {
 
2022
                            uint64_t total_written = 0;
 
2023
                            uint64_t to_write = fd->bytes_written;
 
2024
                            int write_len = 0;
 
2025
                            int count;
 
2026
                            char * buf_ptr = fd->buffer;
 
2027
                            while (total_written < fd->bytes_written)
 
2028
                            {
 
2029
                                write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
2030
                                err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
2031
                                MPI_Get_count(&md->status, MPI_BYTE, &count);
 
2032
                                if (count != write_len)
 
2033
                                {
 
2034
                                    err = count;
 
2035
                                    break;
 
2036
                                }
 
2037
                                total_written += count;
 
2038
                                buf_ptr += count;
 
2039
                                to_write -= count;
 
2040
                                //err = total_written;
 
2041
                            }
 
2042
                        }              
 
2043
                        if (err != MPI_SUCCESS) 
 
2044
                        {              
 
2045
                            char e [MPI_MAX_ERROR_STRING];
 
2046
                            int len = 0;
 
2047
                            memset (e, 0, MPI_MAX_ERROR_STRING);
 
2048
                            MPI_Error_string (err, e, &len);
 
2049
                            adios_error (err_write_error, 
 
2050
                                    "MPI method, rank %d: adios_close(): writing of attributes "
 
2051
                                    "of %llu bytes to file %s failed: '%s'\n",
 
2052
                                    md->rank, fd->bytes_written, fd->name, e);       
 
2053
                        }
 
2054
 
 
2055
                        MPI_Get_count (&md->status, MPI_BYTE, &count);
 
2056
                        if (count != fd->bytes_written)
 
2057
                        {
 
2058
                            log_warn ("MPI method, rank %d: adios_close() tried to write "
 
2059
                                      "%llu bytes of attributes but only wrote %d\n",
 
2060
                                      md->rank, fd->bytes_written, count);
 
2061
                        }
 
2062
                        fd->base_offset += count;
 
2063
                        fd->offset = 0;
 
2064
                        fd->bytes_written = 0;
 
2065
                        adios_shared_buffer_free (&md->b);
 
2066
 
 
2067
                        a = a->next;
 
2068
                    }
 
2069
                }
 
2070
 
 
2071
                // set it up so that it will start at 0, but have correct sizes
 
2072
                fd->offset = fd->base_offset - md->vars_start;
 
2073
                fd->vars_start = 0;
 
2074
                fd->buffer_size = 0;
 
2075
                adios_write_close_attributes_v1 (fd);
 
2076
                MPI_File_seek (md->fh, md->vars_start, MPI_SEEK_SET);
 
2077
                // fd->vars_start gets updated with the size written
 
2078
#if 0
 
2079
                err = MPI_File_write (md->fh, fd->buffer, md->vars_header_size
 
2080
                                     ,MPI_BYTE, &md->status
 
2081
                                     );
 
2082
#endif
 
2083
                {              
 
2084
                    uint64_t total_written = 0;
 
2085
                    uint64_t to_write = md->vars_header_size;
 
2086
                    int write_len = 0;
 
2087
                    int count;
 
2088
                    char * buf_ptr = fd->buffer;
 
2089
                    while (total_written < md->vars_header_size)
 
2090
                    {
 
2091
                        write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
2092
                        err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
2093
                        MPI_Get_count(&md->status, MPI_BYTE, &count);
 
2094
                        if (count != write_len)
 
2095
                        {
 
2096
                            err = count;
 
2097
                            break;
 
2098
                        }
 
2099
                        total_written += count;
 
2100
                        buf_ptr += count;
 
2101
                        to_write -= count;
 
2102
                        //err = total_written;
 
2103
                    }
 
2104
                }              
 
2105
                if (err != MPI_SUCCESS) 
 
2106
                {              
 
2107
                    char e [MPI_MAX_ERROR_STRING];
 
2108
                    int len = 0;
 
2109
                    memset (e, 0, MPI_MAX_ERROR_STRING);
 
2110
                    MPI_Error_string (err, e, &len);
 
2111
                    adios_error (err_write_error, 
 
2112
                            "MPI method, rank %d: adios_close(): writing of variable header "
 
2113
                            "of %llu bytes to file %s failed: '%s'\n",
 
2114
                            md->rank, md->vars_header_size, fd->name, e);       
 
2115
                }
 
2116
 
 
2117
                MPI_Get_count (&md->status, MPI_BYTE, &count);
 
2118
                if (count != md->vars_header_size)
 
2119
                {
 
2120
                    log_warn ("MPI method, rank %d: adios_close() tried to write %llu bytes "
 
2121
                              "of variable header but only wrote %d\n",
 
2122
                              md->rank, md->vars_header_size, count);
 
2123
                }
 
2124
                fd->offset = 0;
 
2125
                fd->bytes_written = 0;
 
2126
            }
 
2127
 
 
2128
            // build index appending to any existing index
 
2129
            adios_build_index_v1 (fd, &md->old_pg_root, &md->old_vars_root
 
2130
                                 ,&md->old_attrs_root
 
2131
                                 );
 
2132
            // if collective, gather the indexes from the rest and call
 
2133
            if (md->group_comm != MPI_COMM_NULL)
 
2134
            {
 
2135
                if (md->rank == 0)
 
2136
                {
 
2137
                    int * index_sizes = malloc (4 * md->size);
 
2138
                    int * index_offsets = malloc (4 * md->size);
 
2139
                    char * recv_buffer = 0;
 
2140
                    uint32_t size = 0;
 
2141
                    uint32_t total_size = 0;
 
2142
                    int i;
 
2143
 
 
2144
                    MPI_Gather (&size, 1, MPI_INT
 
2145
                               ,index_sizes, 1, MPI_INT
 
2146
                               ,0, md->group_comm
 
2147
                               );
 
2148
 
 
2149
                    for (i = 0; i < md->size; i++)
 
2150
                    {
 
2151
                        index_offsets [i] = total_size;
 
2152
                        total_size += index_sizes [i];
 
2153
                    }
 
2154
 
 
2155
                    recv_buffer = malloc (total_size);
 
2156
 
 
2157
                    MPI_Gatherv (&size, 0, MPI_BYTE
 
2158
                                ,recv_buffer, index_sizes, index_offsets
 
2159
                                ,MPI_BYTE, 0, md->group_comm
 
2160
                                );
 
2161
 
 
2162
                    char * buffer_save = md->b.buff;
 
2163
                    uint64_t buffer_size_save = md->b.length;
 
2164
                    uint64_t offset_save = md->b.offset;
 
2165
 
 
2166
                    for (i = 1; i < md->size; i++)
 
2167
                    {
 
2168
                        md->b.buff = recv_buffer + index_offsets [i];
 
2169
                        md->b.length = index_sizes [i];
 
2170
                        md->b.offset = 0;
 
2171
 
 
2172
                        adios_parse_process_group_index_v1 (&md->b
 
2173
                                                           ,&new_pg_root
 
2174
                                                           );
 
2175
                        adios_parse_vars_index_v1 (&md->b, &new_vars_root);
 
2176
                        // do not merge attributes from other processes from 1.4
 
2177
                        /*
 
2178
                        adios_parse_attributes_index_v1 (&md->b
 
2179
                                                        ,&new_attrs_root
 
2180
                                                        );
 
2181
                         */
 
2182
                        adios_merge_index_v1 (&md->old_pg_root
 
2183
                                             ,&md->old_vars_root
 
2184
                                             ,&md->old_attrs_root
 
2185
                                             ,new_pg_root, new_vars_root
 
2186
                                             ,new_attrs_root
 
2187
                                             );
 
2188
                        new_pg_root = 0;
 
2189
                        new_vars_root = 0;
 
2190
                        new_attrs_root = 0;
 
2191
                    }
 
2192
                    md->b.buff = buffer_save;
 
2193
                    md->b.length = buffer_size_save;
 
2194
                    md->b.offset = offset_save;
 
2195
 
 
2196
                    free (recv_buffer);
 
2197
                    free (index_sizes);
 
2198
                    free (index_offsets);
 
2199
                }
 
2200
                else
 
2201
                {
 
2202
                    adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
 
2203
                                         ,0, md->old_pg_root
 
2204
                                         ,md->old_vars_root
 
2205
                                         ,md->old_attrs_root
 
2206
                                         );
 
2207
 
 
2208
                    int _buffer_size = buffer_size;
 
2209
 
 
2210
                    MPI_Gather (&_buffer_size, 1, MPI_INT, 0, 0, MPI_INT
 
2211
                               ,0, md->group_comm
 
2212
                               );
 
2213
                    MPI_Gatherv (buffer, buffer_size, MPI_BYTE
 
2214
                                ,0, 0, 0, MPI_BYTE
 
2215
                                ,0, md->group_comm
 
2216
                                );
 
2217
                }
 
2218
            }
 
2219
 
 
2220
            if (fd->shared_buffer == adios_flag_yes)
 
2221
            {
 
2222
                if (fd->base_offset + fd->bytes_written > 
 
2223
                    fd->pg_start_in_file + fd->write_size_bytes) 
 
2224
                {
 
2225
                    adios_error (err_out_of_bound, 
 
2226
                            "MPI method, rank %d: size of buffered data exceeds pg bound.\n"
 
2227
                            "File is corrupted. Need to enlarge group size in adios_group_size().\n"
 
2228
                            "Group size=%llu, offset at end of variable buffer=%llu\n",
 
2229
                            md->rank, 
 
2230
                            fd->write_size_bytes,
 
2231
                            fd->base_offset - fd->pg_start_in_file + fd->bytes_written);
 
2232
                }
 
2233
                // everyone writes their data
 
2234
                MPI_File_seek (md->fh, fd->base_offset, MPI_SEEK_SET);
 
2235
#if 0
 
2236
                err = MPI_File_write (md->fh, fd->buffer, fd->bytes_written
 
2237
                                     ,MPI_BYTE, &md->status
 
2238
                                     );
 
2239
#endif
 
2240
                {              
 
2241
                    uint64_t total_written = 0;
 
2242
                    uint64_t to_write = fd->bytes_written;
 
2243
                    int write_len = 0;
 
2244
                    int count;
 
2245
                    char * buf_ptr = fd->buffer;
 
2246
                    while (total_written < fd->bytes_written)
 
2247
                    {
 
2248
                        write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
2249
                        err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
2250
                        MPI_Get_count(&md->status, MPI_BYTE, &count);
 
2251
                        if (count != write_len)
 
2252
                        {
 
2253
                            err = count;
 
2254
                            break;
 
2255
                        }
 
2256
                        total_written += count;
 
2257
                        buf_ptr += count;
 
2258
                        to_write -= count;
 
2259
                        //err = total_written;
 
2260
                    }
 
2261
                }              
 
2262
                if (err != MPI_SUCCESS) 
 
2263
                {              
 
2264
                    char e [MPI_MAX_ERROR_STRING];
 
2265
                    int len = 0;
 
2266
                    memset (e, 0, MPI_MAX_ERROR_STRING);
 
2267
                    MPI_Error_string (err, e, &len);
 
2268
                    adios_error (err_write_error, 
 
2269
                            "MPI method, rank %d: adios_close(): writing of buffered data "
 
2270
                            "of %llu bytes to file %s failed: '%s'\n",
 
2271
                            md->rank, fd->bytes_written, fd->name, e);       
 
2272
                }
 
2273
            }
 
2274
 
 
2275
            if (md->rank == 0)
 
2276
            {
 
2277
                adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
 
2278
                                     ,index_start, md->old_pg_root
 
2279
                                     ,md->old_vars_root
 
2280
                                     ,md->old_attrs_root
 
2281
                                     );
 
2282
                adios_write_version_v1 (&buffer, &buffer_size, &buffer_offset);
 
2283
 
 
2284
                MPI_File_seek (md->fh, md->b.pg_index_offset, MPI_SEEK_SET);
 
2285
#if 0
 
2286
                err = MPI_File_write (md->fh, buffer, buffer_offset, MPI_BYTE
 
2287
                                     ,&md->status
 
2288
                                     );
 
2289
#endif
 
2290
                {              
 
2291
                    uint64_t total_written = 0;
 
2292
                    uint64_t to_write = buffer_offset;
 
2293
                    int write_len = 0;
 
2294
                    int count;
 
2295
                    char * buf_ptr = buffer;
 
2296
                    while (total_written < buffer_offset)
 
2297
                    {
 
2298
                        write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
 
2299
                        err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
 
2300
                        MPI_Get_count(&md->status, MPI_BYTE, &count);
 
2301
                        if (count != write_len)
 
2302
                        {
 
2303
                            log_error ("MPI method, rank %d: Need to do multi-write 2 (tried: "
 
2304
                                    "%d wrote: %d) errno %d\n",
 
2305
                                    md->rank, write_len, count, errno);
 
2306
                            err = count;
 
2307
                            break;
 
2308
                        }
 
2309
                        total_written += count;
 
2310
                        buf_ptr += count;
 
2311
                        to_write -= count;
 
2312
                        //err = total_written;
 
2313
                    }
 
2314
                }              
 
2315
                if (err != MPI_SUCCESS) 
 
2316
                {              
 
2317
                    char e [MPI_MAX_ERROR_STRING];
 
2318
                    int len = 0;
 
2319
                    memset (e, 0, MPI_MAX_ERROR_STRING);
 
2320
                    MPI_Error_string (err, e, &len);
 
2321
                    adios_error (err_write_error, 
 
2322
                            "MPI method, rank %d: adios_close(): writing of index data "
 
2323
                            "of %llu bytes to file %s failed: '%s'\n",
 
2324
                            md->rank, buffer_offset, fd->name, e);       
 
2325
                }
 
2326
            }
 
2327
 
 
2328
            free (buffer);
 
2329
 
 
2330
            adios_clear_index_v1 (new_pg_root, new_vars_root, new_attrs_root);
 
2331
            adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
 
2332
                                 ,md->old_attrs_root
 
2333
                                 );
 
2334
            new_pg_root = 0;
 
2335
            new_vars_root = 0;
 
2336
            new_attrs_root = 0;
 
2337
            md->old_pg_root = 0;
 
2338
            md->old_vars_root = 0;
 
2339
            md->old_attrs_root = 0;
 
2340
 
 
2341
            break;
 
2342
        }
 
2343
 
 
2344
        default:
 
2345
        {
 
2346
            adios_error (err_invalid_file_mode,
 
2347
                    "MPI method: Unknown file mode: %d in adios_close()\n", 
 
2348
                    fd->mode);
 
2349
        }
 
2350
    }
 
2351
 
 
2352
    if (md && md->fh)
 
2353
    {
 
2354
#if COLLECT_METRICS
 
2355
        MPI_File_sync (md->fh);
 
2356
#endif
 
2357
        MPI_File_close (&md->fh);
 
2358
    }
 
2359
 
 
2360
#if COLLECT_METRICS
 
2361
    gettimeofday (&timing.t28, NULL);
 
2362
    print_metrics (md, iteration++);
 
2363
#endif
 
2364
    if (   md->group_comm != MPI_COMM_WORLD
 
2365
        && md->group_comm != MPI_COMM_SELF
 
2366
        && md->group_comm != MPI_COMM_NULL
 
2367
       )
 
2368
    {
 
2369
        md->group_comm = MPI_COMM_NULL;
 
2370
    }
 
2371
 
 
2372
    md->fh = 0;
 
2373
    md->req = 0;
 
2374
    memset (&md->status, 0, sizeof (MPI_Status));
 
2375
    md->group_comm = MPI_COMM_NULL;
 
2376
 
 
2377
    adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
 
2378
                         ,md->old_attrs_root
 
2379
                         );
 
2380
    md->old_pg_root = 0;
 
2381
    md->old_vars_root = 0;
 
2382
    md->old_attrs_root = 0;
 
2383
}
 
2384
 
 
2385
void adios_mpi_finalize (int mype, struct adios_method_struct * method)
 
2386
{
 
2387
    struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
 
2388
                                                    method->method_data;
 
2389
    if (adios_mpi_initialized)
 
2390
    {
 
2391
        adios_mpi_initialized = 0;
 
2392
        MPI_Info_free (&md->info);
 
2393
    }
 
2394
}
 
2395
 
 
2396
void adios_mpi_end_iteration (struct adios_method_struct * method)
 
2397
{
 
2398
}
 
2399
 
 
2400
void adios_mpi_start_calculation (struct adios_method_struct * method)
 
2401
{
 
2402
}
 
2403
 
 
2404
void adios_mpi_stop_calculation (struct adios_method_struct * method)
 
2405
{
 
2406
}