247
int find_myost (MPI_Comm comm)
249
uint32_t * nids, * osts, myid;
250
int i, nnids = get_unique_nids (comm, nids);
251
osts = (uint32_t *) malloc (nnids * 4);
253
if (fgr_nid2ost (nids, osts, nnids, ATLAS) == true)
255
uint32_t mynid = nid_atoi();
256
for (i = 0; i < nnids; i++)
266
// something is wrong
240
284
//adios_mpi_amr_set_striping_unit(MPI_File fh, char *filename, char *parameters)
241
285
adios_mpi_amr_set_striping_unit(struct adios_MPI_data_struct * md, char *parameters)
243
MPI_File fh = md->fh;
244
287
char * filename = md->subfile_name;
247
289
uint64_t striping_unit = 0;
248
290
uint16_t striping_count = 0;
249
char value[64], *temp_string, *p_count,*p_size;
250
int fd, old_mask, perm, n_ost_skipping, n_ost, n, i;
291
char *temp_string, *p_count,*p_size;
292
int fd, old_mask, perm, n_ost_skipping, n_ost, n, i, should_striping;
293
int random_offset_flag, name_len;
253
295
temp_string = (char *) malloc (strlen (parameters) + 1);
254
296
strcpy (temp_string, parameters);
255
297
trim_spaces (temp_string);
257
if (p_count = strstr (temp_string, "stripe_count"))
299
if ( (p_count = strstr (temp_string, "striping")) )
301
char * p = strchr (p_count, '=');
302
char * q = strtok (p, ";");
304
should_striping = atoi (q + 1);
306
should_striping = atoi (p + 1);
313
if (should_striping == 0)
318
strcpy (temp_string, parameters);
319
trim_spaces (temp_string);
321
if ( (p_count = strstr (temp_string, "stripe_count")) )
259
323
char * p = strchr (p_count, '=');
260
324
char * q = strtok (p, ";");
272
336
strcpy (temp_string, parameters);
273
337
trim_spaces (temp_string);
275
if (p_size = strstr (temp_string, "stripe_size"))
339
if ( (p_count = strstr (temp_string, "random_offset")) )
341
char * p = strchr (p_count, '=');
342
char * q = strtok (p, ";");
344
random_offset_flag = atoi (q + 1);
346
random_offset_flag = atoi (p + 1);
350
// By default, do not use a random stripe offset (random_offset_flag = 0).
351
random_offset_flag = 0;
354
strcpy (temp_string, parameters);
355
trim_spaces (temp_string);
357
if ( (p_size = strstr (temp_string, "stripe_size")) )
277
359
char * p = strchr (p_size, '=');
278
360
char * q = strtok (p, ";");
336
lum.lmm_stripe_offset = i;
419
int ost_id = find_myost (md->group_comm);
422
lum.lmm_stripe_offset = ost_id;
426
lum.lmm_stripe_offset = (random_offset_flag ? -1 : i);
429
lum.lmm_stripe_offset = (random_offset_flag ? -1 : i);
337
431
ioctl (fd, LL_IOC_LOV_SETSTRIPE
352
446
adios_mpi_amr_set_have_mdf (char * parameters, struct adios_MPI_data_struct * md)
354
int err = 0, flag, i, aggr_group_size, remain, index;
355
int nproc = md->size, rank = md->rank;
356
char value[64], *temp_string, *p_count,*p_size;
448
char *temp_string, *p_size;
358
450
temp_string = (char *) malloc (strlen (parameters) + 1);
359
451
strcpy (temp_string, parameters);
360
452
trim_spaces (temp_string);
362
if (p_size = strstr (temp_string, "have_metadata_file"))
454
if ( (p_size = strstr (temp_string, "have_metadata_file")) )
364
456
char * p = strchr (p_size, '=');
365
457
char * q = strtok (p, ";");
382
474
adios_mpi_amr_set_aggregation_parameters(char * parameters, struct adios_MPI_data_struct * md)
384
int err = 0, flag, i, aggr_group_size, remain, index;
476
int i, aggr_group_size, remain, index;
385
477
int nproc = md->size, rank = md->rank;
386
char value[64], *temp_string, *p_count,*p_size;
478
char *temp_string, *p_size;
388
480
temp_string = (char *) malloc (strlen (parameters) + 1);
391
483
strcpy (temp_string, parameters);
392
484
trim_spaces (temp_string);
394
if (p_size = strstr (temp_string, "num_ost"))
486
if ( (p_size = strstr (temp_string, "num_ost")) )
396
488
char * p = strchr (p_size, '=');
397
489
char * q = strtok (p, ";");
407
499
strcpy (temp_string, parameters);
408
500
trim_spaces (temp_string);
502
if ( (p_size = strstr (temp_string, "local-fs")) )
504
char * p = strchr (p_size, '=');
505
char * q = strtok (p, ";");
507
md->is_local_fs = atoi(q + 1);
509
md->is_local_fs = atoi(p + 1);
516
strcpy (temp_string, parameters);
517
trim_spaces (temp_string);
410
519
// set up # of aggregators
411
if (p_size = strstr (temp_string, "num_aggregators"))
520
if ( (p_size = strstr (temp_string, "num_aggregators")) )
413
522
char * p = strchr (p_size, '=');
414
523
char * q = strtok (p, ";");
432
strcpy (temp_string, parameters);
433
trim_spaces (temp_string);
435
if (p_size = strstr (temp_string, "have_metadata_file"))
541
// Get 'color' parameter. If 'color' is set,
542
// the num_aggregators will be disregarded.
543
// The actual # of aggregators will be calculated
544
// according to color.
545
strcpy (temp_string, parameters);
546
trim_spaces (temp_string);
548
if ( (p_size = strstr (temp_string, "color")) )
550
char * p = strchr (p_size, '=');
551
char * q = strtok (p, ";");
553
md->is_color_set = 1;
555
md->g_color1 = atoi (q + 1);
557
md->g_color1 = atoi (p + 1);
561
// by default, use BG
562
md->g_io_type = ADIOS_MPI_AMR_IO_BG;
565
strcpy (temp_string, parameters);
566
trim_spaces (temp_string);
568
if ( (p_size = strstr (temp_string, "have_metadata_file")) )
437
570
char * p = strchr (p_size, '=');
438
571
char * q = strtok (p, ";");
452
585
strcpy (temp_string, parameters);
453
586
trim_spaces (temp_string);
455
if (p_size = strstr (temp_string, "threading"))
588
if ( (p_size = strstr (temp_string, "threading")) )
457
590
char * p = strchr (p_size, '=');
458
591
char * q = strtok (p, ";");
474
607
md->g_ost_skipping_list = allocOSTList (md->g_num_ost);
476
if (p_size = strstr (temp_string, "osts_to_skip"))
609
if ( (p_size = strstr (temp_string, "osts_to_skip")) )
478
611
char * p = strchr (p_size, '=');
479
612
char * q = strtok (p, ";");
488
621
strcpy (temp_string, parameters);
489
622
trim_spaces (temp_string);
491
if (p_size = strstr (temp_string, "aggregation_type"))
624
if ( (p_size = strstr (temp_string, "aggregation_type")) )
493
626
char * p = strchr (p_size, '=');
494
627
char * q = strtok (p, ";");
520
653
memset (md->g_is_aggregator, 0, nproc * sizeof(int));
522
aggr_group_size = nproc / md->g_num_aggregators;
523
remain = nproc - (int) aggr_group_size * md->g_num_aggregators;
526
for (i = 0; i < md->g_num_aggregators; i++)
528
md->g_is_aggregator[index] = 1;
532
index += aggr_group_size + 1;
536
index += aggr_group_size;
542
md->g_color1 = rank / aggr_group_size;
543
md->g_color2 = rank % aggr_group_size;
547
if (rank < (aggr_group_size + 1) * remain)
549
md->g_color1 = rank / (aggr_group_size + 1);
550
md->g_color2 = rank % (aggr_group_size + 1);
554
md->g_color1 = remain + (rank - (aggr_group_size + 1) * remain) / aggr_group_size;
555
md->g_color2 = (rank - (aggr_group_size + 1) * remain)% aggr_group_size;
655
if (!md->is_color_set)
657
aggr_group_size = nproc / md->g_num_aggregators;
658
remain = nproc - (int) aggr_group_size * md->g_num_aggregators;
661
for (i = 0; i < md->g_num_aggregators; i++)
663
md->g_is_aggregator[index] = 1;
667
index += aggr_group_size + 1;
671
index += aggr_group_size;
677
md->g_color1 = rank / aggr_group_size;
678
md->g_color2 = rank % aggr_group_size;
682
if (rank < (aggr_group_size + 1) * remain)
684
md->g_color1 = rank / (aggr_group_size + 1);
685
md->g_color2 = rank % (aggr_group_size + 1);
689
md->g_color1 = remain + (rank - (aggr_group_size + 1) * remain) / aggr_group_size;
690
md->g_color2 = (rank - (aggr_group_size + 1) * remain)% aggr_group_size;
694
else // if color is set
698
MPI_Comm_split (md->group_comm, md->g_color1, md->rank, &new_comm);
699
MPI_Comm_rank (new_comm, &md->g_color2);
609
752
while (total_written < len)
611
write_len = (to_write > INT32_MAX) ? INT32_MAX : to_write;
754
write_len = (to_write > MAX_MPIWRITE_SIZE) ? MAX_MPIWRITE_SIZE : to_write;
612
755
MPI_File_write (fh, buf_ptr, write_len, MPI_BYTE, &status);
613
756
MPI_Get_count(&status, MPI_BYTE, &count);
614
757
if (count != write_len)
757
void * adios_mpi_amr_do_mkdir (void * param)
900
void * adios_mpi_amr_do_mkdir (char * path)
759
struct adios_file_struct * fd = (struct adios_file_struct *) param;
760
902
// 4 bytes for ".dir"
761
char * dir_name = malloc (strlen (fd->name) + 4 + 1);
762
sprintf (dir_name, "%s%s", fd->name, ".dir");
903
char * dir_name = malloc (strlen (path) + 4 + 1);
904
sprintf (dir_name, "%s%s", path, ".dir");
764
906
mkdir (dir_name, S_IRWXU | S_IRWXG);
782
MPI_File_open (MPI_COMM_SELF, td->md->subfile_name
783
,MPI_MODE_WRONLY | MPI_MODE_CREATE
925
err = MPI_File_open (MPI_COMM_SELF, td->md->subfile_name
926
,MPI_MODE_WRONLY | MPI_MODE_CREATE
931
if (err != MPI_SUCCESS)
933
char e [MPI_MAX_ERROR_STRING];
935
memset (e, 0, MPI_MAX_ERROR_STRING);
936
MPI_Error_string (err, e, &len);
937
adios_error (err_file_open_error,
938
"MPI_AMR method: MPI open failed for %s: '%s'\n",
939
td->md->subfile_name, e);
930
1094
pg_root = pg_root->next;
934
1099
enum ADIOS_FLAG adios_mpi_amr_should_buffer (struct adios_file_struct * fd
935
1100
,struct adios_method_struct * method
939
1103
struct adios_MPI_data_struct * md = (struct adios_MPI_data_struct *)
940
1104
method->method_data;
941
1105
char * name, * name_no_path, * ch;
944
int sig; // used for coordinating the MPI_File_open
1107
//int sig; // used for coordinating the MPI_File_open
951
1109
START_TIMER (ADIOS_TIMER_MPI_AMR_AD_SHOULD_BUFFER);
953
1111
name = malloc (strlen (method->base_path) + strlen (fd->name) + 1);
954
1112
sprintf (name, "%s%s", method->base_path, fd->name);
956
if (md->rank == md->size - 1)
960
previous = md->rank - 1;
963
1114
fd->base_offset = 0;
965
1116
switch (fd->mode)
975
1126
if (md->rank == 0)
977
1128
struct lov_user_md lum;
978
struct obd_uuid uuids[1024], * uuidp;
981
1131
// open metadata file
984
1134
adios_mpi_amr_set_have_mdf (method->parameters, md);
985
1135
if (md->g_have_mdf)
987
f = open(fd->name, O_CREAT | O_RDWR | O_LOV_DELAY_CREATE, 0644);
1137
f = open(name, O_CREAT | O_RDWR | O_LOV_DELAY_CREATE, 0644);
990
adios_error (err_file_open_error,"MPI_AMR method: open() failed: %s\n", strerror(errno));
1140
// adios_error (err_file_open_error,"MPI_AMR method: open() failed: %s\n", strerror(errno));
1141
adios_error (err_file_open_error,"MPI_AMR method: open() failed: %s\n", name);
1014
MPI_File_open (MPI_COMM_SELF, fd->name
1166
MPI_File_open (MPI_COMM_SELF, name
1015
1167
,MPI_MODE_WRONLY | MPI_MODE_CREATE
1021
adios_mpi_amr_do_mkdir (fd);
1173
// adios_mpi_amr_do_mkdir (name);
1024
1176
MPI_Bcast (&md->g_num_ost, 1, MPI_INT, 0, md->group_comm);
1027
1179
fd->pg_start_in_file = 0;
1028
1180
adios_mpi_amr_set_aggregation_parameters (method->parameters, md);
1182
if (is_aggregator (md->rank))
1184
if (md->is_local_fs)
1186
adios_mpi_amr_do_mkdir (name);
1192
adios_mpi_amr_do_mkdir (name);
1030
1197
// Check if fd->name contains path
1031
if (ch = strrchr (fd->name, '/'))
1198
if ( (ch = strrchr (fd->name, '/')) )
1033
1200
name_no_path = malloc (strlen (ch + 1) + 1);
1034
1201
strcpy (name_no_path, ch + 1);
1039
1206
strcpy (name_no_path, fd->name);
1042
name = realloc (name, strlen (fd->name) + 5 + strlen (method->base_path) + strlen (name_no_path) + 1 + 10 + 1);
1209
name = realloc (name, strlen (method->base_path) + strlen (fd->name) + 5 + strlen (name_no_path) + 1 + 10 + 1);
1043
1210
// create the subfile name, e.g. restart.bp.1
1044
1211
// 1 for '.' + 10 for subfile index + 1 for '\0'
1045
sprintf (name, "%s%s%s%s.%d", fd->name, ".dir/", method->base_path, name_no_path, md->g_color1);
1212
sprintf (name, "%s%s%s%s.%d", method->base_path, fd->name, ".dir/", name_no_path, md->g_color1);
1046
1213
md->subfile_name = strdup (name);
1047
1214
fd->subfile_index = (uint32_t)md->g_color1;
1754
1936
uint64_t buffer_size = 0;
1755
1937
uint64_t buffer_offset = 0;
1756
1938
uint64_t index_start = md->b.pg_index_offset, index_start1;
1757
int * pg_sizes = 0, * disp = 0, * sendbuf = 0, * recvbuf = 0, * attr_sizes = 0;
1939
int * pg_sizes = 0, * disp = 0;
1758
1940
void * aggr_buff = 0, * recv_buff = 0;
1759
1941
struct adios_MPI_thread_data_write write_thread_data;
1760
1942
int i, new_rank, new_group_size, new_rank2, new_group_size2, max_data_size = 0, total_data_size = 0, total_data_size1 = 0;
1935
2117
if (fd->shared_buffer == adios_flag_yes && !md->g_merging_pgs)
1937
2119
//printf ("do not merge pg\n");
1938
struct adios_bp_buffer_struct_v1 b;
1939
struct adios_process_group_header_struct_v1 pg_header;
1940
struct adios_vars_header_struct_v1 vars_header;
1941
int pg_size, header_size;
1942
uint64_t vars_count_offset;
1943
2121
MPI_Request request;
1944
2122
MPI_Status status;
1974
2152
if (2 * max_data_size > MAX_AGG_BUF)
1976
2154
log_warn ("MPI_AMR method (BG): The max allowed aggregation "
1977
"buffer is %llu bytes.\n"
1978
"But this ADIOS method needs %llu bytes for aggregation\n",
2155
"buffer is %d bytes.\n"
2156
"But this ADIOS method needs %d bytes for aggregation\n",
1979
2157
MAX_AGG_BUF, 2 * max_data_size);
2095
2273
if (!is_aggregator(md->rank))
2097
2275
uint64_t var_offset_to_add = 0, attr_offset_to_add = 0;
2098
uint64_t var_base_offset = 0, attr_base_offset = 0;
2100
// change to relative offset
2101
if (md->index->vars_root)
2103
var_base_offset = md->index->vars_root->characteristics [0].offset;
2106
if (md->index->attrs_root)
2108
attr_base_offset = md->index->attrs_root->characteristics [0].offset;
2111
2277
for (i = 0; i < new_rank; i++)
2509
2679
if (count != md->vars_header_size)
2511
log_warn ("d:MPI_AMR method tried to write %llu, only wrote %d\n",
2681
log_warn ("d:MPI_AMR method tried to write %llu, only wrote %llu\n",
2512
2682
md->vars_header_size, count);
2654
2824
if (fd->shared_buffer == adios_flag_yes && !md->g_merging_pgs)
2656
2826
//printf ("do not merge pg\n");
2657
struct adios_bp_buffer_struct_v1 b;
2658
struct adios_process_group_header_struct_v1 pg_header;
2659
struct adios_vars_header_struct_v1 vars_header;
2660
int pg_size, header_size;
2661
uint64_t vars_count_offset;
2663
2829
pg_size = fd->bytes_written;
2664
2830
pg_sizes = (int *) malloc (new_group_size * 4);
2965
3131
if (!is_aggregator(md->rank))
2967
3133
uint64_t var_offset_to_add = 0, attr_offset_to_add = 0;
2968
uint64_t var_base_offset = 0, attr_base_offset = 0;
2970
// change to relative offset
2971
if (md->index->vars_root)
2973
var_base_offset = md->index->vars_root->characteristics [0].offset;
2976
if (md->index->attrs_root)
2978
attr_base_offset = md->index->attrs_root->characteristics [0].offset;
2981
adios_mpi_amr_subtract_offset (var_base_offset
2987
3135
for (i = 0; i < new_rank; i++)