2
* ADIOS is freely available under the terms of the BSD license described
3
* in the COPYING file in the top level directory of this source distribution.
5
* Copyright (c) 2008 - 2009. UT-BATTELLE, LLC. All rights reserved.
21
#include "adios_transport_hooks.h"
22
#include "adios_bp_v1.h"
23
#include "adios_internals.h"
26
static int adios_mpi_lustre_initialized = 0;
28
#define COLLECT_METRICS 0
30
struct adios_MPI_data_struct
39
void * comm; // temporary until moved from should_buffer to open
41
struct adios_bp_buffer_struct_v1 b;
43
struct adios_index_process_group_struct_v1 * old_pg_root;
44
struct adios_index_var_struct_v1 * old_vars_root;
45
struct adios_index_attribute_struct_v1 * old_attrs_root;
48
uint64_t vars_header_size;
50
uint64_t striping_unit; // file system stripe size
55
// see adios_adaptive_finalize for what each represents
56
struct timeval t0, t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13, t14, t15, t16, t17, t18, t19, t20, t21, t22, t23, t24, t25;
58
// Subtract the `struct timeval' values X and Y,
59
// storing the result in RESULT.
60
// Return 1 if the difference is negative, otherwise 0.
61
static int timeval_subtract (struct timeval * result
62
,struct timeval * x, struct timeval * y
65
// Perform the carry for the later subtraction by updating y.
66
if (x->tv_usec < y->tv_usec)
68
int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
69
y->tv_usec -= 1000000 * nsec;
72
if (x->tv_usec - y->tv_usec > 1000000)
74
int nsec = (x->tv_usec - y->tv_usec) / 1000000;
75
y->tv_usec += 1000000 * nsec;
79
// Compute the time remaining to wait.
80
// tv_usec is certainly positive.
81
result->tv_sec = x->tv_sec - y->tv_sec;
82
result->tv_usec = x->tv_usec - y->tv_usec;
84
// Return 1 if result is negative.
85
return x->tv_sec < y->tv_sec;
89
void print_metrics (struct adios_MPI_data_struct * md, int iteration)
94
timeval_subtract (&diff, &t2, &t1);
95
printf ("cc\t%2d\tFile create (stripe setup):\t%02d.%06d\n"
96
,iteration, diff.tv_sec, diff.tv_usec);
98
timeval_subtract (&diff, &t6, &t5);
99
printf ("dd\t%2d\tMass file open:\t%02d.%06d\n"
100
,iteration, diff.tv_sec, diff.tv_usec);
102
timeval_subtract (&diff, &t17, &t16);
103
printf ("ee\t%2d\tBuild file offsets:\t%02d.%06d\n"
104
,iteration, diff.tv_sec, diff.tv_usec);
106
if (md->rank == md->size - 1)
108
timeval_subtract (&diff, &t10, &t9);
109
printf ("ff\t%2d\tGlobal index creation:\t%02d.%06d\n"
110
,iteration, diff.tv_sec, diff.tv_usec);
112
timeval_subtract (&diff, &t8, &t7);
113
printf ("gg\t%2d\tAll writes complete (w/ local index):\t%02d.%06d\n"
114
,iteration, diff.tv_sec, diff.tv_usec);
116
timeval_subtract (&diff, &t11, &t0);
117
printf ("hh\t%2d\tTotal time:\t%02d.%06d\n"
118
,iteration, diff.tv_sec, diff.tv_usec);
121
timeval_subtract (&diff, &t13, &t12);
122
printf ("ii\t%2d\tLocal index creation:\t%6d\t%02d.%06d\n"
123
,iteration, md->rank, diff.tv_sec, diff.tv_usec);
125
timeval_subtract (&diff, &t22, &t21);
126
printf ("kk\t%2d\tshould buffer time:\t%6d\t%02d.%06d\n"
127
,iteration, md->rank, diff.tv_sec, diff.tv_usec);
129
timeval_subtract (&diff, &t19, &t23);
130
printf ("ll\t%2d\tclose startup time:\t%6d\t%02d.%06d\n"
131
,iteration, md->rank, diff.tv_sec, diff.tv_usec);
133
timeval_subtract (&diff, &t19, &t0);
134
printf ("mm\t%2d\tsetup time:\t%6d\t%02d.%06d\n"
135
,iteration, md->rank, diff.tv_sec, diff.tv_usec);
137
timeval_subtract (&diff, &t14, &t20);
138
printf ("nn\t%2d\tcleanup time:\t%6d\t%02d.%06d\n"
139
,iteration, md->rank, diff.tv_sec, diff.tv_usec);
141
timeval_subtract (&diff, &t21, &t0);
142
printf ("oo\t%2d\topen->should_buffer time:\t%6d\t%02d.%06d\n"
143
,iteration, md->rank, diff.tv_sec, diff.tv_usec);
145
timeval_subtract (&diff, &t24, &t21);
146
printf ("pp\t%2d\tshould_buffer->write1 time:\t%6d\t%02d.%06d\n"
147
,iteration, md->rank, diff.tv_sec, diff.tv_usec);
149
timeval_subtract (&diff, &t25, &t24);
150
printf ("qq1\t%2d\twrite1->write2 time:\t%6d\t%02d.%06d\n"
151
,iteration, md->rank, diff.tv_sec, diff.tv_usec);
153
timeval_subtract (&diff, &t23, &t25);
154
printf ("qq2\t%2d\twrite2->close start time:\t%6d\t%02d.%06d\n"
155
,iteration, md->rank, diff.tv_sec, diff.tv_usec);
159
#if defined(__APPLE__)
160
# include <sys/param.h>
161
# include <sys/mount.h>
163
# include <sys/statfs.h>
166
// this should be determined at configure time
167
//#define ADIOS_LUSTRE
169
//#ifdef ADIOS_LUSTRE
170
#include <sys/ioctl.h>
171
//#include <lustre/lustre_user.h>
173
// from /usr/include/lustre/lustre_user.h
174
#define LUSTRE_SUPER_MAGIC 0x0BD00BD0
175
# define LOV_USER_MAGIC 0x0BD10BD0
176
# define LL_IOC_LOV_SETSTRIPE _IOW ('f', 154, long)
177
# define LL_IOC_LOV_GETSTRIPE _IOW ('f', 155, long)
178
#define O_LOV_DELAY_CREATE 0100000000
180
struct lov_user_ost_data { // per-stripe data structure
181
uint64_t l_object_id; // OST object ID
182
uint64_t l_object_gr; // OST object group (creating MDS number)
183
uint32_t l_ost_gen; // generation of this OST index
184
uint32_t l_ost_idx; // OST index in LOV
185
} __attribute__((packed));
186
struct lov_user_md { // LOV EA user data (host-endian)
187
uint32_t lmm_magic; // magic number = LOV_USER_MAGIC_V1
188
uint32_t lmm_pattern; // LOV_PATTERN_RAID0, LOV_PATTERN_RAID1
189
uint64_t lmm_object_id; // LOV object ID
190
uint64_t lmm_object_gr; // LOV object group
191
uint32_t lmm_stripe_size; // size of stripe in bytes
192
uint16_t lmm_stripe_count; // num stripes in use for this object
193
uint16_t lmm_stripe_offset; // starting stripe offset in lmm_objects
194
struct lov_user_ost_data lmm_objects[0]; // per-stripe data
195
} __attribute__((packed));
200
static void trim_spaces (char * str)
202
char * t = str, * p = NULL;
217
adios_mpi_lustre_set_striping_unit(char *filename, char *parameters, struct adios_MPI_data_struct * md)
219
MPI_File fh = md->fh;
220
int nproc = md->size;
223
// uint64_t striping_unit = 0;
224
uint64_t block_unit = 0;
225
uint16_t striping_count = 0;
226
uint16_t stripe_offset = -1;
227
char value[64], *temp_string, *p_count,*p_size;
230
int fd, old_mask, perm, num_ost, rc;
231
struct lov_user_md lum;
232
struct obd_uuid uuids[1024], * uuidp;
234
old_mask = umask(022);
236
perm = old_mask ^ 0666;
238
fd = open(filename, O_RDONLY | O_CREAT | O_LOV_DELAY_CREATE, perm);
243
// To get the number of ost's in the system
245
rc = llapi_lov_get_uuids(fd, uuids, &num_ost);
248
fprintf (stderr, "get uuids failed: %s\n"
256
temp_string = (char *) malloc (strlen (parameters) + 1);
257
strcpy (temp_string, parameters);
258
trim_spaces (temp_string);
260
if (p_count = strstr (temp_string, "stripe_count"))
262
char * p = strchr (p_count, '=');
263
char * q = strtok (p, ",");
265
striping_count = atoi (q + 1);
267
striping_count = atoi (p + 1);
272
striping_count = (nproc > num_ost ? -1 : nproc);
278
strcpy (temp_string, parameters);
279
trim_spaces (temp_string);
281
if (p_size = strstr (temp_string, "stripe_size"))
283
char * p = strchr (p_size, '=');
284
char * q = strtok (p, ",");
286
md->striping_unit = atoi(q + 1);
288
md->striping_unit = atoi(p + 1);
292
if (md->striping_unit <= 0)
293
md->striping_unit = 1048576;
296
strcpy (temp_string, parameters);
297
trim_spaces (temp_string);
299
if (p_size = strstr (temp_string, "stripe_offset"))
301
char * p = strchr (p_size, '=');
302
char * q = strtok (p, ",");
304
stripe_offset = atoi (q + 1);
306
stripe_offset = atoi (p + 1);
310
// let Lustre manage stripe offset
314
strcpy (temp_string, parameters);
315
trim_spaces (temp_string);
317
if (p_size = strstr (temp_string, "block_size"))
319
char * p = strchr (p_size, '=');
320
char * q = strtok (p, ",");
322
block_unit = atoi(q + 1);
324
block_unit = atoi(p + 1);
328
// set block_unit to 0 to make one large write
335
struct lov_user_md lum;
336
lum.lmm_magic = LOV_USER_MAGIC;
338
lum.lmm_stripe_size = md->striping_unit;
339
lum.lmm_stripe_count = striping_count;
340
lum.lmm_stripe_offset = stripe_offset;
341
ioctl (fd, LL_IOC_LOV_SETSTRIPE
345
if (err == 0 && lum.lmm_stripe_size > 0) {
346
md->striping_unit = lum.lmm_stripe_size;
351
printf("Warning: open failed on file %s %s.\n",filename,strerror(errno));
356
adios_mpi_lustre_set_block_unit(uint64_t *block_unit, char *parameters)
358
char *temp_string, *p_count,*p_size;
360
temp_string = (char *) malloc (strlen (parameters) + 1);
361
strcpy (temp_string, parameters);
362
trim_spaces (temp_string);
364
if (p_size = strstr (temp_string, "block_size"))
366
char * p = strchr (p_size, '=');
367
char * q = strtok (p, ",");
369
*block_unit = atoi(q + 1);
371
*block_unit = atoi(p + 1);
374
if (*block_unit <= 0)
375
*block_unit = 1048576;
381
adios_mpi_lustre_get_striping_unit(MPI_File fh, char *filename)
385
uint64_t striping_unit = 1048576;
390
gettimeofday (&t1, NULL);
393
// get striping_unit from MPI hint if it has
394
MPI_File_get_info(fh, &info_used);
395
MPI_Info_get(info_used, "striping_unit", 63, value, &flag);
396
MPI_Info_free(&info_used);
398
if (flag) return atoi(value);
400
// if striping_unit is not set in MPI file info, get it from system
401
err = statfs(filename, &fsbuf);
403
printf("Warning: statfs failed %s %s.\n",filename,strerror(errno));
404
return striping_unit;
407
if (!err && fsbuf.f_type == LUSTRE_SUPER_MAGIC) {
408
int fd, old_mask, perm;
410
old_mask = umask(022);
412
perm = old_mask ^ 0666;
414
fd = open(filename, O_RDONLY, perm);
416
struct lov_user_md lum;
417
memset (&lum, 0, sizeof(struct lov_user_md));
418
lum.lmm_magic = LOV_USER_MAGIC;
419
err = ioctl(fd, LL_IOC_LOV_GETSTRIPE, (void *) &lum);
420
if (err == 0 && lum.lmm_stripe_size > 0) {
421
striping_unit = lum.lmm_stripe_size;
426
printf("Warning: open failed on file %s %s.\n",filename,strerror(errno));
430
gettimeofday (&t2, NULL);
432
// set the file striping size
433
return striping_unit;
437
adios_mpi_lustre_striping_unit_write(MPI_File fh,
447
if (len == 0) return 0;
449
if (offset == -1) // use current position
450
MPI_File_get_position(fh, &offset);
452
MPI_File_seek (fh, offset, MPI_SEEK_SET);
454
if (block_unit > 0) {
455
MPI_Offset rem_off = offset;
456
uint64_t rem_size = len;
460
while (rem_size > 0) {
461
uint64_t rem_unit = block_unit - rem_off % block_unit;
462
int write_len = (rem_unit < rem_size) ? rem_unit : rem_size;
465
#ifdef _WKL_CHECK_STRIPE_IO
466
printf("adios_mpi_lustre_striping_unit_write offset=%12lld len=%12d\n",offset,write_len);offset+=write_len;
468
MPI_File_write (fh, buf_ptr, write_len, MPI_BYTE, &status);
469
MPI_Get_count(&status, MPI_BYTE, &ret_len);
470
if (ret_len < 0) {err = ret_len; break;}
472
if (ret_len != write_len) break;
473
buf_ptr += write_len;
474
rem_off += write_len;
475
rem_size -= write_len;
479
uint64_t total_written = 0;
480
uint64_t to_write = len;
483
char * buf_ptr = buf;
484
while (total_written < len)
486
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
487
MPI_File_write (fh, buf_ptr, write_len, MPI_BYTE, &status);
488
MPI_Get_count(&status, MPI_BYTE, &count);
489
if (count != write_len)
494
total_written += count;
503
static void adios_var_to_comm (const char * comm_name
504
,enum ADIOS_FLAG host_language_fortran
511
int t = *(int *) data;
517
fprintf (stderr, "communicator not provided and none "
518
"listed in XML. Defaulting to "
522
*comm = MPI_COMM_SELF;
526
if (host_language_fortran == adios_flag_yes)
528
*comm = MPI_Comm_f2c (t);
532
*comm = *(MPI_Comm *) data;
538
if (!strcmp (comm_name, ""))
542
fprintf (stderr, "communicator not provided and none "
543
"listed in XML. Defaulting to "
547
*comm = MPI_COMM_SELF;
551
if (host_language_fortran == adios_flag_yes)
553
*comm = MPI_Comm_f2c (t);
557
*comm = *(MPI_Comm *) data;
565
fprintf (stderr, "communicator not provided but one "
566
"listed in XML. Defaulting to "
570
*comm = MPI_COMM_WORLD;
574
if (host_language_fortran == adios_flag_yes)
576
*comm = MPI_Comm_f2c (t);
580
*comm = *(MPI_Comm *) data;
588
fprintf (stderr, "coordination-communication not provided. "
589
"Using MPI_COMM_WORLD instead\n"
592
*comm = MPI_COMM_WORLD;
596
void adios_mpi_lustre_init (const char * parameters
597
,struct adios_method_struct * method
600
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
602
if (!adios_mpi_lustre_initialized)
604
adios_mpi_lustre_initialized = 1;
606
method->method_data = malloc (sizeof (struct adios_MPI_data_struct));
607
md = (struct adios_MPI_data_struct *) method->method_data;
610
memset (&md->status, 0, sizeof (MPI_Status));
613
md->group_comm = MPI_COMM_NULL;
615
md->old_vars_root = 0;
616
md->old_attrs_root = 0;
618
md->vars_header_size = 0;
619
md->striping_unit = 0;
622
adios_buffer_struct_init (&md->b);
625
int adios_mpi_lustre_open (struct adios_file_struct * fd
626
,struct adios_method_struct * method, void * comm
629
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
633
gettimeofday (&t0, NULL); // only used on rank == size - 1, but we don't
634
// have the comm yet to get the rank/size
636
// we have to wait for the group_size (should_buffer) to get the comm
637
// before we can do an open for any of the modes
644
void build_offsets (struct adios_bp_buffer_struct_v1 * b
645
,MPI_Offset * offsets, uint64_t size, char * group_name
646
,struct adios_index_process_group_struct_v1 * pg_root
651
if (!strcasecmp (pg_root->group_name, group_name))
657
size = pg_root->next->offset_in_file - pg_root->offset_in_file;
661
size = b->pg_index_offset - pg_root->offset_in_file;
664
offsets [pg_root->process_id * 3] = pg_root->offset_in_file;
665
offsets [pg_root->process_id * 3 + 1] = size;
666
offsets [pg_root->process_id * 3 + 2] = b->version;
669
pg_root = pg_root->next;
673
enum ADIOS_FLAG adios_mpi_lustre_should_buffer (struct adios_file_struct * fd
674
,struct adios_method_struct * method
678
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
682
int flag; // used for coordinating the MPI_File_open
689
gettimeofday (&t21, NULL);
692
name = malloc (strlen (method->base_path) + strlen (fd->name) + 1);
693
sprintf (name, "%s%s", method->base_path, fd->name);
695
adios_var_to_comm (fd->group->group_comm
696
,fd->group->adios_host_language_fortran
700
if (md->group_comm != MPI_COMM_NULL)
702
MPI_Comm_rank (md->group_comm, &md->rank);
703
MPI_Comm_size (md->group_comm, &md->size);
705
fd->group->process_id = md->rank;
707
if (md->rank == md->size - 1)
711
previous = md->rank - 1;
716
#define LUSTRE_STRIPE_UNIT 65536
720
case adios_mode_read:
722
if (md->group_comm == MPI_COMM_NULL || md->rank == 0)
724
err = MPI_File_open (MPI_COMM_SELF, name, MPI_MODE_RDONLY
725
,MPI_INFO_NULL, &md->fh
727
if (err != MPI_SUCCESS)
729
char e [MPI_MAX_ERROR_STRING];
731
memset (e, 0, MPI_MAX_ERROR_STRING);
732
MPI_Error_string (err, e, &len);
733
fprintf (stderr, "MPI open read failed for %s: '%s'\n"
738
return adios_flag_no;
741
MPI_Offset file_size;
742
MPI_File_get_size (md->fh, &file_size);
743
md->b.file_size = file_size;
745
adios_init_buffer_read_version (&md->b);
746
MPI_File_seek (md->fh, md->b.file_size - md->b.length
749
MPI_File_read (md->fh, md->b.buff, md->b.length, MPI_BYTE
752
adios_parse_version (&md->b, &md->b.version);
754
adios_init_buffer_read_index_offsets (&md->b);
755
// already in the buffer
756
adios_parse_index_offsets_v1 (&md->b);
758
adios_init_buffer_read_process_group_index (&md->b);
759
MPI_File_seek (md->fh, md->b.pg_index_offset
762
MPI_File_read (md->fh, md->b.buff, md->b.pg_size, MPI_BYTE
765
adios_parse_process_group_index_v1 (&md->b
770
adios_init_buffer_read_vars_index (&md->b);
771
MPI_File_seek (md->fh, md->b.vars_index_offset
774
MPI_File_read (md->fh, md->b.buff, md->b.vars_size, MPI_BYTE
777
adios_parse_vars_index_v1 (&md->b, &md->old_vars_root);
779
adios_init_buffer_read_attributes_index (&md->b);
780
MPI_File_seek (md->fh, md->b.attrs_index_offset
783
MPI_File_read (md->fh, md->b.buff, md->b.attrs_size, MPI_BYTE
786
adios_parse_attributes_index_v1 (&md->b, &md->old_attrs_root);
789
fd->base_offset = md->b.end_of_pgs;
792
if ( md->group_comm != MPI_COMM_NULL
793
&& md->group_comm != MPI_COMM_SELF
798
MPI_Offset * offsets = malloc ( sizeof (MPI_Offset)
801
memset (offsets, 0, sizeof (MPI_Offset) * md->size * 3);
803
// go through the pg index to build the offsets array
804
build_offsets (&md->b, offsets, md->size
805
,fd->group->name, md->old_pg_root
807
MPI_Scatter (offsets, 3, MPI_LONG_LONG
808
,MPI_IN_PLACE, 3, MPI_LONG_LONG
811
md->b.read_pg_offset = offsets [0];
812
md->b.read_pg_size = offsets [1];
817
MPI_Offset offset [3];
818
offset [0] = offset [1] = offset [2] = 0;
821
,offset, 3, MPI_LONG_LONG
825
md->b.read_pg_offset = offset [0];
826
md->b.read_pg_size = offset [1];
827
md->b.version = offset [2];
831
// cascade the opens to avoid trashing the metadata server
834
// note rank 0 is already open
835
// don't open it again here
839
MPI_Isend (&flag, 1, MPI_INT, next, current
840
,md->group_comm, &md->req
846
MPI_Recv (&flag, 1, MPI_INT, previous, previous
847
,md->group_comm, &md->status
851
MPI_Isend (&flag, 1, MPI_INT, next, current
852
,md->group_comm, &md->req
855
err = MPI_File_open (MPI_COMM_SELF, name
862
if (err != MPI_SUCCESS)
864
char e [MPI_MAX_ERROR_STRING];
866
memset (e, 0, MPI_MAX_ERROR_STRING);
867
MPI_Error_string (err, e, &len);
868
fprintf (stderr, "MPI open write failed for %s: '%s'\n"
873
return adios_flag_no;
879
case adios_mode_write:
882
fd->pg_start_in_file = 0;
884
gettimeofday (&t16, NULL);
887
if (md->group_comm != MPI_COMM_NULL)
891
MPI_Offset * offsets = malloc ( sizeof (MPI_Offset)
895
// round up to LUSTRE_STRIPE_UNIT (64KB)
896
if (fd->write_size_bytes % LUSTRE_STRIPE_UNIT)
897
offsets [0] = (fd->write_size_bytes / LUSTRE_STRIPE_UNIT + 1)
898
* LUSTRE_STRIPE_UNIT;
900
offsets [0] = fd->write_size_bytes;
902
MPI_Gather (MPI_IN_PLACE, 1, MPI_LONG_LONG
903
,offsets, 1, MPI_LONG_LONG
907
uint64_t last_offset = offsets [0];
908
offsets [0] = fd->base_offset;
909
for (i = 1; i < md->size; i++)
911
uint64_t this_offset = offsets [i];
912
offsets [i] = offsets [i - 1] + last_offset;
913
last_offset = this_offset;
915
// How to handle that each processor has varying amount of data??
916
md->striping_unit = offsets[1] - offsets[0];
917
if (md->striping_unit > 4 * 1024 * 1024 * 1024L)
919
md->striping_unit = 4 * 1024 * 1024 * 1024L;
922
md->b.pg_index_offset = offsets [md->size - 1]
924
MPI_Scatter (offsets, 1, MPI_LONG_LONG
925
,MPI_IN_PLACE, 1, MPI_LONG_LONG
928
fd->base_offset = offsets [0];
929
fd->pg_start_in_file = fd->base_offset;
935
if (fd->write_size_bytes % LUSTRE_STRIPE_UNIT)
936
offset = (fd->write_size_bytes / LUSTRE_STRIPE_UNIT + 1)
937
* LUSTRE_STRIPE_UNIT;
939
offset = fd->write_size_bytes;
941
MPI_Gather (&offset, 1, MPI_LONG_LONG
947
,&offset, 1, MPI_LONG_LONG
950
fd->base_offset = offset;
951
fd->pg_start_in_file = fd->base_offset;
956
md->b.pg_index_offset = fd->write_size_bytes;
960
gettimeofday (&t6, NULL);
962
// cascade the opens to avoid trashing the metadata server
965
unlink (name); // make sure clean
967
if (method->parameters)
969
adios_mpi_lustre_set_striping_unit (name
973
adios_mpi_lustre_set_block_unit (&md->block_unit, method->parameters);
975
err = MPI_File_open (MPI_COMM_SELF, name
976
,MPI_MODE_WRONLY | MPI_MODE_CREATE
981
md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
985
MPI_Isend (&flag, 1, MPI_INT, next, current
986
,md->group_comm, &md->req
992
MPI_Recv (&flag, 1, MPI_INT, previous, previous
993
,md->group_comm, &md->status
997
MPI_Isend (&flag, 1, MPI_INT, next, current
998
,md->group_comm, &md->req
1002
adios_mpi_lustre_set_block_unit (&md->block_unit, method->parameters);
1003
err = MPI_File_open (MPI_COMM_SELF, name
1008
md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1011
if (err != MPI_SUCCESS)
1013
char e [MPI_MAX_ERROR_STRING];
1015
memset (e, 0, MPI_MAX_ERROR_STRING);
1016
MPI_Error_string (err, e, &len);
1017
fprintf (stderr, "MPI open write failed for %s: '%s'\n"
1022
return adios_flag_no;
1028
case adios_mode_append:
1031
adios_buffer_struct_clear (&md->b);
1033
err = MPI_File_open (MPI_COMM_SELF, name, MPI_MODE_RDONLY
1034
,MPI_INFO_NULL, &md->fh
1037
if (err != MPI_SUCCESS)
1040
err = MPI_File_open (MPI_COMM_SELF, name
1041
,MPI_MODE_WRONLY | MPI_MODE_CREATE
1042
,MPI_INFO_NULL, &md->fh
1045
if (err != MPI_SUCCESS)
1047
char e [MPI_MAX_ERROR_STRING];
1049
memset (e, 0, MPI_MAX_ERROR_STRING);
1050
MPI_Error_string (err, e, &len);
1051
fprintf (stderr, "MPI open write failed for %s: '%s'\n"
1056
return adios_flag_no;
1058
md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1063
if (md->group_comm == MPI_COMM_NULL || md->rank == 0)
1065
if (err != MPI_SUCCESS)
1067
md->b.file_size = 0;
1071
MPI_Offset file_size;
1072
MPI_File_get_size (md->fh, &file_size);
1073
md->b.file_size = file_size;
1076
adios_init_buffer_read_version (&md->b);
1077
MPI_File_seek (md->fh, md->b.file_size - md->b.length
1080
MPI_File_read (md->fh, md->b.buff, md->b.length, MPI_BYTE
1083
adios_parse_version (&md->b, &md->b.version);
1085
adios_init_buffer_read_index_offsets (&md->b);
1086
// already in the buffer
1087
adios_parse_index_offsets_v1 (&md->b);
1089
adios_init_buffer_read_process_group_index (&md->b);
1090
MPI_File_seek (md->fh, md->b.pg_index_offset
1093
MPI_File_read (md->fh, md->b.buff, md->b.pg_size, MPI_BYTE
1096
adios_parse_process_group_index_v1 (&md->b
1100
adios_init_buffer_read_vars_index (&md->b);
1101
MPI_File_seek (md->fh, md->b.vars_index_offset
1104
MPI_File_read (md->fh, md->b.buff, md->b.vars_size, MPI_BYTE
1107
adios_parse_vars_index_v1 (&md->b, &md->old_vars_root);
1109
adios_init_buffer_read_attributes_index (&md->b);
1110
MPI_File_seek (md->fh, md->b.attrs_index_offset
1113
MPI_File_read (md->fh, md->b.buff, md->b.attrs_size
1114
,MPI_BYTE, &md->status
1116
adios_parse_attributes_index_v1 (&md->b
1117
,&md->old_attrs_root
1120
fd->base_offset = md->b.end_of_pgs;
1121
fd->pg_start_in_file = fd->base_offset;
1125
fd->base_offset = 0;
1126
fd->pg_start_in_file = 0;
1129
MPI_File_close (&md->fh);
1133
fd->base_offset = 0;
1134
fd->pg_start_in_file = 0;
1137
// cascade the opens to avoid trashing the metadata server
1140
// we know it exists, because we created it if it didn't
1141
// when reading the old file so can just open wronly
1142
// but adding the create for consistency with write mode
1143
// so it is easier to merge write/append later
1144
err = MPI_File_open (MPI_COMM_SELF, name
1145
,MPI_MODE_WRONLY | MPI_MODE_CREATE
1149
md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1152
MPI_Isend (&flag, 1, MPI_INT, next, current
1153
,md->group_comm, &md->req
1159
MPI_Recv (&flag, 1, MPI_INT, previous, previous
1160
,md->group_comm, &md->status
1164
MPI_Isend (&flag, 1, MPI_INT, next, current
1165
,md->group_comm, &md->req
1168
err = MPI_File_open (MPI_COMM_SELF, name
1173
md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1176
if (err != MPI_SUCCESS)
1178
char e [MPI_MAX_ERROR_STRING];
1180
memset (e, 0, MPI_MAX_ERROR_STRING);
1181
MPI_Error_string (err, e, &len);
1182
fprintf (stderr, "MPI open write failed for %s: '%s'\n"
1187
return adios_flag_no;
1190
if (md->group_comm != MPI_COMM_NULL)
1194
MPI_Offset * offsets = malloc ( sizeof (MPI_Offset)
1198
if (fd->write_size_bytes % LUSTRE_STRIPE_UNIT)
1199
offsets [0] = (fd->write_size_bytes / LUSTRE_STRIPE_UNIT + 1)
1200
* LUSTRE_STRIPE_UNIT;
1202
offsets [0] = fd->write_size_bytes;
1204
MPI_Gather (MPI_IN_PLACE, 1, MPI_LONG_LONG
1205
,offsets, 1, MPI_LONG_LONG
1209
uint64_t last_offset = offsets [0];
1210
offsets [0] = fd->base_offset;
1211
for (i = 1; i < md->size; i++)
1213
uint64_t this_offset = offsets [i];
1214
offsets [i] = offsets [i - 1] + last_offset;
1215
last_offset = this_offset;
1217
md->b.pg_index_offset = offsets [md->size - 1]
1219
MPI_Scatter (offsets, 1, MPI_LONG_LONG
1220
,MPI_IN_PLACE, 1, MPI_LONG_LONG
1223
fd->base_offset = offsets [0];
1224
fd->pg_start_in_file = fd->base_offset;
1230
if (fd->write_size_bytes % LUSTRE_STRIPE_UNIT)
1231
offset = (fd->write_size_bytes / LUSTRE_STRIPE_UNIT + 1)
1232
* LUSTRE_STRIPE_UNIT;
1234
offset = fd->write_size_bytes;
1237
MPI_Gather (&offset, 1, MPI_LONG_LONG
1242
MPI_Scatter (0, 0, 0
1243
,&offset, 1, MPI_LONG_LONG
1246
fd->base_offset = offset;
1247
fd->pg_start_in_file = fd->base_offset;
1252
md->b.pg_index_offset = fd->write_size_bytes;
1255
// cascade the opens to avoid trashing the metadata server
1258
// we know it exists, because we created it if it didn't
1259
// when reading the old file so can just open wronly
1260
// but adding the create for consistency with write mode
1261
// so it is easier to merge write/append later
1262
err = MPI_File_open (MPI_COMM_SELF, name
1263
,MPI_MODE_WRONLY | MPI_MODE_CREATE
1267
md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1270
MPI_Isend (&flag, 1, MPI_INT, next, current
1271
,md->group_comm, &md->req
1277
MPI_Recv (&flag, 1, MPI_INT, previous, previous
1278
,md->group_comm, &md->status
1282
MPI_Isend (&flag, 1, MPI_INT, next, current
1283
,md->group_comm, &md->req
1286
err = MPI_File_open (MPI_COMM_SELF, name
1291
md->striping_unit = adios_mpi_lustre_get_striping_unit(md->fh, name);
1294
if (err != MPI_SUCCESS)
1296
char e [MPI_MAX_ERROR_STRING];
1298
memset (e, 0, MPI_MAX_ERROR_STRING);
1299
MPI_Error_string (err, e, &len);
1300
fprintf (stderr, "MPI open write failed for %s: '%s'\n"
1305
return adios_flag_no;
1313
fprintf (stderr, "Unknown file mode: %d\n", fd->mode);
1317
return adios_flag_no;
1323
if (fd->shared_buffer == adios_flag_no && fd->mode != adios_mode_read)
1325
// write the process group header
1326
adios_write_process_group_header_v1 (fd, fd->write_size_bytes);
1329
count = adios_mpi_lustre_striping_unit_write(
1335
if (count != fd->bytes_written)
1337
fprintf (stderr, "a:MPI method tried to write %llu, "
1343
fd->base_offset += count;
1345
fd->bytes_written = 0;
1346
adios_shared_buffer_free (&md->b);
1348
// setup for writing vars
1349
adios_write_open_vars_v1 (fd);
1350
md->vars_start = fd->base_offset;
1351
md->vars_header_size = fd->offset;
1352
fd->base_offset += fd->offset;
1353
MPI_File_seek (md->fh, md->vars_header_size, MPI_SEEK_CUR);
1355
fd->bytes_written = 0;
1356
adios_shared_buffer_free (&md->b);
1360
gettimeofday (&t22, NULL);
1362
return fd->shared_buffer;
1365
void adios_mpi_lustre_write (struct adios_file_struct * fd
1366
,struct adios_var_struct * v
1368
,struct adios_method_struct * method
1371
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
1372
method->method_data;
1374
if (v->got_buffer == adios_flag_yes)
1376
if (data != v->data) // if the user didn't give back the same thing
1378
if (v->free_data == adios_flag_yes)
1381
adios_method_buffer_free (v->data_size);
1386
// we already saved all of the info, so we're ok.
1391
if (fd->shared_buffer == adios_flag_no)
1393
// var payload sent for sizing information
1394
adios_write_var_header_v1 (fd, v);
1397
count = adios_mpi_lustre_striping_unit_write(
1403
if (count != fd->bytes_written)
1405
fprintf (stderr, "b:MPI method tried to write %llu, "
1411
fd->base_offset += count;
1413
fd->bytes_written = 0;
1414
adios_shared_buffer_free (&md->b);
1417
// adios_write_var_payload_v1 (fd, v);
1418
uint64_t var_size = adios_get_var_size (v, fd->group, v->data);
1419
if (fd->base_offset + var_size > fd->pg_start_in_file + fd->write_size_bytes)
1420
fprintf (stderr, "adios_mpi_write exceeds pg bound. File is corrupted. "
1421
"Need to enlarge group size. \n");
1422
count = adios_mpi_lustre_striping_unit_write(
1428
if (count != var_size)
1430
fprintf (stderr, "c:MPI method tried to write %llu, "
1436
fd->base_offset += count;
1438
fd->bytes_written = 0;
1439
adios_shared_buffer_free (&md->b);
1442
static int writes_seen = 0;
1444
if (writes_seen == 0) gettimeofday (&t24, NULL);
1445
else if (writes_seen == 1) gettimeofday (&t25, NULL);
1450
void adios_mpi_lustre_get_write_buffer (struct adios_file_struct * fd
1451
,struct adios_var_struct * v
1454
,struct adios_method_struct * method
1457
uint64_t mem_allowed;
1466
if (v->data && v->free_data)
1468
adios_method_buffer_free (v->data_size);
1472
mem_allowed = adios_method_buffer_alloc (*size);
1473
if (mem_allowed == *size)
1475
*buffer = malloc (*size);
1478
adios_method_buffer_free (mem_allowed);
1479
fprintf (stderr, "Out of memory allocating %llu bytes for %s\n"
1482
v->got_buffer = adios_flag_no;
1483
v->free_data = adios_flag_no;
1491
v->got_buffer = adios_flag_yes;
1492
v->free_data = adios_flag_yes;
1493
v->data_size = mem_allowed;
1499
adios_method_buffer_free (mem_allowed);
1500
fprintf (stderr, "OVERFLOW: Cannot allocate requested buffer of %llu "
1510
void adios_mpi_lustre_read (struct adios_file_struct * fd
1511
,struct adios_var_struct * v, void * buffer
1512
,uint64_t buffer_size
1513
,struct adios_method_struct * method
1517
v->data_size = buffer_size;
1520
static void adios_mpi_lustre_do_read (struct adios_file_struct * fd
1521
,struct adios_method_struct * method
1524
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
1525
method->method_data;
1526
struct adios_var_struct * v = fd->group->vars;
1528
struct adios_parse_buffer_struct data;
1532
data.buffer_len = 0;
1534
switch (md->b.version & ADIOS_VERSION_NUM_MASK)
1538
// the three section headers
1539
struct adios_process_group_header_struct_v1 pg_header;
1540
struct adios_vars_header_struct_v1 vars_header;
1541
struct adios_attributes_header_struct_v1 attrs_header;
1543
struct adios_var_header_struct_v1 var_header;
1544
struct adios_var_payload_struct_v1 var_payload;
1545
struct adios_attribute_struct_v1 attribute;
1549
adios_init_buffer_read_process_group (&md->b);
1550
MPI_File_seek (md->fh, md->b.read_pg_offset
1553
MPI_File_read (md->fh, md->b.buff, md->b.read_pg_size, MPI_BYTE
1556
adios_parse_process_group_header_v1 (&md->b, &pg_header);
1558
adios_parse_vars_header_v1 (&md->b, &vars_header);
1560
for (i = 0; i < vars_header.count; i++)
1562
memset (&var_payload, 0
1563
,sizeof (struct adios_var_payload_struct_v1)
1565
adios_parse_var_data_header_v1 (&md->b, &var_header);
1567
struct adios_var_struct * v1 = v;
1570
if ( strcasecmp (var_header.name, v1->name)
1571
|| strcasecmp (var_header.path, v1->path)
1584
var_payload.payload = v1->data;
1585
adios_parse_var_data_payload_v1 (&md->b, &var_header
1592
printf ("MPI read: skipping name: %s path: %s\n"
1593
,var_header.name, var_header.path
1595
adios_parse_var_data_payload_v1 (&md->b, &var_header
1600
adios_clear_var_header_v1 (&var_header);
1604
adios_parse_attributes_header_v1 (&md->b, &attrs_header);
1606
for (i = 0; i < attrs_header.count; i++)
1608
adios_parse_attribute_v1 (&md->b, &attribute);
1609
adios_clear_attribute_v1 (&attribute);
1612
adios_clear_process_group_header_v1 (&pg_header);
1618
fprintf (stderr, "MPI read: file version unknown: %u\n"
1624
adios_buffer_struct_clear (&md->b);
1627
void adios_mpi_lustre_close (struct adios_file_struct * fd
1628
,struct adios_method_struct * method
1631
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
1632
method->method_data;
1633
struct adios_attribute_struct * a = fd->group->attributes;
1635
struct adios_index_process_group_struct_v1 * new_pg_root = 0;
1636
struct adios_index_var_struct_v1 * new_vars_root = 0;
1637
struct adios_index_attribute_struct_v1 * new_attrs_root = 0;
1639
gettimeofday (&t23, NULL);
1640
static int iteration = 0;
1645
case adios_mode_read:
1647
// read the index to find the place to start reading
1648
adios_mpi_lustre_do_read (fd, method);
1649
struct adios_var_struct * v = fd->group->vars;
1659
case adios_mode_write:
1662
uint64_t buffer_size = 0;
1663
uint64_t buffer_offset = 0;
1664
uint64_t index_start = md->b.pg_index_offset;
1666
if (fd->shared_buffer == adios_flag_no)
1669
// set it up so that it will start at 0, but have correct sizes
1670
MPI_File_get_position (md->fh, &new_off);
1671
fd->offset = fd->base_offset - md->vars_start;
1673
fd->buffer_size = 0;
1674
adios_write_close_vars_v1 (fd);
1675
// fd->vars_start gets updated with the size written
1678
count = adios_mpi_lustre_striping_unit_write(
1682
md->vars_header_size,
1684
if (count != md->vars_header_size)
1686
fprintf (stderr, "d:MPI method tried to write %llu, "
1688
,md->vars_header_size
1693
fd->bytes_written = 0;
1694
adios_shared_buffer_free (&md->b);
1696
adios_write_open_attributes_v1 (fd);
1697
md->vars_start = new_off;
1698
md->vars_header_size = fd->offset;
1699
MPI_File_seek (md->fh, new_off + md->vars_header_size
1701
); // go back to end, but after attr header
1702
fd->base_offset += fd->offset; // add size of header
1704
fd->bytes_written = 0;
1708
adios_write_attribute_v1 (fd, a);
1709
if (fd->base_offset + fd->bytes_written > fd->pg_start_in_file + fd->write_size_bytes)
1710
fprintf (stderr, "adios_mpi_write exceeds pg bound. File is corrupted. "
1711
"Need to enlarge group size. \n");
1712
count = adios_mpi_lustre_striping_unit_write(
1718
if (count != fd->bytes_written)
1720
fprintf (stderr, "e:MPI method tried to write %llu, "
1726
fd->base_offset += count;
1728
fd->bytes_written = 0;
1729
adios_shared_buffer_free (&md->b);
1734
// set it up so that it will start at 0, but have correct sizes
1735
fd->offset = fd->base_offset - md->vars_start;
1737
fd->buffer_size = 0;
1738
adios_write_close_attributes_v1 (fd);
1739
// fd->vars_start gets updated with the size written
1740
count = adios_mpi_lustre_striping_unit_write(
1744
md->vars_header_size,
1746
if (count != md->vars_header_size)
1748
fprintf (stderr, "f:MPI method tried to write %llu, "
1750
,md->vars_header_size
1755
fd->bytes_written = 0;
1759
gettimeofday (&t19, NULL);
1762
gettimeofday (&t7, NULL);
1765
gettimeofday (&t12, NULL);
1767
// build index appending to any existing index
1768
adios_build_index_v1 (fd, &md->old_pg_root, &md->old_vars_root
1769
,&md->old_attrs_root
1771
// if collective, gather the indexes from the rest and call
1772
if (md->group_comm != MPI_COMM_NULL)
1776
int * index_sizes = malloc (4 * md->size);
1777
int * index_offsets = malloc (4 * md->size);
1778
char * recv_buffer = 0;
1780
uint32_t total_size = 0;
1783
MPI_Gather (&size, 1, MPI_INT
1784
,index_sizes, 1, MPI_INT
1788
for (i = 0; i < md->size; i++)
1790
index_offsets [i] = total_size;
1791
total_size += index_sizes [i];
1794
recv_buffer = malloc (total_size);
1796
MPI_Gatherv (&size, 0, MPI_BYTE
1797
,recv_buffer, index_sizes, index_offsets
1798
,MPI_BYTE, 0, md->group_comm
1801
char * buffer_save = md->b.buff;
1802
uint64_t buffer_size_save = md->b.length;
1803
uint64_t offset_save = md->b.offset;
1805
for (i = 1; i < md->size; i++)
1807
md->b.buff = recv_buffer + index_offsets [i];
1808
md->b.length = index_sizes [i];
1811
adios_parse_process_group_index_v1 (&md->b
1814
adios_parse_vars_index_v1 (&md->b, &new_vars_root);
1815
adios_parse_attributes_index_v1 (&md->b
1818
adios_merge_index_v1 (&md->old_pg_root
1820
,&md->old_attrs_root
1821
,new_pg_root, new_vars_root
1828
md->b.buff = buffer_save;
1829
md->b.length = buffer_size_save;
1830
md->b.offset = offset_save;
1834
free (index_offsets);
1838
adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
1844
MPI_Gather (&buffer_size, 1, MPI_INT, 0, 0, MPI_INT
1847
MPI_Gatherv (buffer, buffer_size, MPI_BYTE
1855
gettimeofday (&t13, NULL);
1857
if (fd->shared_buffer == adios_flag_yes)
1859
// everyone writes their data
1860
if (fd->base_offset + fd->bytes_written > fd->pg_start_in_file + fd->write_size_bytes)
1861
fprintf (stderr, "adios_mpi_write exceeds pg bound. File is corrupted. "
1862
"Need to enlarge group size. \n");
1864
adios_mpi_lustre_striping_unit_write(
1874
adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
1875
,index_start, md->old_pg_root
1879
adios_write_version_v1 (&buffer, &buffer_size, &buffer_offset);
1881
adios_mpi_lustre_striping_unit_write(
1883
md->b.pg_index_offset,
1889
gettimeofday (&t8, NULL);
1892
gettimeofday (&t20, NULL);
1895
gettimeofday (&t14, NULL);
1906
adios_clear_index_v1 (new_pg_root, new_vars_root, new_attrs_root);
1907
adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
1913
md->old_pg_root = 0;
1914
md->old_vars_root = 0;
1915
md->old_attrs_root = 0;
1917
gettimeofday (&t11, NULL);
1918
t15.tv_sec = t11.tv_sec;
1919
t15.tv_usec = t11.tv_usec;
1925
case adios_mode_append:
1928
uint64_t buffer_size = 0;
1929
uint64_t buffer_offset = 0;
1930
uint64_t index_start = md->b.pg_index_offset;
1932
if (fd->shared_buffer == adios_flag_no)
1935
// set it up so that it will start at 0, but have correct sizes
1936
MPI_File_get_position (md->fh, &new_off);
1937
fd->offset = fd->base_offset - md->vars_start;
1939
fd->buffer_size = 0;
1940
adios_write_close_vars_v1 (fd);
1941
// fd->vars_start gets updated with the size written
1943
count = adios_mpi_lustre_striping_unit_write(
1947
md->vars_header_size,
1949
if (count != md->vars_header_size)
1951
fprintf (stderr, "d:MPI method tried to write %llu, "
1953
,md->vars_header_size
1958
fd->bytes_written = 0;
1959
adios_shared_buffer_free (&md->b);
1961
adios_write_open_attributes_v1 (fd);
1962
md->vars_start = new_off;
1963
md->vars_header_size = fd->offset;
1964
MPI_File_seek (md->fh, new_off + md->vars_header_size
1966
); // go back to end, but after attr header
1967
fd->base_offset += fd->offset; // add size of header
1969
fd->bytes_written = 0;
1973
adios_write_attribute_v1 (fd, a);
1974
count = adios_mpi_lustre_striping_unit_write(
1980
if (count != fd->bytes_written)
1982
fprintf (stderr, "e:MPI method tried to write %llu, "
1988
fd->base_offset += count;
1990
fd->bytes_written = 0;
1991
adios_shared_buffer_free (&md->b);
1996
// set it up so that it will start at 0, but have correct sizes
1997
fd->offset = fd->base_offset - md->vars_start;
1999
fd->buffer_size = 0;
2000
adios_write_close_attributes_v1 (fd);
2001
// fd->vars_start gets updated with the size written
2002
count = adios_mpi_lustre_striping_unit_write(
2006
md->vars_header_size,
2008
if (count != md->vars_header_size)
2010
fprintf (stderr, "f:MPI method tried to write %llu, "
2012
,md->vars_header_size
2017
fd->bytes_written = 0;
2020
// build index appending to any existing index
2021
adios_build_index_v1 (fd, &md->old_pg_root, &md->old_vars_root
2022
,&md->old_attrs_root
2024
// if collective, gather the indexes from the rest and call
2025
if (md->group_comm != MPI_COMM_NULL)
2029
int * index_sizes = malloc (4 * md->size);
2030
int * index_offsets = malloc (4 * md->size);
2031
char * recv_buffer = 0;
2033
uint32_t total_size = 0;
2036
MPI_Gather (&size, 1, MPI_INT
2037
,index_sizes, 1, MPI_INT
2041
for (i = 0; i < md->size; i++)
2043
index_offsets [i] = total_size;
2044
total_size += index_sizes [i];
2047
recv_buffer = malloc (total_size);
2049
MPI_Gatherv (&size, 0, MPI_BYTE
2050
,recv_buffer, index_sizes, index_offsets
2051
,MPI_BYTE, 0, md->group_comm
2054
char * buffer_save = md->b.buff;
2055
uint64_t buffer_size_save = md->b.length;
2056
uint64_t offset_save = md->b.offset;
2058
for (i = 1; i < md->size; i++)
2060
md->b.buff = recv_buffer + index_offsets [i];
2061
md->b.length = index_sizes [i];
2064
adios_parse_process_group_index_v1 (&md->b
2067
adios_parse_vars_index_v1 (&md->b, &new_vars_root);
2068
adios_parse_attributes_index_v1 (&md->b
2071
adios_merge_index_v1 (&md->old_pg_root
2073
,&md->old_attrs_root
2074
,new_pg_root, new_vars_root
2081
md->b.buff = buffer_save;
2082
md->b.length = buffer_size_save;
2083
md->b.offset = offset_save;
2087
free (index_offsets);
2091
adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
2097
MPI_Gather (&buffer_size, 1, MPI_INT, 0, 0, MPI_INT
2100
MPI_Gatherv (buffer, buffer_size, MPI_BYTE
2107
if (fd->shared_buffer == adios_flag_yes)
2109
// everyone writes their data
2110
adios_mpi_lustre_striping_unit_write(
2120
adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
2121
,index_start, md->old_pg_root
2125
adios_write_version_v1 (&buffer, &buffer_size, &buffer_offset);
2127
adios_mpi_lustre_striping_unit_write(
2129
md->b.pg_index_offset,
2137
adios_clear_index_v1 (new_pg_root, new_vars_root, new_attrs_root);
2138
adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
2144
md->old_pg_root = 0;
2145
md->old_vars_root = 0;
2146
md->old_attrs_root = 0;
2153
fprintf (stderr, "Unknown file mode: %d\n", fd->mode);
2158
MPI_File_close (&md->fh);
2160
if ( md->group_comm != MPI_COMM_WORLD
2161
&& md->group_comm != MPI_COMM_SELF
2162
&& md->group_comm != MPI_COMM_NULL
2165
md->group_comm = MPI_COMM_NULL;
2170
memset (&md->status, 0, sizeof (MPI_Status));
2171
md->group_comm = MPI_COMM_NULL;
2173
adios_clear_index_v1 (md->old_pg_root, md->old_vars_root
2176
md->old_pg_root = 0;
2177
md->old_vars_root = 0;
2178
md->old_attrs_root = 0;
2180
print_metrics (md, iteration++);
2184
void adios_mpi_lustre_finalize (int mype, struct adios_method_struct * method)
2186
// nothing to do here
2187
if (adios_mpi_lustre_initialized)
2188
adios_mpi_lustre_initialized = 0;
2191
void adios_mpi_lustre_end_iteration (struct adios_method_struct * method)
2195
void adios_mpi_lustre_start_calculation (struct adios_method_struct * method)
2199
void adios_mpi_lustre_stop_calculation (struct adios_method_struct * method)