2
* ADIOS is freely available under the terms of the BSD license described
3
* in the COPYING file in the top level directory of this source distribution.
5
* Copyright (c) 2008 - 2009. UT-BATTELLE, LLC. All rights reserved.
13
#if defined(__APPLE__)
14
# include <sys/param.h>
15
# include <sys/mount.h>
19
#include <sys/ioctl.h>
25
#include "public/adios_mpi.h"
26
#include "public/adios_error.h"
27
#include "core/adios_transport_hooks.h"
28
#include "core/adios_bp_v1.h"
29
#include "core/adios_internals.h"
30
#include "core/buffer.h"
31
#include "core/util.h"
32
#include "core/adios_logger.h"
37
static int adios_mpi_initialized = 0;
39
#define COLLECT_METRICS 0
42
struct adios_MPI_data_struct
48
MPI_Info info; // set with base hints for Lustre
52
struct adios_bp_buffer_struct_v1 b;
54
struct adios_index_process_group_struct_v1 * old_pg_root;
55
struct adios_index_var_struct_v1 * old_vars_root;
56
struct adios_index_attribute_struct_v1 * old_attrs_root;
59
uint64_t vars_header_size;
60
uint16_t storage_targets; // number of storage targets being used
64
// see adios_adaptive_finalize for what each represents
67
struct timeval t; // time value
68
int pid; // process id
72
int timeval_writer_compare (const void * left, const void * right)
74
struct timeval_writer * l = (struct timeval_writer *) left;
75
struct timeval_writer * r = (struct timeval_writer *) right;
77
if (l->pid < r->pid) return -1;
78
if (l->pid == r->pid) return 0;
79
if (l->pid > r->pid) return 1;
84
struct timeval t0, t5, t6, t7, t8, t11, t12, t13;
85
struct timeval t14, t16, t19, t20, t21, t22, t23;
86
struct timeval t27, t28;
88
uint64_t write_count; // number used
89
uint64_t write_size; // number allocated
93
static struct timing_metrics timing;
95
// Subtract the `struct timeval' values X and Y, storing X - Y in RESULT.
// RESULT->tv_usec is normalized to [0, 1000000); RESULT->tv_sec carries
// the sign of the difference.
// Neither X nor Y is modified: the borrow is applied to a local copy of Y,
// because callers (e.g. print_metric) pass the same recorded timestamps
// to several subtractions.
// Return 1 if the difference is negative, otherwise 0.
static int timeval_subtract (struct timeval * result
                            ,struct timeval * x, struct timeval * y
                            )
{
    struct timeval yy = *y;   // private copy we are free to renormalize

    // Perform the carry for the later subtraction by updating the copy.
    // Both tv_usec and tv_sec change so yy still denotes the same instant.
    if (x->tv_usec < yy.tv_usec)
    {
        long nsec = (yy.tv_usec - x->tv_usec) / 1000000 + 1;
        yy.tv_usec -= 1000000 * nsec;
        yy.tv_sec += nsec;
    }
    // '>=' (not '>') so a difference of exactly one second is folded
    // into tv_sec instead of leaving tv_usec == 1000000.
    if (x->tv_usec - yy.tv_usec >= 1000000)
    {
        long nsec = (x->tv_usec - yy.tv_usec) / 1000000;
        yy.tv_usec += 1000000 * nsec;
        yy.tv_sec -= nsec;
    }

    // Compute the difference; tv_usec is now in [0, 1000000).
    result->tv_sec = x->tv_sec - yy.tv_sec;
    result->tv_usec = x->tv_usec - yy.tv_usec;

    // Return 1 if result is negative.
    return x->tv_sec < yy.tv_sec;
}
125
static void print_metric (FILE * f, struct timing_metrics * t, int iteration, int rank, int size, int sub_coord_rank);
128
void print_metrics (struct adios_MPI_data_struct * md, int iteration)
130
MPI_Barrier (md->group_comm);
134
struct timing_metrics * t;
135
int * sub_coord_ranks;
137
t = malloc (sizeof (struct timing_metrics) * md->size);
138
sub_coord_ranks = malloc (sizeof (int) * md->size);
140
memcpy (&t [0], &timing, sizeof (struct timing_metrics));
143
MPI_Gather (&timing, sizeof (struct timing_metrics), MPI_BYTE
144
,t, sizeof (struct timing_metrics), MPI_BYTE
148
// get the write timing
149
int * index_sizes = malloc (4 * md->size);
150
int * index_offsets = malloc (4 * md->size);
151
uint32_t total_size = 0;
152
char * recv_buffer = 0;
153
char * recv_buffer1 = 0;
154
char * recv_buffer2 = 0;
155
char * recv_buffer3 = 0;
156
for (i = 0; i < md->size; i++)
158
index_sizes [i] = t [i].write_count * sizeof (struct timeval);
159
index_offsets [i] = total_size;
160
total_size += t [i].write_count * sizeof (struct timeval);
163
recv_buffer = malloc (total_size + 1);
165
MPI_Gatherv (t [0].t24, 0, MPI_BYTE
166
,recv_buffer, index_sizes, index_offsets, MPI_BYTE
170
t [0].t24 = timing.t24;
171
for (i = 1; i < md->size; i++)
173
t [i].t24 = (struct timeval *) (recv_buffer + index_offsets [i]);
176
// print the detailed metrics
177
FILE * f = fopen ("adios_metrics", "a");
178
for (i = 0; i < md->size; i++)
180
print_metric (f, &t [i], iteration, i, md->size
186
free (sub_coord_ranks);
193
free (index_offsets);
197
// send the bulk data
198
MPI_Gather (&timing, sizeof (struct timing_metrics), MPI_BYTE
199
,&timing, sizeof (struct timing_metrics), MPI_BYTE
203
// send the write timing
204
MPI_Gatherv (timing.t24, timing.write_count * sizeof (struct timeval)
210
MPI_Barrier (md->group_comm);
213
static void print_metric (FILE * f, struct timing_metrics * t, int iteration, int rank, int size, int sub_coord_rank)
218
timeval_subtract (&diff, &t->t6, &t->t5);
219
fprintf (f, "dd\t%2d\tMass file open:\t%02d.%06d\n"
220
,iteration, diff.tv_sec, diff.tv_usec);
222
timeval_subtract (&diff, &t->t5, &t->t16);
223
fprintf (f, "ee\t%2d\tBuild file offsets:\t%02d.%06d\n"
224
,iteration, diff.tv_sec, diff.tv_usec);
226
timeval_subtract (&diff, &t->t11, &t->t0);
227
fprintf (f, "hh\t%2d\tTotal time:\t%02d.%06d\n"
228
,iteration, diff.tv_sec, diff.tv_usec);
231
timeval_subtract (&diff, &t->t13, &t->t12);
232
fprintf (f, "ii\t%2d\tLocal index creation:\t%6d\t%02d.%06d\n"
233
,iteration, rank, diff.tv_sec, diff.tv_usec);
235
timeval_subtract (&diff, &t->t22, &t->t21);
236
fprintf (f, "kk\t%2d\tshould buffer time:\t%6d\t%02d.%06d\n"
237
,iteration, rank, diff.tv_sec, diff.tv_usec);
239
timeval_subtract (&diff, &t->t19, &t->t23);
240
fprintf (f, "ll\t%2d\tclose startup time:\t%6d\t%02d.%06d\n"
241
,iteration, rank, diff.tv_sec, diff.tv_usec);
243
timeval_subtract (&diff, &t->t19, &t->t0);
244
fprintf (f, "mm\t%2d\tsetup time:\t%6d\t%02d.%06d\n"
245
,iteration, rank, diff.tv_sec, diff.tv_usec);
247
timeval_subtract (&diff, &t->t14, &t->t20);
248
fprintf (f, "nn\t%2d\tcleanup time:\t%6d\t%02d.%06d\n"
249
,iteration, rank, diff.tv_sec, diff.tv_usec);
251
timeval_subtract (&diff, &t->t21, &t->t0);
252
fprintf (f, "oo\t%2d\topen->should_buffer time:\t%6d\t%02d.%06d\n"
253
,iteration, rank, diff.tv_sec, diff.tv_usec);
255
timeval_subtract (&diff, &(t->t24 [0]), &t->t21);
256
fprintf (f, "pp\t%2d\tshould_buffer->write1 time:\t%6d\t%02d.%06d\n"
257
,iteration, rank, diff.tv_sec, diff.tv_usec);
259
fprintf (f, "aa\t%2d\tprocess write time:\t%6d\t%02d.%06d\n"
260
,iteration, rank, t->t8.tv_sec, t->t8.tv_usec);
262
timeval_subtract (&diff, &t->t11, &t->t23);
263
fprintf (f, "vv\t%2d\tclose start to shutdown time:\t%6d\t%02d.%06d\n"
264
,iteration, rank, (int)diff.tv_sec, (int)diff.tv_usec);
266
timeval_subtract (&diff, &t->t28, &t->t23);
267
fprintf (f, "ww\t%2d\tclose total time:\t%6d\t%02d.%06d\n"
268
,iteration, rank, (int)diff.tv_sec, (int)diff.tv_usec);
273
void adios_mpi_init (const PairStruct * parameters
274
,struct adios_method_struct * method
277
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
279
if (!adios_mpi_initialized)
281
adios_mpi_initialized = 1;
283
method->method_data = malloc (sizeof (struct adios_MPI_data_struct));
284
md = (struct adios_MPI_data_struct *) method->method_data;
287
memset (&md->status, 0, sizeof (MPI_Status));
288
MPI_Info_create (&md->info);
289
MPI_Info_set (md->info, "romio_ds_read", "disable");
290
MPI_Info_set (md->info, "romio_ds_write", "disable");
291
MPI_Info_set (md->info, "ind_wr_buffer_size", "16777216");
294
md->group_comm = method->init_comm; // unused here, adios_open will set the current comm
296
md->old_vars_root = 0;
297
md->old_attrs_root = 0;
299
md->vars_header_size = 0;
300
md->storage_targets = 0;
302
adios_buffer_struct_init (&md->b);
304
// init the pointer for the first go around avoiding the bad free in open
309
int adios_mpi_open (struct adios_file_struct * fd
310
,struct adios_method_struct * method, MPI_Comm comm
313
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
317
gettimeofday (&timing.t0, NULL); // only used on rank == size - 1, but we don't
318
// have the comm yet to get the rank/size
320
adios_buffer_struct_clear (&md->b);
322
md->group_comm = comm;
323
if (md->group_comm != MPI_COMM_NULL)
325
MPI_Comm_rank (md->group_comm, &md->rank);
326
MPI_Comm_size (md->group_comm, &md->size);
328
fd->group->process_id = md->rank;
331
timing.write_count = 0;
332
timing.write_size = 0;
333
if (timing.t24) free (timing.t24);
337
// we have to wait for the group_size (should_buffer)
338
// to calculate stripe sizes from output sizes of the processes
339
// before we can do an open for any of the modes
345
void build_offsets (struct adios_bp_buffer_struct_v1 * b
346
,MPI_Offset * offsets, int size, char * group_name
347
,struct adios_index_process_group_struct_v1 * pg_root
352
if (!strcasecmp (pg_root->group_name, group_name))
358
size = pg_root->next->offset_in_file - pg_root->offset_in_file;
362
size = b->pg_index_offset - pg_root->offset_in_file;
365
offsets [pg_root->process_id * 3] = pg_root->offset_in_file;
366
offsets [pg_root->process_id * 3 + 1] = size;
367
offsets [pg_root->process_id * 3 + 2] = b->version;
370
pg_root = pg_root->next;
375
adios_mpi_build_file_offset(struct adios_MPI_data_struct *md,
376
struct adios_file_struct *fd, char *name)
378
if (md->group_comm != MPI_COMM_NULL)
382
// make one space for offset and one for size
383
MPI_Offset * offsets = malloc(sizeof (MPI_Offset)
387
offsets [0] = fd->write_size_bytes;
388
// mpixlc_r on Eugene doesn't support 64 bit mode. Therefore the following may have problem
389
// on Eugene for large data size since MPI_LONG_LONG is 32bit
390
MPI_Gather (&(fd->write_size_bytes), 1, MPI_LONG_LONG
391
,offsets, 1, MPI_LONG_LONG
394
// top section: make things a consistent stripe size
395
// bottom section: just pack the file
397
// find the largest and use that as a basis for stripe
398
// size for each process writing
399
uint64_t biggest_size = 0;
400
for (i = 0; i < md->size; i++)
402
if (offsets [i] > biggest_size)
403
biggest_size = offsets [i];
405
// now round up to the next stripe size increment (Lustre: 64 KB)
406
#define STRIPE_INCREMENT (64 * 1024)
407
// (according to the Lustre reps, use 1 MB instead of 64 KB?)
408
//#define STRIPE_INCREMENT (1024 * 1024)
409
if (biggest_size % (STRIPE_INCREMENT))
411
biggest_size = ( ((biggest_size / STRIPE_INCREMENT) + 1)
415
// also round up the base_offset, just in case
416
if (fd->base_offset % (STRIPE_INCREMENT))
418
fd->base_offset = ( ((fd->base_offset / STRIPE_INCREMENT) + 1)
422
#undef STRIPE_INCREMENT
423
offsets [0 + 0] = fd->base_offset;
424
offsets [0 + 1] = biggest_size;
426
for (i = 1; i < md->size; i++)
428
offsets [i * 2 + 0] = offsets [(i - 1) * 2 + 0] + biggest_size;
429
offsets [i * 2 + 1] = biggest_size;
431
md->b.pg_index_offset = offsets [(md->size - 1) * 2 + 0]
435
uint64_t last_offset = offsets [0];
436
offsets [0] = fd->base_offset;
437
for (i = 1; i < md->size; i++)
439
uint64_t this_offset = offsets [i];
440
offsets [i] = offsets [i - 1] + last_offset;
441
last_offset = this_offset;
443
md->b.pg_index_offset = offsets [md->size - 1]
446
MPI_Scatter (offsets, 1, MPI_LONG_LONG
447
,MPI_IN_PLACE, 1, MPI_LONG_LONG
450
fd->base_offset = offsets [0];
451
fd->pg_start_in_file = fd->base_offset;
456
MPI_Offset offset[1];
457
offset[0] = fd->write_size_bytes;
459
MPI_Gather (offset, 1, MPI_LONG_LONG
464
MPI_Scatter (0, 1, MPI_LONG_LONG
465
,offset, 1, MPI_LONG_LONG
468
fd->base_offset = offset [0];
469
fd->pg_start_in_file = fd->base_offset;
474
md->b.pg_index_offset = fd->write_size_bytes;
480
// Lustre user-space definitions copied from /usr/include/lustre/lustre_user.h
// (used by set_stripe_size for statfs/ioctl stripe control).
#define LUSTRE_SUPER_MAGIC      0x0BD00BD0
#define LOV_USER_MAGIC          0x0BD10BD0
#define LL_IOC_LOV_SETSTRIPE    _IOW ('f', 154, long)
#define LL_IOC_LOV_GETSTRIPE    _IOW ('f', 155, long)

// Per-stripe data structure.
struct lov_user_ost_data {
    uint64_t l_object_id;       // OST object ID
    uint64_t l_object_gr;       // OST object group (creating MDS number)
    uint32_t l_ost_gen;         // generation of this OST index
    uint32_t l_ost_idx;         // OST index in LOV
} __attribute__((packed));

// LOV EA user data (host-endian).
// lmm_objects is a C99 flexible array member: it contributes no storage
// to sizeof (struct lov_user_md), so a caller that needs the per-stripe
// entries must allocate sizeof (struct lov_user_md) plus
// n * sizeof (struct lov_user_ost_data).
struct lov_user_md {
    uint32_t lmm_magic;         // magic number = LOV_USER_MAGIC
    uint32_t lmm_pattern;       // LOV_PATTERN_RAID0, LOV_PATTERN_RAID1
    uint64_t lmm_object_id;     // LOV object ID
    uint64_t lmm_object_gr;     // LOV object group
    uint32_t lmm_stripe_size;   // size of stripe in bytes
    uint16_t lmm_stripe_count;  // num stripes in use for this object
    uint16_t lmm_stripe_offset; // starting stripe offset in lmm_objects
    struct lov_user_ost_data lmm_objects[]; // per-stripe data
} __attribute__((packed));
502
// do the magic ioctl calls to set Lustre's stripe size
503
static void set_stripe_size (struct adios_file_struct * fd
504
,struct adios_MPI_data_struct * md
505
,const char * filename
511
// Note: Since each file might have different write_buffer,
512
// So we will reset write_buffer even buffer_size != 0
513
err = statfs (filename, &fsbuf);
514
if (!err && fsbuf.f_type == LUSTRE_SUPER_MAGIC)
520
old_mask = umask (022);
522
perm = old_mask ^ 0666;
524
f = open (filename, O_RDONLY | O_CREAT, perm);
527
struct lov_user_md lum;
528
lum.lmm_magic = LOV_USER_MAGIC;
529
lum.lmm_stripe_size = md->biggest_size;
530
err = ioctl (f, LL_IOC_LOV_SETSTRIPE, (void *) &lum);
531
// if err != 0, the must not be Lustre
532
lum.lmm_stripe_count = 0;
533
err = ioctl (f, LL_IOC_LOV_GETSTRIPE, (void *) &lum);
534
// if err != 0, the must not be Lustre
537
md->storage_targets = lum.lmm_stripe_count;
545
enum ADIOS_FLAG adios_mpi_should_buffer (struct adios_file_struct * fd
546
,struct adios_method_struct * method
550
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
554
int flag; // used for coordinating the MPI_File_open
561
gettimeofday (&timing.t21, NULL);
564
name = malloc (strlen (method->base_path) + strlen (fd->name) + 1);
565
sprintf (name, "%s%s", method->base_path, fd->name);
567
if (md->rank == md->size - 1)
571
previous = md->rank - 1;
578
case adios_mode_read:
580
if (md->group_comm == MPI_COMM_NULL || md->rank == 0)
582
err = MPI_File_open (MPI_COMM_SELF, name, MPI_MODE_RDONLY
585
if (err != MPI_SUCCESS)
587
char e [MPI_MAX_ERROR_STRING];
589
memset (e, 0, MPI_MAX_ERROR_STRING);
590
MPI_Error_string (err, e, &len);
591
adios_error (err_file_open_error,
592
"MPI method: open read failed for %s: '%s'\n",
596
return adios_flag_no;
599
MPI_Offset file_size;
600
MPI_File_get_size (md->fh, &file_size);
601
md->b.file_size = file_size;
603
adios_init_buffer_read_version (&md->b);
604
MPI_File_seek (md->fh, md->b.file_size - md->b.length
607
MPI_File_read (md->fh, md->b.buff, md->b.length, MPI_BYTE
610
adios_parse_version (&md->b, &md->b.version);
612
adios_init_buffer_read_index_offsets (&md->b);
613
// already in the buffer
614
adios_parse_index_offsets_v1 (&md->b);
616
adios_init_buffer_read_process_group_index (&md->b);
617
MPI_File_seek (md->fh, md->b.pg_index_offset
620
MPI_File_read (md->fh, md->b.buff, md->b.pg_size, MPI_BYTE
623
adios_parse_process_group_index_v1 (&md->b
628
adios_init_buffer_read_vars_index (&md->b);
629
MPI_File_seek (md->fh, md->b.vars_index_offset
632
MPI_File_read (md->fh, md->b.buff, md->b.vars_size, MPI_BYTE
635
adios_parse_vars_index_v1 (&md->b, &md->old_vars_root);
637
adios_init_buffer_read_attributes_index (&md->b);
638
MPI_File_seek (md->fh, md->b.attrs_index_offset
641
MPI_File_read (md->fh, md->b.buff, md->b.attrs_size, MPI_BYTE
644
adios_parse_attributes_index_v1 (&md->b, &md->old_attrs_root);
647
fd->base_offset = md->b.end_of_pgs;
650
if ( md->group_comm != MPI_COMM_NULL
651
&& md->group_comm != MPI_COMM_SELF
656
MPI_Offset * offsets = malloc ( sizeof (MPI_Offset)
659
memset (offsets, 0, sizeof (MPI_Offset) * md->size * 3);
661
// go through the pg index to build the offsets array
662
build_offsets (&md->b, offsets, md->size
663
,fd->group->name, md->old_pg_root
665
MPI_Scatter (offsets, 3, MPI_LONG_LONG
666
,MPI_IN_PLACE, 3, MPI_LONG_LONG
669
md->b.read_pg_offset = offsets [0];
670
md->b.read_pg_size = offsets [1];
675
MPI_Offset offset [3];
676
offset [0] = offset [1] = offset [2] = 0;
678
MPI_Scatter (0, 3, MPI_LONG_LONG
679
,offset, 3, MPI_LONG_LONG
683
md->b.read_pg_offset = offset [0];
684
md->b.read_pg_size = offset [1];
685
md->b.version = offset [2];
689
// cascade the opens to avoid trashing the metadata server
692
// note rank 0 is already open
693
// don't open it again here
697
MPI_Isend (&flag, 1, MPI_INT, next, current
698
,md->group_comm, &md->req
704
MPI_Recv (&flag, 1, MPI_INT, previous, previous
705
,md->group_comm, &md->status
709
MPI_Isend (&flag, 1, MPI_INT, next, current
710
,md->group_comm, &md->req
713
err = MPI_File_open (MPI_COMM_SELF, name
720
if (err != MPI_SUCCESS)
722
char e [MPI_MAX_ERROR_STRING];
724
memset (e, 0, MPI_MAX_ERROR_STRING);
725
MPI_Error_string (err, e, &len);
726
adios_error (err_file_open_error,
727
"MPI method, rank %d: open read failed for %s: '%s'\n",
731
return adios_flag_no;
737
case adios_mode_write:
740
fd->pg_start_in_file = 0;
742
gettimeofday (&timing.t16, NULL);
745
// figure out the offsets and create the file with proper striping
746
// before the MPI_File_open is called
747
adios_mpi_build_file_offset (md, fd, name);
748
//set_stripe_size (fd, md, name);
750
gettimeofday (&timing.t5, NULL);
753
// cascade the opens to avoid trashing the metadata server
756
MPI_File_delete (name, MPI_INFO_NULL); // make sure clean
758
err = MPI_File_open (MPI_COMM_SELF, name
759
,MPI_MODE_WRONLY | MPI_MODE_CREATE
765
MPI_Isend (&flag, 1, MPI_INT, next, current
766
,md->group_comm, &md->req
772
MPI_Recv (&flag, 1, MPI_INT, previous, previous
773
,md->group_comm, &md->status
777
MPI_Isend (&flag, 1, MPI_INT, next, current
778
,md->group_comm, &md->req
781
err = MPI_File_open (MPI_COMM_SELF, name
788
if (err != MPI_SUCCESS)
790
char e [MPI_MAX_ERROR_STRING];
792
memset (e, 0, MPI_MAX_ERROR_STRING);
793
MPI_Error_string (err, e, &len);
794
adios_error (err_file_open_error,
795
"MPI method, rank %d: open write failed for %s: '%s'\n",
799
return adios_flag_no;
802
gettimeofday (&timing.t6, NULL);
808
case adios_mode_append:
809
case adios_mode_update:
813
if (md->group_comm == MPI_COMM_NULL || md->rank == 0)
815
err = MPI_File_open (MPI_COMM_SELF, name, MPI_MODE_RDONLY
819
if (err != MPI_SUCCESS)
822
MPI_File_close (&md->fh);
823
err = MPI_File_open (MPI_COMM_SELF, name
824
,MPI_MODE_WRONLY | MPI_MODE_CREATE
827
if (err != MPI_SUCCESS)
829
char e [MPI_MAX_ERROR_STRING];
831
memset (e, 0, MPI_MAX_ERROR_STRING);
832
MPI_Error_string (err, e, &len);
833
adios_error (err_file_open_error,
834
"MPI method, rank %d: open for append failed for %s: '%s'\n",
838
return adios_flag_no;
841
MPI_Bcast (&old_file, 1, MPI_INT, 0, md->group_comm);
845
if (md->group_comm != MPI_COMM_NULL)
846
MPI_Bcast (&old_file, 1, MPI_INT, 0, md->group_comm);
851
if (md->group_comm == MPI_COMM_NULL || md->rank == 0)
853
if (err != MPI_SUCCESS)
859
MPI_Offset file_size;
860
MPI_File_get_size (md->fh, &file_size);
861
md->b.file_size = file_size;
864
adios_init_buffer_read_version (&md->b);
865
MPI_File_seek (md->fh, md->b.file_size - md->b.length
868
MPI_File_read (md->fh, md->b.buff, md->b.length, MPI_BYTE
871
adios_parse_version (&md->b, &md->b.version);
873
adios_init_buffer_read_index_offsets (&md->b);
874
// already in the buffer
875
adios_parse_index_offsets_v1 (&md->b);
877
adios_init_buffer_read_process_group_index (&md->b);
878
MPI_File_seek (md->fh, md->b.pg_index_offset
881
MPI_File_read (md->fh, md->b.buff, md->b.pg_size, MPI_BYTE
885
adios_parse_process_group_index_v1 (&md->b
889
// find the largest time index so we can append properly
890
struct adios_index_process_group_struct_v1 * p;
891
uint32_t max_time_index = 0;
895
if (p->time_index > max_time_index)
896
max_time_index = p->time_index;
899
fd->group->time_index = ++max_time_index;
900
MPI_Bcast (&fd->group->time_index, 1, MPI_INT, 0
904
adios_init_buffer_read_vars_index (&md->b);
905
MPI_File_seek (md->fh, md->b.vars_index_offset
908
MPI_File_read (md->fh, md->b.buff, md->b.vars_size, MPI_BYTE
911
adios_parse_vars_index_v1 (&md->b, &md->old_vars_root);
913
adios_init_buffer_read_attributes_index (&md->b);
914
MPI_File_seek (md->fh, md->b.attrs_index_offset
917
MPI_File_read (md->fh, md->b.buff, md->b.attrs_size
918
,MPI_BYTE, &md->status
920
adios_parse_attributes_index_v1 (&md->b
924
fd->base_offset = md->b.end_of_pgs;
925
fd->pg_start_in_file = fd->base_offset;
930
fd->pg_start_in_file = 0;
931
MPI_Bcast (&fd->group->time_index, 1, MPI_INT, 0
936
MPI_File_close (&md->fh);
941
fd->pg_start_in_file = 0;
944
MPI_File_close (&md->fh);
947
// figure out the offsets and create the file with proper striping
948
// before the MPI_File_open is called
949
adios_mpi_build_file_offset (md, fd, name);
950
//set_stripe_size (fd, md, name);
952
// cascade the opens to avoid trashing the metadata server
955
// we know it exists, because we created it if it didn't
956
// when reading the old file so can just open wronly
957
// but adding the create for consistency with write mode
958
// so it is easier to merge write/append later
959
err = MPI_File_open (MPI_COMM_SELF, name
960
,MPI_MODE_WRONLY | MPI_MODE_CREATE
966
MPI_Isend (&flag, 1, MPI_INT, next, current
967
,md->group_comm, &md->req
973
MPI_Recv (&flag, 1, MPI_INT, previous, previous
974
,md->group_comm, &md->status
978
MPI_Isend (&flag, 1, MPI_INT, next, current
979
,md->group_comm, &md->req
982
err = MPI_File_open (MPI_COMM_SELF, name
989
if (err != MPI_SUCCESS)
991
char e [MPI_MAX_ERROR_STRING];
993
memset (e, 0, MPI_MAX_ERROR_STRING);
994
MPI_Error_string (err, e, &len);
995
adios_error (err_file_open_error,
996
"MPI method, rank %d: open for append failed for %s: '%s'\n",
1000
return adios_flag_no;
1008
adios_error (err_invalid_file_mode,
1009
"MPI method: Unknown file mode requested: %d\n",
1014
return adios_flag_no;
1020
if (fd->shared_buffer == adios_flag_no && fd->mode != adios_mode_read)
1023
// write the process group header
1024
adios_write_process_group_header_v1 (fd, fd->write_size_bytes);
1026
MPI_File_seek (md->fh, fd->base_offset, MPI_SEEK_SET);
1028
err = MPI_File_write (md->fh, fd->buffer, fd->bytes_written, MPI_BYTE
1033
uint64_t total_written = 0;
1034
uint64_t to_write = fd->bytes_written;
1037
char * buf_ptr = fd->buffer;
1038
while (total_written < fd->bytes_written)
1040
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
1041
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
1042
MPI_Get_count(&md->status, MPI_BYTE, &count);
1043
if (count != write_len)
1048
total_written += count;
1051
//err = total_written;
1054
if (err != MPI_SUCCESS)
1056
char e [MPI_MAX_ERROR_STRING];
1058
memset (e, 0, MPI_MAX_ERROR_STRING);
1059
MPI_Error_string (err, e, &len);
1060
adios_error (err_write_error,
1061
"MPI method, rank %d: adios_group_size() failed to "
1062
"write header to %s: '%s'\n",
1063
md->rank, fd->name, e);
1066
return adios_flag_no;
1070
MPI_Get_count (&md->status, MPI_BYTE, &count);
1071
if (count != fd->bytes_written)
1073
log_warn ("a:MPI method tried to write %llu, only wrote %llu\n",
1074
fd->bytes_written, count);
1076
fd->base_offset += count;
1078
fd->bytes_written = 0;
1079
adios_shared_buffer_free (&md->b);
1081
// setup for writing vars
1082
adios_write_open_vars_v1 (fd);
1083
md->vars_start = fd->base_offset;
1084
md->vars_header_size = fd->offset;
1085
fd->base_offset += fd->offset;
1086
MPI_File_seek (md->fh, md->vars_header_size, MPI_SEEK_CUR);
1088
fd->bytes_written = 0;
1089
adios_shared_buffer_free (&md->b);
1093
gettimeofday (&timing.t22, NULL);
1095
return fd->shared_buffer;
1098
void adios_mpi_write (struct adios_file_struct * fd
1099
,struct adios_var_struct * v
1101
,struct adios_method_struct * method
1104
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
1105
method->method_data;
1107
if (v->got_buffer == adios_flag_yes)
1109
if (data != v->data) // if the user didn't give back the same thing
1111
if (v->free_data == adios_flag_yes)
1114
adios_method_buffer_free (v->data_size);
1119
// we already saved all of the info, so we're ok.
1124
if (fd->shared_buffer == adios_flag_no)
1127
// var payload sent for sizing information
1128
adios_write_var_header_v1 (fd, v);
1131
err = MPI_File_write (md->fh, fd->buffer, fd->bytes_written
1132
,MPI_BYTE, &md->status
1136
uint64_t total_written = 0;
1137
uint64_t to_write = fd->bytes_written;
1140
char * buf_ptr = fd->buffer;
1141
while (total_written < fd->bytes_written)
1143
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
1144
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
1145
MPI_Get_count(&md->status, MPI_BYTE, &count);
1146
if (count != write_len)
1151
total_written += count;
1154
//err = total_written;
1157
if (err != MPI_SUCCESS)
1159
char e [MPI_MAX_ERROR_STRING];
1161
memset (e, 0, MPI_MAX_ERROR_STRING);
1162
MPI_Error_string (err, e, &len);
1163
adios_error (err_write_error,
1164
"MPI method, rank %d: adios_write() of header of variable %s to "
1165
"file %s failed: '%s'\n ",
1166
md->rank, v->name, fd->name, e);
1170
MPI_Get_count (&md->status, MPI_BYTE, &count);
1171
if (count != fd->bytes_written)
1173
log_warn ("MPI method, rank %d: tried to write %llu bytes, "
1174
"only wrote %d of header of variable %s\n",
1175
md->rank, fd->bytes_written, count, v->name);
1177
fd->base_offset += count;
1179
fd->bytes_written = 0;
1180
adios_shared_buffer_free (&md->b);
1183
// adios_write_var_payload_v1 (fd, v);
1184
uint64_t var_size = adios_get_var_size (v, fd->group, v->data);
1186
if (fd->base_offset + var_size > fd->pg_start_in_file + fd->write_size_bytes)
1187
adios_error (err_out_of_bound,
1188
"MPI method, rank %d: adios_write() of variable %s exceeds pg bound.\n"
1189
"File is corrupted. Need to enlarge group size in adios_group_size().\n"
1190
"Group size=%llu, offset at end of this variable data=%llu\n",
1192
fd->write_size_bytes,
1193
fd->base_offset - fd->pg_start_in_file + var_size);
1195
err = MPI_File_write (md->fh, v->data, var_size, MPI_BYTE, &md->status);
1198
uint64_t total_written = 0;
1199
uint64_t to_write = var_size;
1202
char * buf_ptr = v->data;
1203
while (total_written < var_size)
1205
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
1206
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
1207
MPI_Get_count(&md->status, MPI_BYTE, &count);
1208
if (count != write_len)
1213
total_written += count;
1216
//err = total_written;
1219
if (err != MPI_SUCCESS)
1221
char e [MPI_MAX_ERROR_STRING];
1223
memset (e, 0, MPI_MAX_ERROR_STRING);
1224
MPI_Error_string (err, e, &len);
1225
adios_error (err_write_error,
1226
"MPI method, rank %d: adios_write() of variable %s to "
1227
"file %s failed: '%s'\n ",
1228
md->rank, v->name, fd->name, e);
1231
MPI_Get_count (&md->status, MPI_BYTE, &count);
1232
if (count != var_size)
1234
log_warn ("MPI method, rank %d: tried to write %llu bytes, "
1235
"only wrote %d of variable %s\n",
1236
md->rank, var_size, count, v->name);
1238
fd->base_offset += count;
1240
fd->bytes_written = 0;
1241
adios_shared_buffer_free (&md->b);
1244
if (timing.write_count == timing.write_size)
1246
timing.write_size += 10;
1247
timing.t24 = realloc (timing.t24, sizeof (struct timeval)
1250
assert (timing.t24 != 0);
1252
gettimeofday (&(timing.t24 [timing.write_count++]), NULL);
1256
void adios_mpi_get_write_buffer (struct adios_file_struct * fd
1257
,struct adios_var_struct * v
1260
,struct adios_method_struct * method
1263
uint64_t mem_allowed;
1264
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
1265
method->method_data;
1274
if (v->data && v->free_data)
1276
adios_method_buffer_free (v->data_size);
1280
mem_allowed = adios_method_buffer_alloc (*size);
1281
if (mem_allowed == *size)
1283
*buffer = malloc (*size);
1286
adios_method_buffer_free (mem_allowed);
1287
adios_error (err_no_memory,
1288
"MPI method, rank %d: cannot allocate %llu bytes for variable %s\n",
1289
md->rank, *size, v->name);
1290
v->got_buffer = adios_flag_no;
1291
v->free_data = adios_flag_no;
1299
v->got_buffer = adios_flag_yes;
1300
v->free_data = adios_flag_yes;
1301
v->data_size = mem_allowed;
1307
adios_method_buffer_free (mem_allowed);
1309
adios_error (err_buffer_overflow,
1310
"MPI method, rank %d: OVERFLOW: Cannot get requested ADIOS buffer of %llu "
1311
"bytes for variable %s. Remaining buffer size was %llu\n",
1312
md->rank, *size, v->name, mem_allowed);
1318
void adios_mpi_read (struct adios_file_struct * fd
1319
,struct adios_var_struct * v, void * buffer
1320
,uint64_t buffer_size
1321
,struct adios_method_struct * method
1325
v->data_size = buffer_size;
1328
static void adios_mpi_do_read (struct adios_file_struct * fd
1329
,struct adios_method_struct * method
1332
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
1333
method->method_data;
1334
struct adios_var_struct * v = fd->group->vars;
1336
struct adios_parse_buffer_struct data;
1340
data.buffer_len = 0;
1342
switch (md->b.version & ADIOS_VERSION_NUM_MASK)
1346
// the three section headers
1347
struct adios_process_group_header_struct_v1 pg_header;
1348
struct adios_vars_header_struct_v1 vars_header;
1349
struct adios_attributes_header_struct_v1 attrs_header;
1351
struct adios_var_header_struct_v1 var_header;
1352
struct adios_var_payload_struct_v1 var_payload;
1353
struct adios_attribute_struct_v1 attribute;
1357
adios_init_buffer_read_process_group (&md->b);
1358
MPI_File_seek (md->fh, md->b.read_pg_offset
1361
MPI_File_read (md->fh, md->b.buff, md->b.read_pg_size, MPI_BYTE
1364
adios_parse_process_group_header_v1 (&md->b, &pg_header);
1366
adios_parse_vars_header_v1 (&md->b, &vars_header);
1368
for (i = 0; i < vars_header.count; i++)
1370
memset (&var_payload, 0
1371
,sizeof (struct adios_var_payload_struct_v1)
1373
adios_parse_var_data_header_v1 (&md->b, &var_header);
1375
struct adios_var_struct * v1 = v;
1378
if ( strcasecmp (var_header.name, v1->name)
1379
|| strcasecmp (var_header.path, v1->path)
1392
var_payload.payload = v1->data;
1393
adios_parse_var_data_payload_v1 (&md->b, &var_header
1400
log_warn ("MPI method, rank %d: read: skipping name: %s path: %s\n",
1401
md->rank, var_header.name, var_header.path);
1402
adios_parse_var_data_payload_v1 (&md->b, &var_header
1407
adios_clear_var_header_v1 (&var_header);
1411
adios_parse_attributes_header_v1 (&md->b, &attrs_header);
1413
for (i = 0; i < attrs_header.count; i++)
1415
adios_parse_attribute_v1 (&md->b, &attribute);
1416
adios_clear_attribute_v1 (&attribute);
1419
adios_clear_process_group_header_v1 (&pg_header);
1425
adios_error (err_invalid_file_version,
1426
"MPI method read: ADIOS file version unknown: %u\n",
1431
adios_buffer_struct_clear (&md->b);
1434
void adios_mpi_close (struct adios_file_struct * fd
1435
,struct adios_method_struct * method
1438
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
1439
method->method_data;
1440
struct adios_attribute_struct * a = fd->group->attributes;
1442
struct adios_index_process_group_struct_v1 * new_pg_root = 0;
1443
struct adios_index_var_struct_v1 * new_vars_root = 0;
1444
struct adios_index_attribute_struct_v1 * new_attrs_root = 0;
1446
gettimeofday (&timing.t23, NULL);
1447
timing.t19.tv_sec = timing.t23.tv_sec;
1448
timing.t19.tv_usec = timing.t23.tv_usec;
1449
static int iteration = 0;
1454
case adios_mode_read:
1456
// read the index to find the place to start reading
1457
adios_mpi_do_read (fd, method);
1458
struct adios_var_struct * v = fd->group->vars;
1468
case adios_mode_write:
1471
uint64_t buffer_size = 0;
1472
uint64_t buffer_offset = 0;
1473
uint64_t index_start = md->b.pg_index_offset;
1476
if (fd->shared_buffer == adios_flag_no)
1479
// set it up so that it will start at 0, but have correct sizes
1480
MPI_File_get_position (md->fh, &new_off);
1481
fd->offset = fd->base_offset - md->vars_start;
1483
fd->buffer_size = 0;
1484
adios_write_close_vars_v1 (fd);
1485
// fd->vars_start gets updated with the size written
1486
MPI_File_seek (md->fh, md->vars_start, MPI_SEEK_SET);
1488
err = MPI_File_write (md->fh, fd->buffer, md->vars_header_size
1489
,MPI_BYTE, &md->status
1493
uint64_t total_written = 0;
1494
uint64_t to_write = md->vars_header_size;
1497
char * buf_ptr = fd->buffer;
1498
while (total_written < md->vars_header_size)
1500
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
1501
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
1502
MPI_Get_count(&md->status, MPI_BYTE, &count);
1503
if (count != write_len)
1508
total_written += count;
1511
//err = total_written;
1514
if (err != MPI_SUCCESS)
1516
char e [MPI_MAX_ERROR_STRING];
1518
memset (e, 0, MPI_MAX_ERROR_STRING);
1519
MPI_Error_string (err, e, &len);
1520
adios_error (err_write_error,
1521
"MPI method, rank %d: adios_close(): writing of variable header "
1522
"of %llu bytes to file %s failed: '%s'\n",
1523
md->rank, md->vars_header_size, fd->name, e);
1527
MPI_Get_count (&md->status, MPI_BYTE, &count);
1528
if (count != md->vars_header_size)
1530
log_warn ("MPI method, rank %d: adios_close() tried to write %llu bytes "
1531
"of variable header but only wrote %d\n",
1532
md->rank, md->vars_header_size, count);
1535
fd->bytes_written = 0;
1536
adios_shared_buffer_free (&md->b);
1538
adios_write_open_attributes_v1 (fd);
1539
md->vars_start = new_off;
1540
md->vars_header_size = fd->offset;
1541
MPI_File_seek (md->fh, new_off + md->vars_header_size
1543
); // go back to end, but after attr header
1544
fd->base_offset += fd->offset; // add size of header
1546
fd->bytes_written = 0;
1548
if (!fd->group->process_id) { // from ADIOS 1.4, only rank 0 writes attributes
1551
adios_write_attribute_v1 (fd, a);
1552
if (fd->base_offset + fd->bytes_written > fd->pg_start_in_file + fd->write_size_bytes)
1553
adios_error (err_out_of_bound,
1554
"MPI method, rank %d: writing of the attributes exceeds pg bound.\n"
1555
"File is corrupted. Need to enlarge group size in adios_group_size().\n"
1556
"Group size=%llu, offset at end of this variable data=%llu\n",
1558
fd->write_size_bytes,
1559
fd->base_offset - fd->pg_start_in_file + fd->bytes_written);
1561
err = MPI_File_write (md->fh, fd->buffer, fd->bytes_written
1562
,MPI_BYTE, &md->status
1566
uint64_t total_written = 0;
1567
uint64_t to_write = fd->bytes_written;
1570
char * buf_ptr = fd->buffer;
1571
while (total_written < fd->bytes_written)
1573
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
1574
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
1575
MPI_Get_count(&md->status, MPI_BYTE, &count);
1576
if (count != write_len)
1581
total_written += count;
1584
//err = total_written;
1587
if (err != MPI_SUCCESS)
1589
char e [MPI_MAX_ERROR_STRING];
1591
memset (e, 0, MPI_MAX_ERROR_STRING);
1592
MPI_Error_string (err, e, &len);
1593
adios_error (err_write_error,
1594
"MPI method, rank %d: adios_close(): writing of attributes "
1595
"of %llu bytes to file %s failed: '%s'\n",
1596
md->rank, fd->bytes_written, fd->name, e);
1599
MPI_Get_count (&md->status, MPI_BYTE, &count);
1600
if (count != fd->bytes_written)
1602
log_warn ("MPI method, rank %d: adios_close() tried to write "
1603
"%llu bytes of attributes but only wrote %d\n",
1604
md->rank, fd->bytes_written, count);
1606
fd->base_offset += count;
1608
fd->bytes_written = 0;
1609
adios_shared_buffer_free (&md->b);
1615
// set it up so that it will start at 0, but have correct sizes
1616
fd->offset = fd->base_offset - md->vars_start;
1618
fd->buffer_size = 0;
1619
adios_write_close_attributes_v1 (fd);
1620
MPI_File_seek (md->fh, md->vars_start, MPI_SEEK_SET);
1621
// fd->vars_start gets updated with the size written
1623
err = MPI_File_write (md->fh, fd->buffer, md->vars_header_size
1624
,MPI_BYTE, &md->status
1628
uint64_t total_written = 0;
1629
uint64_t to_write = md->vars_header_size;
1632
char * buf_ptr = fd->buffer;
1633
while (total_written < md->vars_header_size)
1635
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
1636
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
1637
MPI_Get_count(&md->status, MPI_BYTE, &count);
1638
if (count != write_len)
1643
total_written += count;
1646
//err = total_written;
1649
if (err != MPI_SUCCESS)
1651
char e [MPI_MAX_ERROR_STRING];
1653
memset (e, 0, MPI_MAX_ERROR_STRING);
1654
MPI_Error_string (err, e, &len);
1655
adios_error (err_write_error,
1656
"MPI method, rank %d: adios_close(): writing of variable header "
1657
"of %llu bytes to file %s failed: '%s'\n",
1658
md->rank, md->vars_header_size, fd->name, e);
1661
MPI_Get_count (&md->status, MPI_BYTE, &count);
1662
if (count != md->vars_header_size)
1664
log_warn ("MPI method, rank %d: adios_close() tried to write %llu bytes "
1665
"of variable header but only wrote %d\n",
1666
md->rank, md->vars_header_size, count);
1669
fd->bytes_written = 0;
1673
gettimeofday (&timing.t19, NULL);
1674
gettimeofday (&timing.t12, NULL);
1676
// build index appending to any existing index
1677
adios_build_index_v1 (fd, &md->old_pg_root, &md->old_vars_root
1678
,&md->old_attrs_root
1680
// if collective, gather the indexes from the rest and call
1681
if (md->group_comm != MPI_COMM_NULL)
1685
int * index_sizes = malloc (4 * md->size);
1686
int * index_offsets = malloc (4 * md->size);
1687
char * recv_buffer = 0;
1689
uint32_t total_size = 0;
1692
MPI_Gather (&size, 1, MPI_INT
1693
,index_sizes, 1, MPI_INT
1697
for (i = 0; i < md->size; i++)
1699
index_offsets [i] = total_size;
1700
total_size += index_sizes [i];
1703
recv_buffer = malloc (total_size);
1705
MPI_Gatherv (&size, 0, MPI_BYTE
1706
,recv_buffer, index_sizes, index_offsets
1707
,MPI_BYTE, 0, md->group_comm
1710
char * buffer_save = md->b.buff;
1711
uint64_t buffer_size_save = md->b.length;
1712
uint64_t offset_save = md->b.offset;
1714
for (i = 1; i < md->size; i++)
1716
md->b.buff = recv_buffer + index_offsets [i];
1717
md->b.length = index_sizes [i];
1720
adios_parse_process_group_index_v1 (&md->b
1723
adios_parse_vars_index_v1 (&md->b, &new_vars_root);
1724
// do not merge attributes from other processes from 1.4
1726
adios_parse_attributes_index_v1 (&md->b
1730
adios_merge_index_v1 (&md->old_pg_root
1732
,&md->old_attrs_root
1733
,new_pg_root, new_vars_root
1740
md->b.buff = buffer_save;
1741
md->b.length = buffer_size_save;
1742
md->b.offset = offset_save;
1746
free (index_offsets);
1750
adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
1755
uint32_t tmp_buffer_size = (uint32_t) buffer_size;
1756
MPI_Gather (&tmp_buffer_size, 1, MPI_INT, 0, 0, MPI_INT
1759
MPI_Gatherv (buffer, buffer_size, MPI_BYTE
1767
gettimeofday (&timing.t13, NULL);
1769
if (fd->shared_buffer == adios_flag_yes)
1771
// if we need to write > 2 GB, need to do it in parts
1772
// since count is limited to INT32_MAX (signed 32-bit max).
1773
uint64_t bytes_written = 0;
1774
int32_t to_write = 0;
1775
if (fd->bytes_written > INT32_MAX)
1777
to_write = INT32_MAX;
1781
to_write = (int32_t) fd->bytes_written;
1784
if (fd->base_offset + fd->bytes_written >
1785
fd->pg_start_in_file + fd->write_size_bytes)
1787
adios_error (err_out_of_bound,
1788
"MPI method, rank %d: size of buffered data exceeds pg bound.\n"
1789
"File is corrupted. Need to enlarge group size in adios_group_size().\n"
1790
"Group size=%llu, offset at end of variable buffer=%llu\n",
1792
fd->write_size_bytes,
1793
fd->base_offset - fd->pg_start_in_file + fd->bytes_written);
1796
while (bytes_written < fd->bytes_written)
1798
// everyone writes their data
1800
MPI_File_seek (md->fh, fd->base_offset + bytes_written
1803
err = MPI_File_write (md->fh, fd->buffer + bytes_written
1804
,to_write, MPI_BYTE, &md->status
1806
if (err != MPI_SUCCESS)
1808
char e [MPI_MAX_ERROR_STRING];
1810
memset (e, 0, MPI_MAX_ERROR_STRING);
1811
MPI_Error_string (err, e, &len);
1812
adios_error (err_write_error,
1813
"MPI method, rank %d: adios_close(): writing of buffered data "
1814
"[%llu..%llu] to file %s failed: '%s'\n",
1815
md->rank, bytes_written, bytes_written+to_write-1,
1818
bytes_written += to_write;
1819
if (fd->bytes_written > bytes_written)
1821
if (fd->bytes_written - bytes_written > INT32_MAX)
1823
to_write = INT32_MAX;
1827
to_write = fd->bytes_written - bytes_written;
1835
adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
1836
,index_start, md->old_pg_root
1840
adios_write_version_v1 (&buffer, &buffer_size, &buffer_offset);
1842
MPI_File_seek (md->fh, md->b.pg_index_offset, MPI_SEEK_SET);
1844
err = MPI_File_write (md->fh, buffer, buffer_offset, MPI_BYTE
1849
uint64_t total_written = 0;
1850
uint64_t to_write = buffer_offset;
1853
char * buf_ptr = buffer;
1854
while (total_written < buffer_offset)
1856
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
1858
struct timeval a, b;
1859
gettimeofday (&a, NULL);
1861
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
1863
gettimeofday (&b, NULL);
1864
timeval_subtract (&timing.t8, &b, &a);
1866
MPI_Get_count(&md->status, MPI_BYTE, &count);
1867
if (count != write_len)
1869
log_error ("MPI method, rank %d: Need to do multi-write 1 (tried: "
1870
"%d wrote: %d) errno %d\n",
1871
md->rank, write_len, count, errno);
1875
total_written += count;
1878
//err = total_written;
1881
if (err != MPI_SUCCESS)
1883
char e [MPI_MAX_ERROR_STRING];
1885
memset (e, 0, MPI_MAX_ERROR_STRING);
1886
MPI_Error_string (err, e, &len);
1887
adios_error (err_write_error,
1888
"MPI method, rank %d: adios_close(): writing of index data "
1889
"of %llu bytes to file %s failed: '%s'\n",
1890
md->rank, buffer_offset, fd->name, e);
1894
gettimeofday (&timing.t20, NULL);
1895
gettimeofday (&timing.t14, NULL);
1906
adios_clear_index_v1 (new_pg_root, new_vars_root, new_attrs_root);
1907
adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
1913
md->old_pg_root = 0;
1914
md->old_vars_root = 0;
1915
md->old_attrs_root = 0;
1917
gettimeofday (&timing.t11, NULL);
1923
case adios_mode_append:
1924
case adios_mode_update:
1927
uint64_t buffer_size = 0;
1928
uint64_t buffer_offset = 0;
1929
uint64_t index_start = md->b.pg_index_offset;
1932
if (fd->shared_buffer == adios_flag_no)
1935
// set it up so that it will start at 0, but have correct sizes
1936
MPI_File_get_position (md->fh, &new_off);
1937
fd->offset = fd->base_offset - md->vars_start;
1939
fd->buffer_size = 0;
1940
adios_write_close_vars_v1 (fd);
1941
// fd->vars_start gets updated with the size written
1942
MPI_File_seek (md->fh, md->vars_start, MPI_SEEK_SET);
1944
err = MPI_File_write (md->fh, fd->buffer, md->vars_header_size
1945
,MPI_BYTE, &md->status
1949
uint64_t total_written = 0;
1950
uint64_t to_write = md->vars_header_size;
1953
char * buf_ptr = fd->buffer;
1954
while (total_written < md->vars_header_size)
1956
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
1957
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
1958
MPI_Get_count(&md->status, MPI_BYTE, &count);
1959
if (count != write_len)
1964
total_written += count;
1967
//err = total_written;
1970
if (err != MPI_SUCCESS)
1972
char e [MPI_MAX_ERROR_STRING];
1974
memset (e, 0, MPI_MAX_ERROR_STRING);
1975
MPI_Error_string (err, e, &len);
1976
adios_error (err_write_error,
1977
"MPI method, rank %d: adios_close(): writing of variable header "
1978
"of %llu bytes to file %s failed: '%s'\n",
1979
md->rank, md->vars_header_size, fd->name, e);
1983
MPI_Get_count (&md->status, MPI_BYTE, &count);
1984
if (count != md->vars_header_size)
1986
log_warn ("MPI method, rank %d: adios_close() tried to write %llu bytes "
1987
"of variable header but only wrote %d\n",
1988
md->rank, md->vars_header_size, count);
1991
fd->bytes_written = 0;
1992
adios_shared_buffer_free (&md->b);
1994
adios_write_open_attributes_v1 (fd);
1995
md->vars_start = new_off;
1996
md->vars_header_size = fd->offset;
1997
MPI_File_seek (md->fh, new_off + md->vars_header_size
1999
); // go back to end, but after attr header
2000
fd->base_offset += fd->offset; // add size of header
2002
fd->bytes_written = 0;
2004
if (!fd->group->process_id) { // from ADIOS 1.4, only rank 0 writes attributes
2007
adios_write_attribute_v1 (fd, a);
2008
if (fd->base_offset + fd->bytes_written > fd->pg_start_in_file + fd->write_size_bytes)
2009
adios_error (err_out_of_bound,
2010
"MPI method, rank %d: writing of the attributes exceeds pg bound.\n"
2011
"File is corrupted. Need to enlarge group size in adios_group_size().\n"
2012
"Group size=%llu, offset at end of this variable data=%llu\n",
2014
fd->write_size_bytes,
2015
fd->base_offset - fd->pg_start_in_file + fd->bytes_written);
2017
err = MPI_File_write (md->fh, fd->buffer, fd->bytes_written
2018
,MPI_BYTE, &md->status
2022
uint64_t total_written = 0;
2023
uint64_t to_write = fd->bytes_written;
2026
char * buf_ptr = fd->buffer;
2027
while (total_written < fd->bytes_written)
2029
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
2030
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
2031
MPI_Get_count(&md->status, MPI_BYTE, &count);
2032
if (count != write_len)
2037
total_written += count;
2040
//err = total_written;
2043
if (err != MPI_SUCCESS)
2045
char e [MPI_MAX_ERROR_STRING];
2047
memset (e, 0, MPI_MAX_ERROR_STRING);
2048
MPI_Error_string (err, e, &len);
2049
adios_error (err_write_error,
2050
"MPI method, rank %d: adios_close(): writing of attributes "
2051
"of %llu bytes to file %s failed: '%s'\n",
2052
md->rank, fd->bytes_written, fd->name, e);
2055
MPI_Get_count (&md->status, MPI_BYTE, &count);
2056
if (count != fd->bytes_written)
2058
log_warn ("MPI method, rank %d: adios_close() tried to write "
2059
"%llu bytes of attributes but only wrote %d\n",
2060
md->rank, fd->bytes_written, count);
2062
fd->base_offset += count;
2064
fd->bytes_written = 0;
2065
adios_shared_buffer_free (&md->b);
2071
// set it up so that it will start at 0, but have correct sizes
2072
fd->offset = fd->base_offset - md->vars_start;
2074
fd->buffer_size = 0;
2075
adios_write_close_attributes_v1 (fd);
2076
MPI_File_seek (md->fh, md->vars_start, MPI_SEEK_SET);
2077
// fd->vars_start gets updated with the size written
2079
err = MPI_File_write (md->fh, fd->buffer, md->vars_header_size
2080
,MPI_BYTE, &md->status
2084
uint64_t total_written = 0;
2085
uint64_t to_write = md->vars_header_size;
2088
char * buf_ptr = fd->buffer;
2089
while (total_written < md->vars_header_size)
2091
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
2092
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
2093
MPI_Get_count(&md->status, MPI_BYTE, &count);
2094
if (count != write_len)
2099
total_written += count;
2102
//err = total_written;
2105
if (err != MPI_SUCCESS)
2107
char e [MPI_MAX_ERROR_STRING];
2109
memset (e, 0, MPI_MAX_ERROR_STRING);
2110
MPI_Error_string (err, e, &len);
2111
adios_error (err_write_error,
2112
"MPI method, rank %d: adios_close(): writing of variable header "
2113
"of %llu bytes to file %s failed: '%s'\n",
2114
md->rank, md->vars_header_size, fd->name, e);
2117
MPI_Get_count (&md->status, MPI_BYTE, &count);
2118
if (count != md->vars_header_size)
2120
log_warn ("MPI method, rank %d: adios_close() tried to write %llu bytes "
2121
"of variable header but only wrote %d\n",
2122
md->rank, md->vars_header_size, count);
2125
fd->bytes_written = 0;
2128
// build index appending to any existing index
2129
adios_build_index_v1 (fd, &md->old_pg_root, &md->old_vars_root
2130
,&md->old_attrs_root
2132
// if collective, gather the indexes from the rest and call
2133
if (md->group_comm != MPI_COMM_NULL)
2137
int * index_sizes = malloc (4 * md->size);
2138
int * index_offsets = malloc (4 * md->size);
2139
char * recv_buffer = 0;
2141
uint32_t total_size = 0;
2144
MPI_Gather (&size, 1, MPI_INT
2145
,index_sizes, 1, MPI_INT
2149
for (i = 0; i < md->size; i++)
2151
index_offsets [i] = total_size;
2152
total_size += index_sizes [i];
2155
recv_buffer = malloc (total_size);
2157
MPI_Gatherv (&size, 0, MPI_BYTE
2158
,recv_buffer, index_sizes, index_offsets
2159
,MPI_BYTE, 0, md->group_comm
2162
char * buffer_save = md->b.buff;
2163
uint64_t buffer_size_save = md->b.length;
2164
uint64_t offset_save = md->b.offset;
2166
for (i = 1; i < md->size; i++)
2168
md->b.buff = recv_buffer + index_offsets [i];
2169
md->b.length = index_sizes [i];
2172
adios_parse_process_group_index_v1 (&md->b
2175
adios_parse_vars_index_v1 (&md->b, &new_vars_root);
2176
// do not merge attributes from other processes from 1.4
2178
adios_parse_attributes_index_v1 (&md->b
2182
adios_merge_index_v1 (&md->old_pg_root
2184
,&md->old_attrs_root
2185
,new_pg_root, new_vars_root
2192
md->b.buff = buffer_save;
2193
md->b.length = buffer_size_save;
2194
md->b.offset = offset_save;
2198
free (index_offsets);
2202
adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
2208
int _buffer_size = buffer_size;
2210
MPI_Gather (&_buffer_size, 1, MPI_INT, 0, 0, MPI_INT
2213
MPI_Gatherv (buffer, buffer_size, MPI_BYTE
2220
if (fd->shared_buffer == adios_flag_yes)
2222
if (fd->base_offset + fd->bytes_written >
2223
fd->pg_start_in_file + fd->write_size_bytes)
2225
adios_error (err_out_of_bound,
2226
"MPI method, rank %d: size of buffered data exceeds pg bound.\n"
2227
"File is corrupted. Need to enlarge group size in adios_group_size().\n"
2228
"Group size=%llu, offset at end of variable buffer=%llu\n",
2230
fd->write_size_bytes,
2231
fd->base_offset - fd->pg_start_in_file + fd->bytes_written);
2233
// everyone writes their data
2234
MPI_File_seek (md->fh, fd->base_offset, MPI_SEEK_SET);
2236
err = MPI_File_write (md->fh, fd->buffer, fd->bytes_written
2237
,MPI_BYTE, &md->status
2241
uint64_t total_written = 0;
2242
uint64_t to_write = fd->bytes_written;
2245
char * buf_ptr = fd->buffer;
2246
while (total_written < fd->bytes_written)
2248
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
2249
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
2250
MPI_Get_count(&md->status, MPI_BYTE, &count);
2251
if (count != write_len)
2256
total_written += count;
2259
//err = total_written;
2262
if (err != MPI_SUCCESS)
2264
char e [MPI_MAX_ERROR_STRING];
2266
memset (e, 0, MPI_MAX_ERROR_STRING);
2267
MPI_Error_string (err, e, &len);
2268
adios_error (err_write_error,
2269
"MPI method, rank %d: adios_close(): writing of buffered data "
2270
"of %llu bytes to file %s failed: '%s'\n",
2271
md->rank, fd->bytes_written, fd->name, e);
2277
adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
2278
,index_start, md->old_pg_root
2282
adios_write_version_v1 (&buffer, &buffer_size, &buffer_offset);
2284
MPI_File_seek (md->fh, md->b.pg_index_offset, MPI_SEEK_SET);
2286
err = MPI_File_write (md->fh, buffer, buffer_offset, MPI_BYTE
2291
uint64_t total_written = 0;
2292
uint64_t to_write = buffer_offset;
2295
char * buf_ptr = buffer;
2296
while (total_written < buffer_offset)
2298
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
2299
err = MPI_File_write (md->fh, buf_ptr, write_len, MPI_BYTE, &md->status);
2300
MPI_Get_count(&md->status, MPI_BYTE, &count);
2301
if (count != write_len)
2303
log_error ("MPI method, rank %d: Need to do multi-write 2 (tried: "
2304
"%d wrote: %d) errno %d\n",
2305
md->rank, write_len, count, errno);
2309
total_written += count;
2312
//err = total_written;
2315
if (err != MPI_SUCCESS)
2317
char e [MPI_MAX_ERROR_STRING];
2319
memset (e, 0, MPI_MAX_ERROR_STRING);
2320
MPI_Error_string (err, e, &len);
2321
adios_error (err_write_error,
2322
"MPI method, rank %d: adios_close(): writing of index data "
2323
"of %llu bytes to file %s failed: '%s'\n",
2324
md->rank, buffer_offset, fd->name, e);
2330
adios_clear_index_v1 (new_pg_root, new_vars_root, new_attrs_root);
2331
adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
2337
md->old_pg_root = 0;
2338
md->old_vars_root = 0;
2339
md->old_attrs_root = 0;
2346
adios_error (err_invalid_file_mode,
2347
"MPI method: Unknown file mode: %d in adios_close()\n",
2355
MPI_File_sync (md->fh);
2357
MPI_File_close (&md->fh);
2361
gettimeofday (&timing.t28, NULL);
2362
print_metrics (md, iteration++);
2364
if ( md->group_comm != MPI_COMM_WORLD
2365
&& md->group_comm != MPI_COMM_SELF
2366
&& md->group_comm != MPI_COMM_NULL
2369
md->group_comm = MPI_COMM_NULL;
2374
memset (&md->status, 0, sizeof (MPI_Status));
2375
md->group_comm = MPI_COMM_NULL;
2377
adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
2380
md->old_pg_root = 0;
2381
md->old_vars_root = 0;
2382
md->old_attrs_root = 0;
2385
void adios_mpi_finalize (int mype, struct adios_method_struct * method)
2387
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
2388
method->method_data;
2389
if (adios_mpi_initialized)
2391
adios_mpi_initialized = 0;
2392
MPI_Info_free (&md->info);
2396
void adios_mpi_end_iteration (struct adios_method_struct * method)
2400
void adios_mpi_start_calculation (struct adios_method_struct * method)
2404
void adios_mpi_stop_calculation (struct adios_method_struct * method)