2
* ADIOS is freely available under the terms of the BSD license described
3
* in the COPYING file in the top level directory of this source distribution.
5
* Copyright (c) 2008 - 2009. UT-BATTELLE, LLC. All rights reserved.
17
// see if we have MPI or other tools
25
#include "adios_transport_hooks.h"
26
#include "adios_bp_v1.h"
27
#include "adios_internals.h"
30
#if defined(__APPLE__)
31
# define O_LARGEFILE 0
34
static int adios_posix_initialized = 0;
36
struct adios_POSIX_data_struct
39
struct adios_bp_buffer_struct_v1 b;
41
// old index structs we read in and have to be merged in
42
struct adios_index_process_group_struct_v1 * old_pg_root;
43
struct adios_index_var_struct_v1 * old_vars_root;
44
struct adios_index_attribute_struct_v1 * old_attrs_root;
47
uint64_t vars_header_size;
49
// Metadata file handle
58
static void adios_var_to_comm (const char * comm_name
59
,enum ADIOS_FLAG host_language_fortran
66
int t = *(int *) data;
72
fprintf (stderr, "communicator not provided and none "
73
"listed in XML. Defaulting to "
77
*comm = MPI_COMM_SELF;
81
if (host_language_fortran == adios_flag_yes)
83
*comm = MPI_Comm_f2c (t);
87
*comm = *(MPI_Comm *) data;
93
if (!strcmp (comm_name, ""))
97
fprintf (stderr, "communicator not provided and none "
98
"listed in XML. Defaulting to "
102
*comm = MPI_COMM_SELF;
106
if (host_language_fortran == adios_flag_yes)
108
*comm = MPI_Comm_f2c (t);
112
*comm = *(MPI_Comm *) data;
120
fprintf (stderr, "communicator not provided but one "
121
"listed in XML. Defaulting to "
125
*comm = MPI_COMM_WORLD;
129
if (host_language_fortran == adios_flag_yes)
131
*comm = MPI_Comm_f2c (t);
135
*comm = *(MPI_Comm *) data;
143
fprintf (stderr, "coordination-communication not provided. "
144
"Using MPI_COMM_SELF instead\n"
147
*comm = MPI_COMM_SELF;
152
void adios_posix_init (const char * parameters
153
,struct adios_method_struct * method
156
struct adios_POSIX_data_struct * p = 0;
158
if (!adios_posix_initialized)
160
adios_posix_initialized = 1;
163
method->method_data = malloc (sizeof (struct adios_POSIX_data_struct));
164
p = (struct adios_POSIX_data_struct *) method->method_data;
165
adios_buffer_struct_init (&p->b);
167
p->old_vars_root = 0;
168
p->old_attrs_root = 0;
170
p->vars_header_size = 0;
173
p->group_comm = MPI_COMM_NULL;
179
int adios_posix_open (struct adios_file_struct * fd
180
,struct adios_method_struct * method, void * comm
186
char * name_with_rank, rank_string[16];
187
struct adios_POSIX_data_struct * p = (struct adios_POSIX_data_struct *)
190
// Need to figure out new the new fd->name, such as restart.bp.0, restart.bp.1....
191
adios_var_to_comm (fd->group->group_comm
192
,fd->group->adios_host_language_fortran
197
if (p->group_comm == MPI_COMM_NULL)
199
p->group_comm = MPI_COMM_SELF;
202
// if communicator is not MPI_COMM_NULL/MPI_COMM_SELF, subfiles will be generated in a dir.
203
if (p->group_comm != MPI_COMM_SELF)
205
char * n = strrchr (fd->name, '/');
215
MPI_Comm_rank (p->group_comm, &p->rank);
216
MPI_Comm_size (p->group_comm, &p->size);
218
sprintf (rank_string, "%d", p->rank);
219
// fd->name + '.' + MPI rank + '\0'
220
name_with_rank = malloc (strlen (n) + strlen (rank_string) + 2);
221
sprintf (name_with_rank, "%s.%s", n, rank_string);
223
// e.g., subfile_name is restart.bp.dir/restart.bp.0
224
subfile_name = malloc (strlen (fd->name)
226
+ strlen (method->base_path)
227
+ strlen (name_with_rank)
230
sprintf (subfile_name, "%s%s%s%s"
237
mdfile_name = malloc (strlen (method->base_path)
241
sprintf (mdfile_name, "%s%s"
246
free (name_with_rank);
251
// if the communicator is MPI_COMM_SELF, there won't be metadata file generated.
252
// The actually subfile name is the one supplied by the user
253
subfile_name = malloc (strlen (method->base_path) + strlen (fd->name) + 1);
254
sprintf (subfile_name, "%s%s", method->base_path, fd->name);
258
fd->subfile_index = p->rank;
261
if (stat (subfile_name, &s) == 0)
262
p->b.file_size = s.st_size;
266
case adios_mode_read:
268
p->b.f = open (subfile_name, O_RDONLY | O_LARGEFILE);
271
fprintf (stderr, "ADIOS POSIX: file not found: %s\n", fd->name);
278
fd->pg_start_in_file = 0;
283
case adios_mode_write:
286
// create dir to keep all the subfiles
287
if (p->group_comm != MPI_COMM_SELF)
291
char * dir_name = malloc (strlen (fd->name) + 4 + 1);
292
sprintf (dir_name, "%s%s"
297
mkdir (dir_name, S_IRWXU | S_IRWXG);
301
MPI_Barrier (p->group_comm);
304
p->b.f = open (subfile_name, O_WRONLY | O_CREAT | O_TRUNC | O_LARGEFILE
311
fprintf (stderr, "adios_posix_open failed for "
312
"base_path %s, subfile name %s\n"
313
,method->base_path, subfile_name
323
// open metadata file
324
if (p->group_comm != MPI_COMM_SELF)
328
p->mf = open (mdfile_name, O_WRONLY | O_CREAT | O_TRUNC | O_LARGEFILE
335
fprintf (stderr, "adios_posix_open failed for "
336
"base_path %s, metadata file name %s\n"
337
,method->base_path, mdfile_name
349
fd->pg_start_in_file = 0;
354
case adios_mode_append:
358
if (p->group_comm != MPI_COMM_SELF)
362
char * dir_name = malloc (strlen (fd->name) + 4 + 1);
363
sprintf (dir_name, "%s%s"
368
mkdir (dir_name, S_IRWXU | S_IRWXG);
372
MPI_Barrier (p->group_comm);
375
p->b.f = open (subfile_name, O_RDWR | O_LARGEFILE);
379
p->b.f = open (subfile_name, O_WRONLY | O_CREAT | O_LARGEFILE
386
fprintf (stderr, "adios_posix_open failed for "
387
"base_path %s, name %s\n"
388
,method->base_path, fd->name
398
// open metadata file
399
if (p->group_comm != MPI_COMM_SELF)
403
p->mf = open (mdfile_name, O_WRONLY | O_TRUNC | O_LARGEFILE
410
p->mf = open (mdfile_name, O_WRONLY| O_CREAT | O_LARGEFILE
417
fprintf (stderr, "adios_posix_open failed for "
418
"base_path %s, name %s\n"
419
,method->base_path, fd->name
433
// now we have to read the old stuff so we can merge it
434
// in at the end and set the base_offset for the old index
437
adios_posix_read_version (&p->b);
438
adios_parse_version (&p->b, &version);
440
switch (version & ADIOS_VERSION_NUM_MASK)
443
// read the old stuff and set the base offset
444
adios_posix_read_index_offsets (&p->b);
445
adios_parse_index_offsets_v1 (&p->b);
447
adios_posix_read_process_group_index (&p->b);
448
adios_parse_process_group_index_v1 (&p->b
452
// find the largest time index so we can append properly
453
struct adios_index_process_group_struct_v1 * pg;
454
uint32_t max_time_index = 0;
458
if (pg->time_index > max_time_index)
459
max_time_index = pg->time_index;
462
fd->group->time_index = ++max_time_index;
464
adios_posix_read_vars_index (&p->b);
465
adios_parse_vars_index_v1 (&p->b, &p->old_vars_root);
467
adios_posix_read_attributes_index (&p->b);
468
adios_parse_attributes_index_v1 (&p->b
472
fd->base_offset = p->b.end_of_pgs;
473
fd->pg_start_in_file = p->b.end_of_pgs;
477
fprintf (stderr, "Unknown bp version: %d. "
494
fprintf (stderr, "Unknown file mode: %d\n", fd->mode);
509
enum ADIOS_FLAG adios_posix_should_buffer (struct adios_file_struct * fd
510
,struct adios_method_struct * method
513
struct adios_POSIX_data_struct * p = (struct adios_POSIX_data_struct *)
516
if (fd->shared_buffer == adios_flag_no && fd->mode != adios_mode_read)
518
// write the process group header
519
adios_write_process_group_header_v1 (fd, fd->write_size_bytes);
521
lseek (p->b.f, fd->base_offset, SEEK_SET);
522
ssize_t s = write (p->b.f, fd->buffer, fd->bytes_written);
523
if (s != fd->bytes_written)
525
fprintf (stderr, "POSIX method tried to write %llu, "
531
fd->base_offset += s;
533
fd->bytes_written = 0;
534
adios_shared_buffer_free (&p->b);
536
// setup for writing vars
537
adios_write_open_vars_v1 (fd);
538
p->vars_start = lseek (p->b.f, fd->offset, SEEK_CUR); // save loc
539
p->vars_header_size = p->vars_start - fd->base_offset; // the size
540
p->vars_start -= fd->offset; // adjust to start of header
541
fd->base_offset += fd->offset; // add the size of the vars header
543
fd->bytes_written = 0;
544
adios_shared_buffer_free (&p->b);
547
return fd->shared_buffer; // buffer if there is space
550
void adios_posix_write (struct adios_file_struct * fd
551
,struct adios_var_struct * v
553
,struct adios_method_struct * method
556
struct adios_POSIX_data_struct * p = (struct adios_POSIX_data_struct *)
559
if (v->got_buffer == adios_flag_yes)
561
if (data != v->data) // if the user didn't give back the same thing
563
if (v->free_data == adios_flag_yes)
566
adios_method_buffer_free (v->data_size);
571
// we already saved all of the info, so we're ok.
576
if (fd->shared_buffer == adios_flag_no)
578
// var payload sent for sizing information
579
adios_write_var_header_v1 (fd, v);
580
ssize_t s = write (p->b.f, fd->buffer, fd->bytes_written);
581
if (s != fd->bytes_written)
583
fprintf (stderr, "POSIX method tried to write %llu, "
589
fd->base_offset += s;
591
fd->bytes_written = 0;
592
adios_shared_buffer_free (&p->b);
595
// adios_write_var_payload_v1 (fd, v);
596
uint64_t var_size = adios_get_var_size (v, fd->group, v->data);
597
if (fd->base_offset + var_size > fd->pg_start_in_file + fd->write_size_bytes)
598
fprintf (stderr, "adios_posix_write exceeds pg bound. File is corrupted. "
599
"Need to enlarge group size. \n");
602
uint64_t bytes_written = 0;
603
if (var_size > INT32_MAX)
605
to_write = INT32_MAX;
609
to_write = (int32_t) fd->bytes_written;
612
while (bytes_written < var_size)
614
bytes_written += write (p->b.f, v->data + bytes_written, to_write);
615
if (var_size > bytes_written)
617
if (var_size - bytes_written > INT32_MAX)
619
to_write = INT32_MAX;
623
to_write = var_size - bytes_written;
628
// s = write (p->b.f, v->data, var_size);
632
fprintf (stderr, "POSIX method tried to write %llu, "
638
fd->base_offset += s;
640
fd->bytes_written = 0;
641
adios_shared_buffer_free (&p->b);
645
void adios_posix_get_write_buffer (struct adios_file_struct * fd
646
,struct adios_var_struct * v
649
,struct adios_method_struct * method
652
uint64_t mem_allowed;
661
if (v->data && v->free_data)
663
adios_method_buffer_free (v->data_size);
667
mem_allowed = adios_method_buffer_alloc (*size);
668
if (mem_allowed == *size)
670
*buffer = malloc (*size);
673
adios_method_buffer_free (mem_allowed);
674
fprintf (stderr, "Out of memory allocating %llu bytes for %s\n"
677
v->got_buffer = adios_flag_no;
678
v->free_data = adios_flag_no;
686
v->got_buffer = adios_flag_yes;
687
v->free_data = adios_flag_yes;
688
v->data_size = mem_allowed;
694
adios_method_buffer_free (mem_allowed);
695
fprintf (stderr, "OVERFLOW: Cannot allocate requested buffer of %llu "
705
void adios_posix_read (struct adios_file_struct * fd
706
,struct adios_var_struct * v
708
,uint64_t buffer_size
709
,struct adios_method_struct * method
713
v->data_size = buffer_size;
716
static void adios_posix_do_write (struct adios_file_struct * fd
717
,struct adios_method_struct * method
719
,uint64_t buffer_size
722
struct adios_POSIX_data_struct * p = (struct adios_POSIX_data_struct *)
725
uint64_t bytes_written = 0;
727
if (fd->shared_buffer == adios_flag_yes)
729
lseek (p->b.f, p->b.end_of_pgs, SEEK_SET);
730
if (p->b.end_of_pgs + fd->bytes_written > fd->pg_start_in_file + fd->write_size_bytes)
731
fprintf (stderr, "adios_posix_write exceeds pg bound. File is corrupted. "
732
"Need to enlarge group size. \n");
734
if (fd->bytes_written > INT32_MAX)
736
to_write = INT32_MAX;
740
to_write = (int32_t) fd->bytes_written;
743
while (bytes_written < fd->bytes_written)
745
write (p->b.f, fd->buffer, to_write);
746
bytes_written += to_write;
747
if (fd->bytes_written > bytes_written)
749
if (fd->bytes_written - bytes_written > INT32_MAX)
751
to_write = INT32_MAX;
755
to_write = fd->bytes_written - bytes_written;
761
// index location calculation:
762
// for buffered, base_offset = 0, fd->offset = write loc
763
// for unbuffered, base_offset = write loc, fd->offset = 0
764
// for append buffered, base_offset = start, fd->offset = size
765
lseek (p->b.f, fd->base_offset + fd->offset, SEEK_SET);
766
write (p->b.f, buffer, buffer_size);
769
static void adios_posix_do_read (struct adios_file_struct * fd
770
,struct adios_method_struct * method
773
struct adios_POSIX_data_struct * p = (struct adios_POSIX_data_struct *)
775
struct adios_var_struct * v = fd->group->vars;
777
struct adios_parse_buffer_struct data;
783
uint32_t version = 0;
785
adios_posix_read_version (&p->b);
786
adios_parse_version (&p->b, &version);
788
switch (version & ADIOS_VERSION_NUM_MASK)
792
struct adios_index_process_group_struct_v1 * pg_root = 0;
793
struct adios_index_process_group_struct_v1 * pg_root_temp = 0;
794
struct adios_index_var_struct_v1 * vars_root = 0;
795
struct adios_index_attribute_struct_v1 * attrs_root = 0;
797
adios_posix_read_index_offsets (&p->b);
798
adios_parse_index_offsets_v1 (&p->b);
800
adios_posix_read_process_group_index (&p->b);
801
adios_parse_process_group_index_v1 (&p->b, &pg_root);
803
adios_posix_read_vars_index (&p->b);
804
adios_parse_vars_index_v1 (&p->b, &vars_root);
806
adios_posix_read_attributes_index (&p->b);
807
adios_parse_attributes_index_v1 (&p->b, &attrs_root);
810
// the three section headers
811
struct adios_process_group_header_struct_v1 pg_header;
812
struct adios_vars_header_struct_v1 vars_header;
813
struct adios_attributes_header_struct_v1 attrs_header;
815
struct adios_var_header_struct_v1 var_header;
816
struct adios_var_payload_struct_v1 var_payload;
817
struct adios_attribute_struct_v1 attribute;
821
pg_root_temp = pg_root;
822
while (pg_root_temp && pg_root_temp->next)
823
pg_root_temp = pg_root_temp->next;
825
p->b.read_pg_offset = pg_root_temp->offset_in_file;
826
if (pg_root_temp->next)
828
p->b.read_pg_size = pg_root_temp->next->offset_in_file
829
- pg_root_temp->offset_in_file;
833
p->b.read_pg_size = p->b.pg_index_offset
834
- pg_root_temp->offset_in_file;
837
adios_posix_read_process_group (&p->b);
838
adios_parse_process_group_header_v1 (&p->b, &pg_header);
840
adios_parse_vars_header_v1 (&p->b, &vars_header);
842
for (i = 0; i < vars_header.count; i++)
844
memset (&var_payload, 0
845
,sizeof (struct adios_var_payload_struct_v1)
847
adios_parse_var_data_header_v1 (&p->b, &var_header);
849
struct adios_var_struct * v1 = v;
852
if ( strcasecmp (var_header.name, v1->name)
853
|| strcasecmp (var_header.path, v1->path)
864
var_payload.payload = v1->data;
865
adios_parse_var_data_payload_v1 (&p->b, &var_header
872
adios_parse_var_data_payload_v1 (&p->b, &var_header
877
adios_clear_var_header_v1 (&var_header);
881
adios_parse_attributes_header_v1 (&p->b, &attrs_header);
883
for (i = 0; i < attrs_header.count; i++)
885
adios_parse_attribute_v1 (&p->b, &attribute);
886
adios_clear_attribute_v1 (&attribute);
889
adios_clear_process_group_header_v1 (&pg_header);
890
adios_clear_index_v1 (pg_root, vars_root, attrs_root);
895
fprintf (stderr, "POSIX read: file version unknown: %u\n"
901
adios_buffer_struct_clear (&p->b);
904
void adios_posix_close (struct adios_file_struct * fd
905
,struct adios_method_struct * method
908
struct adios_POSIX_data_struct * p = (struct adios_POSIX_data_struct *)
910
struct adios_attribute_struct * a = fd->group->attributes;
912
struct adios_index_process_group_struct_v1 * new_pg_root = 0;
913
struct adios_index_var_struct_v1 * new_vars_root = 0;
914
struct adios_index_attribute_struct_v1 * new_attrs_root = 0;
918
case adios_mode_write:
920
if (fd->shared_buffer == adios_flag_no)
923
// set it up so that it will start at 0, but have correct sizes
924
new_off = lseek (p->b.f, 0, SEEK_CUR);
925
fd->offset = fd->base_offset - p->vars_start;
928
adios_write_close_vars_v1 (fd);
929
// fd->vars_start gets updated with the size written
930
fd->offset = lseek (p->b.f, p->vars_start, SEEK_SET);
931
ssize_t s = write (p->b.f, fd->buffer, p->vars_header_size);
932
if (s != fd->vars_start)
934
fprintf (stderr, "POSIX method tried to write %llu, "
941
fd->bytes_written = 0;
942
adios_shared_buffer_free (&p->b);
944
new_off = lseek (p->b.f, new_off, SEEK_SET); // go back to end
945
adios_write_open_attributes_v1 (fd);
946
p->vars_start = lseek (p->b.f, fd->offset, SEEK_CUR); // save loc
947
p->vars_header_size = p->vars_start - fd->base_offset;
948
p->vars_start -= fd->offset; // adjust to start of header
949
fd->base_offset += fd->offset; // add size of header
951
fd->bytes_written = 0;
955
adios_write_attribute_v1 (fd, a);
956
if (fd->base_offset + fd->bytes_written > fd->pg_start_in_file + fd->write_size_bytes)
957
fprintf (stderr, "adios_posix_write exceeds pg bound. File is corrupted. "
958
"Need to enlarge group size. \n");
959
ssize_t s = write (p->b.f, fd->buffer, fd->bytes_written);
960
if (s != fd->bytes_written)
962
fprintf (stderr, "POSIX method tried to write %llu, "
968
fd->base_offset += s;
970
fd->bytes_written = 0;
971
adios_shared_buffer_free (&p->b);
976
// set it up so that it will start at 0, but have correct sizes
977
fd->offset = fd->base_offset - p->vars_start;
980
adios_write_close_attributes_v1 (fd);
981
fd->offset = lseek (p->b.f, p->vars_start, SEEK_SET);
982
// fd->vars_start gets updated with the size written
983
s = write (p->b.f, fd->buffer, p->vars_header_size);
984
if (s != p->vars_header_size)
986
fprintf (stderr, "POSIX method tried to write %llu, "
993
fd->bytes_written = 0;
996
// buffering or not, write the index
998
uint64_t buffer_size = 0;
999
uint64_t buffer_offset = 0;
1000
uint64_t index_start = fd->base_offset + fd->offset;
1003
adios_build_index_v1 (fd, &p->old_pg_root, &p->old_vars_root
1006
// if collective, gather the indexes from the rest and call
1007
// adios_merge_index_v1 (&new_pg_root, &new_vars_root, pg, vars);
1008
adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
1009
,index_start, p->old_pg_root, p->old_vars_root
1012
adios_write_version_v1 (&buffer, &buffer_size, &buffer_offset);
1013
adios_posix_do_write (fd, method, buffer, buffer_offset);
1015
if (p->group_comm != MPI_COMM_SELF)
1019
int * index_sizes = malloc (4 * p->size);
1020
int * index_offsets = malloc (4 * p->size);
1021
char * recv_buffer = 0;
1023
uint32_t size = 0, total_size = 0;
1025
MPI_Gather (&size, 1, MPI_INT
1026
,index_sizes, 1, MPI_INT
1030
for (i = 0; i < p->size; i++)
1032
index_offsets [i] = total_size;
1033
total_size += index_sizes [i];
1036
recv_buffer = malloc (total_size);
1037
MPI_Gatherv (&size, 0, MPI_BYTE
1038
,recv_buffer, index_sizes, index_offsets
1039
,MPI_BYTE, 0, p->group_comm
1042
char * buffer_save = p->b.buff;
1043
uint64_t buffer_size_save = p->b.length;
1044
uint64_t offset_save = p->b.offset;
1046
for (i = 1; i < p->size; i++)
1048
p->b.buff = recv_buffer + index_offsets [i];
1049
p->b.length = index_sizes [i];
1052
adios_parse_process_group_index_v1 (&p->b
1055
adios_parse_vars_index_v1 (&p->b, &new_vars_root);
1056
adios_parse_attributes_index_v1 (&p->b
1060
adios_merge_index_v1 (&p->old_pg_root
1063
,new_pg_root, new_vars_root
1071
p->b.buff = buffer_save;
1072
p->b.length = buffer_size_save;
1073
p->b.offset = offset_save;
1077
free (index_offsets);
1079
char * global_index_buffer = 0;
1080
uint64_t global_index_buffer_size = 0;
1081
uint64_t global_index_buffer_offset = 0;
1082
uint64_t global_index_start = 0;
1085
adios_write_index_v1 (&global_index_buffer, &global_index_buffer_size
1086
,&global_index_buffer_offset, global_index_start
1087
,p->old_pg_root, p->old_vars_root, p->old_attrs_root
1090
flag |= ADIOS_VERSION_HAVE_SUBFILE;
1092
adios_write_version_flag_v1 (&global_index_buffer
1093
,&global_index_buffer_size
1094
,&global_index_buffer_offset
1097
ssize_t s = write (p->mf, global_index_buffer, global_index_buffer_offset);
1098
if (s != global_index_buffer_offset)
1100
fprintf (stderr, "POSIX method tried to write %llu, "
1111
// Added this explicit cast to avoid truncation of low-order bytes on BGP
1112
int i_buffer_size = (int) buffer_size;
1113
MPI_Gather (&i_buffer_size, 1, MPI_INT
1118
MPI_Gatherv (buffer, buffer_size, MPI_BYTE
1127
adios_clear_index_v1 (new_pg_root, new_vars_root, new_attrs_root);
1132
case adios_mode_append:
1134
if (fd->shared_buffer == adios_flag_no)
1137
// set it up so that it will start at 0, but have correct sizes
1138
new_off = lseek (p->b.f, 0, SEEK_CUR);
1139
fd->offset = fd->base_offset - p->vars_start;
1141
fd->buffer_size = 0;
1142
adios_write_close_vars_v1 (fd);
1143
// fd->vars_start gets updated with the size written
1144
fd->offset = lseek (p->b.f, p->vars_start, SEEK_SET);
1145
ssize_t s = write (p->b.f, fd->buffer, p->vars_header_size);
1146
if (s != fd->vars_start)
1148
fprintf (stderr, "POSIX method tried to write %llu, "
1155
fd->bytes_written = 0;
1156
adios_shared_buffer_free (&p->b);
1158
new_off = lseek (p->b.f, new_off, SEEK_SET); // go back to end
1159
adios_write_open_attributes_v1 (fd);
1160
p->vars_start = lseek (p->b.f, fd->offset, SEEK_CUR); // save loc
1161
p->vars_header_size = p->vars_start - fd->base_offset;
1162
p->vars_start -= fd->offset; // adjust to start of header
1163
fd->base_offset += fd->offset; // add size of header
1165
fd->bytes_written = 0;
1169
adios_write_attribute_v1 (fd, a);
1170
ssize_t s = write (p->b.f, fd->buffer, fd->bytes_written);
1171
if (s != fd->bytes_written)
1173
fprintf (stderr, "POSIX method tried to write %llu, "
1179
fd->base_offset += s;
1181
fd->bytes_written = 0;
1182
adios_shared_buffer_free (&p->b);
1187
// set it up so that it will start at 0, but have correct sizes
1188
fd->offset = fd->base_offset - p->vars_start;
1190
fd->buffer_size = 0;
1191
adios_write_close_attributes_v1 (fd);
1192
fd->offset = lseek (p->b.f, p->vars_start, SEEK_SET);
1193
// fd->vars_start gets updated with the size written
1194
s = write (p->b.f, fd->buffer, p->vars_header_size);
1195
if (s != p->vars_header_size)
1197
fprintf (stderr, "POSIX method tried to write %llu, "
1199
,p->vars_header_size
1204
fd->bytes_written = 0;
1208
uint64_t buffer_size = 0;
1209
uint64_t buffer_offset = 0;
1210
uint64_t index_start = fd->base_offset + fd->offset;
1213
adios_build_index_v1 (fd, &p->old_pg_root, &p->old_vars_root
1216
adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
1217
,index_start, p->old_pg_root, p->old_vars_root
1221
if (p->group_comm != MPI_COMM_SELF)
1225
int * index_sizes = malloc (4 * p->size);
1226
int * index_offsets = malloc (4 * p->size);
1227
char * recv_buffer = 0;
1229
uint32_t size = 0, total_size = 0;
1231
MPI_Gather (&size, 1, MPI_INT
1232
,index_sizes, 1, MPI_INT
1236
for (i = 0; i < p->size; i++)
1238
index_offsets [i] = total_size;
1239
total_size += index_sizes [i];
1242
recv_buffer = malloc (total_size);
1243
MPI_Gatherv (&size, 0, MPI_BYTE
1244
,recv_buffer, index_sizes, index_offsets
1245
,MPI_BYTE, 0, p->group_comm
1248
char * buffer_save = p->b.buff;
1249
uint64_t buffer_size_save = p->b.length;
1250
uint64_t offset_save = p->b.offset;
1252
for (i = 1; i < p->size; i++)
1254
p->b.buff = recv_buffer + index_offsets [i];
1255
p->b.length = index_sizes [i];
1258
adios_parse_process_group_index_v1 (&p->b
1261
adios_parse_vars_index_v1 (&p->b, &new_vars_root);
1262
adios_parse_attributes_index_v1 (&p->b
1266
adios_merge_index_v1 (&p->old_pg_root
1269
,new_pg_root, new_vars_root
1278
adios_sort_index_v1 (&p->old_pg_root
1283
p->b.buff = buffer_save;
1284
p->b.length = buffer_size_save;
1285
p->b.offset = offset_save;
1289
free (index_offsets);
1291
char * global_index_buffer = 0;
1292
uint64_t global_index_buffer_size = 0;
1293
uint64_t global_index_buffer_offset = 0;
1294
uint64_t global_index_start = 0;
1297
adios_write_index_v1 (&global_index_buffer, &global_index_buffer_size
1298
,&global_index_buffer_offset, global_index_start
1299
,p->old_pg_root, p->old_vars_root, p->old_attrs_root
1302
flag |= ADIOS_VERSION_HAVE_SUBFILE;
1304
adios_write_version_flag_v1 (&global_index_buffer
1305
,&global_index_buffer_size
1306
,&global_index_buffer_offset
1310
ssize_t s = write (p->mf, global_index_buffer, global_index_buffer_offset);
1311
if (s != global_index_buffer_offset)
1313
fprintf (stderr, "POSIX method tried to write %llu, "
1314
"only wrote %llu, Mode: a\n"
1315
,global_index_buffer_offset
1322
free (global_index_buffer);
1326
MPI_Gather (&buffer_size, 1, MPI_INT
1331
MPI_Gatherv (buffer, buffer_size, MPI_BYTE
1338
adios_write_version_v1 (&buffer, &buffer_size, &buffer_offset);
1339
adios_posix_do_write (fd, method, buffer, buffer_offset);
1346
case adios_mode_read:
1348
// read the index to find the place to start reading
1349
adios_posix_do_read (fd, method);
1350
struct adios_var_struct * v = fd->group->vars;
1362
fprintf (stderr, "Unknown file mode: %d\n", fd->mode);
1368
adios_posix_close_internal (&p->b);
1369
adios_clear_index_v1 (p->old_pg_root, p->old_vars_root, p->old_attrs_root);
1371
p->old_vars_root = 0;
1372
p->old_attrs_root = 0;
1375
void adios_posix_finalize (int mype, struct adios_method_struct * method)
1377
// nothing to do here
1378
if (adios_posix_initialized)
1379
adios_posix_initialized = 0;
1382
void adios_posix_end_iteration (struct adios_method_struct * method)
1386
void adios_posix_start_calculation (struct adios_method_struct * method)
1390
void adios_posix_stop_calculation (struct adios_method_struct * method)