44
47
int rank; // dataspaces rank or MPI rank if MPI is available
45
48
int peers; // from xml parameter or group communicator
46
49
int appid; // from xml parameter or 1
47
int time_index; // versioning in DataSpaces, start from 0
48
50
int n_writes; // how many times adios_write has been called
52
MPI_Comm mpi_comm; // for use in open..close
53
MPI_Comm mpi_comm_init; // for use in init/finalize
52
55
int num_of_files; // how many files do we have with this method
53
56
char *fnames[MAX_NUM_OF_FILES]; // names of files (needed at finalize)
54
57
int fversions[MAX_NUM_OF_FILES]; // last steps of files (needed at finalize)
58
int mpi_ranks[MAX_NUM_OF_FILES]; // mpi rank of current process for each written file (needed at finalize)
55
59
struct adios_dimes_file_info file_info[MAX_NUM_OF_FILES]; // keep track of time index for each opened file
58
static int init_dimes_file_info(struct adios_dimes_data_struct *p)
61
for (i = 0; i < MAX_NUM_OF_FILES; i++) {
62
p->file_info[i].name = NULL;
63
p->file_info[i].time_index = 0;
67
static void free_dimes_file_info(struct adios_dimes_data_struct *p)
70
for (i = 0; i < MAX_NUM_OF_FILES; i++) {
71
if (p->file_info[i].name) {
72
free(p->file_info[i].name);
63
static int check_read_status_var(const char* fname, int last_version)
65
int stay_in_poll_loop = 1;
66
double t1 = adios_gettime();
68
uint64_t lb[MAX_DS_NDIM], ub[MAX_DS_NDIM], gdims[MAX_DS_NDIM];
70
int read_status_buf[1] = {-1};
71
int read_status_buf_len = 1;
73
while (stay_in_poll_loop) {
74
snprintf(ds_var_name, MAX_DS_NAMELEN, "READ_STATUS@%s", fname);
75
elemsize = sizeof(int); ndim = 1;
76
lb[0] = 0; ub[0] = read_status_buf_len-1;
77
gdims[0] = (ub[0]-lb[0]+1) * dspaces_get_num_space_server();
78
dspaces_define_gdim(ds_var_name, ndim, gdims);
79
int err = dspaces_get(ds_var_name, 0, elemsize, ndim, lb, ub, read_status_buf);
81
int version = read_status_buf[0];
82
log_debug("%s: ds_var_name %s read_status_buf = {%d}\n",
83
__func__, ds_var_name, version);
84
if (version == last_version) {
85
stay_in_poll_loop = 0;
88
log_error("%s: failed to read ds_var_name %s from space\n",
89
__func__, ds_var_name);
92
// check if we need to stay in loop
93
if (stay_in_poll_loop) {
94
double elapsed_time = adios_gettime() - t1;
95
if (check_read_status_timeout_sec >= 0.0 &&
96
elapsed_time > check_read_status_timeout_sec) {
97
stay_in_poll_loop = 0;
99
adios_nanosleep(check_read_status_poll_interval_ms/1000,
100
(int)(((uint64_t)check_read_status_poll_interval_ms * 1000000L)%1000000000L));
108
static int init_dimes_file_info(struct adios_dimes_data_struct *md)
111
for (i = 0; i < MAX_NUM_OF_FILES; i++) {
112
md->file_info[i].name = NULL;
113
md->file_info[i].time_index = 0;
117
static void free_dimes_file_info(struct adios_dimes_data_struct *md)
120
for (i = 0; i < MAX_NUM_OF_FILES; i++) {
121
if (md->file_info[i].name) {
122
free(md->file_info[i].name);
79
static struct adios_dimes_file_info* lookup_dimes_file_info(struct adios_dimes_data_struct *p, const char* fname)
129
static struct adios_dimes_file_info* lookup_dimes_file_info(struct adios_dimes_data_struct *md, const char* fname)
82
132
for (i = 0; i < MAX_NUM_OF_FILES; i++) {
83
if (p->file_info[i].name != NULL &&
84
strcmp(p->file_info[i].name, fname) == 0) {
85
return &p->file_info[i];
133
if (md->file_info[i].name != NULL &&
134
strcmp(md->file_info[i].name, fname) == 0) {
135
return &md->file_info[i];
89
139
for (i = 0; i < MAX_NUM_OF_FILES; i++) {
90
if (p->file_info[i].name == NULL) {
91
p->file_info[i].name = malloc(strlen(fname)+1);
92
strcpy(p->file_info[i].name, fname);
93
return &p->file_info[i];
140
if (md->file_info[i].name == NULL) {
141
md->file_info[i].name = malloc(strlen(fname)+1);
142
strcpy(md->file_info[i].name, fname);
143
return &md->file_info[i];
100
static int connect_to_dimes (struct adios_dimes_data_struct * p, MPI_Comm comm)
150
static int connect_to_dimes (struct adios_dimes_data_struct *md, MPI_Comm comm)
105
155
if (!globals_adios_is_dimes_connected()) {
107
MPI_Comm_rank (comm, &(p->rank));
157
MPI_Comm_rank (comm, &(md->rank));
108
158
MPI_Comm_size (comm, &num_peers);
110
160
// Application ID should be set by the application calling adios_set_application_id()
112
p->appid = globals_adios_get_application_id (&was_set);
162
md->appid = globals_adios_get_application_id (&was_set);
116
166
log_debug ("adios_dimes: rank=%d connect to DATASPACES, peers=%d, appid=%d \n",
117
p->rank, num_peers, p->appid);
167
md->rank, num_peers, md->appid);
119
169
//Init the dart client
120
ret = dspaces_init (num_peers, p->appid);
170
ret = dspaces_init (num_peers, md->appid, &md->mpi_comm_init, NULL);
122
log_error ("adios_dimes: rank=%d Failed to connect to DATASPACES: err=%d, rank=%d\n", p->rank, ret);
172
log_error ("adios_dimes: rank=%d Failed to connect to DATASPACES: err=%d, rank=%d\n", md->rank, ret);
127
dspaces_rank (&(p->rank));
128
dspaces_peers (&(p->peers));
177
dspaces_rank (&(md->rank));
178
dspaces_peers (&(md->peers));
131
log_debug ("adios_dimes: rank=%d connected to DATASPACES: peers=%d\n", p->rank, p->peers);
181
log_debug ("adios_dimes: rank=%d connected to DATASPACES: peers=%d\n", md->rank, md->peers);
134
184
globals_adios_set_dimes_connected_from_writer();
140
190
struct adios_method_struct * method
143
struct adios_dimes_data_struct *p = 0;
193
struct adios_dimes_data_struct *md = 0;
144
194
if (!adios_dimes_initialized)
146
196
adios_dimes_initialized = 1;
149
199
method->method_data = calloc (1, sizeof (struct adios_dimes_data_struct));
150
p = (struct adios_dimes_data_struct*)method->method_data;
200
md = (struct adios_dimes_data_struct*)method->method_data;
155
206
//Init the static data structure
161
p->mpi_comm = MPI_COMM_NULL;
211
md->mpi_comm = MPI_COMM_NULL;
212
md->mpi_comm_init = method->init_comm;
214
md->num_of_files = 0;
165
init_dimes_file_info(p);
166
connect_to_dimes (p, method->init_comm);
216
// process user parameters
217
const PairStruct *p = parameters;
219
if (!strcasecmp (p->name, "app_id")) {
221
md->appid = strtol(p->value, NULL, 10);
222
if (md->appid > 0 && !errno) {
223
log_debug ("App ID parameter set to %d for DIMES write method\n",
225
globals_adios_set_application_id (md->appid);
227
log_error ("Invalid 'app_id' parameter given to the DIMES write "
228
"method: '%s'\n", p->value);
230
} else if (!strcasecmp(p->name, "check_read_status")) {
232
check_read = strtol(p->value, NULL, 10);
233
if (!errno && (check_read == 0 || check_read == 2)) {
234
check_read_status = check_read;
235
log_debug("check_read_status set to %d for DIMES write method\n",
238
log_error("Invalid 'check_read_status' parameter given to the DIMES "
239
"write method: '%s'\n", p->value);
240
log_error("check_read_status=<value>, 0: disable, 1: at every step "
241
" (not supported yet), 2: at finalize (default value).\n");
243
} else if (!strcasecmp(p->name, "check_read_status_timeout_sec")) {
245
double timeout = strtof(p->value, NULL);
246
if (timeout > 0.0 && !errno) {
247
log_debug("check_read_status_timeout_sec set to %f seconds for DIMES write method\n", timeout);
248
check_read_status_timeout_sec = timeout;
250
log_error("Invalid 'check_read_status_timeout_sec' parameter given to the DIMES "
251
"write method: '%s'\n", p->value);
253
} else if (!strcasecmp(p->name, "check_read_status_poll_interval")) {
255
int pollinterval = strtol(p->value, NULL, 10);
256
if (pollinterval > 0 && !errno) {
257
log_debug("check_read_status_poll_interval set to %d milliseconds for DIMES write method\n", pollinterval);
258
check_read_status_poll_interval_ms = pollinterval;
260
log_error("Invalid 'check_read_status_poll_interval' parameter given to DIMES "
261
"write method: '%s'\n", p->value);
264
log_error ("Parameter name %s is not recognized by the DIMES write "
265
"method\n", p->name);
269
init_dimes_file_info(md);
270
connect_to_dimes (md, method->init_comm);
168
272
log_info ("adios_dimes_init: done\n");
180
struct adios_dimes_data_struct *p = (struct adios_dimes_data_struct *)
284
struct adios_dimes_data_struct *md = (struct adios_dimes_data_struct *)
181
285
method->method_data;
182
struct adios_dimes_file_info *info = lookup_dimes_file_info(p, fd->name);
286
struct adios_dimes_file_info *info = lookup_dimes_file_info(md, fd->name);
183
287
log_info ("adios_dimes_open: open %s, mode=%d, time_index=%d \n",
184
288
fd->name, fd->mode, info->time_index);
187
291
// if we have MPI and a communicator, we can get the exact size of this application
188
292
// that we need to tell DATASPACES
190
MPI_Comm_rank (p->mpi_comm, &(p->rank));
191
MPI_Comm_size (p->mpi_comm, &(p->peers));
294
MPI_Comm_rank (md->mpi_comm, &(md->rank));
295
MPI_Comm_size (md->mpi_comm, &(md->peers));
194
298
if (fd->mode == adios_mode_write || fd->mode == adios_mode_append)
196
log_debug ("adios_dimes_open: rank=%d call write lock...\n", p->rank);
197
dspaces_lock_on_write (fd->name, &p->mpi_comm);
198
log_debug ("adios_dimes_open: rank=%d got write lock\n", p->rank);
300
log_debug ("adios_dimes_open: rank=%d call write lock...\n", md->rank);
301
dspaces_lock_on_write (fd->name, &md->mpi_comm);
302
log_debug ("adios_dimes_open: rank=%d got write lock\n", md->rank);
199
303
// Free data objects written in the previous steps
200
304
dimes_put_sync_group(fd->name, info->time_index);
201
305
dimes_put_set_group(fd->name, info->time_index);
203
307
else if (fd->mode == adios_mode_read)
205
dspaces_lock_on_read (fd->name, &p->mpi_comm);
309
dspaces_lock_on_read (fd->name, &md->mpi_comm);
246
350
char * var_name = v->name;
353
char lb_str[256], ub_str[256], gdims_str[256], dims_str[256], didx_str[256];
249
354
//Get two offset coordinate values
250
355
unsigned int version;
252
int dims[3]={1,1,1}, gdims[3]={0,0,0}, lb[3]={0,0,0}, ub[3]={0,0,0}; /* lower and upper bounds for DataSpaces */
253
int didx[3]; // for reordering the dimensions
356
uint64_t dims[MAX_DS_NDIM], gdims[MAX_DS_NDIM], lb[MAX_DS_NDIM], ub[MAX_DS_NDIM]; /* lower and upper bounds for DataSpaces */
357
int didx[MAX_DS_NDIM]; // for reordering the dimensions
256
361
struct adios_dimension_struct* var_dimensions = v->dimensions;
257
362
// Calculate lower and upper bounds for each available dimension (up to 3 dims)
258
while( var_dimensions && ndims < 3)
363
while( var_dimensions && ndims < MAX_DS_NDIM)
260
365
dims[ndims] = adios_get_dim_value (&(var_dimensions->dimension));
261
366
gdims[ndims] = adios_get_dim_value (&(var_dimensions->global_dimension));
262
367
lb[ndims] = adios_get_dim_value (&(var_dimensions->local_offset));
263
if (dims[ndims] > 0) {
368
if (gdims[ndims] > 0 && dims[ndims] > 0) {
264
369
ub[ndims] = lb[ndims] + dims[ndims] - 1;
303
408
v->write_offset = 1; // only !=0 offsets will be included in build index
409
/* This is not needed here, this is already called in common_adios_write()
304
410
adios_generate_var_characteristics_v1 (fd, v); // characteristics will be included in build index
305
411
adios_write_var_characteristics_v1 (fd, v);
308
log_debug ("var_name=%s, type=%s(%d) elemsize=%d, version=%d, ndims=%d, size=(%d,%d,%d), gdim=(%d,%d,%d), lb=(%d,%d,%d), ub=(%d,%d,%d)\n",
414
dimes_int64s_to_str(ndims, lb, lb_str);
415
dimes_int64s_to_str(ndims, ub, ub_str);
416
dimes_int64s_to_str(ndims, dims, dims_str);
417
dimes_int64s_to_str(ndims, gdims, gdims_str);
418
log_debug ("var_name=%s, type=%s(%d) elemsize=%d, version=%d, ndims=%d, size=(%s), gdim=(%s), lb=(%s), ub=(%s)\n",
309
419
ds_var_name, adios_type_to_string_int(v->type), v->type, var_type_size, version, ndims,
310
dims[0], dims[1], dims[2], gdims[0], gdims[1], gdims[2], lb[0], lb[1], lb[2], ub[0], ub[1], ub[2]);
420
dims_str, gdims_str, lb_str, ub_str);
312
422
/* non-timed scalars are written in the metadata at close(), not here */
313
423
if (ndims == 0 && !hastime)
320
430
group->adios_host_language_fortran == adios_flag_yes,
321
431
0 /*pack*/, didx);
323
dimes_put(ds_var_name, version, var_type_size,
324
lb[didx[0]], lb[didx[1]], lb[didx[2]],
325
ub[didx[0]], ub[didx[1]], ub[didx[2]],
328
log_debug ("var_name=%s, dimension ordering=(%d,%d,%d), gdims=(%d,%d,%d), lb=(%d,%d,%d), ub=(%d,%d,%d)\n",
330
didx[0], didx[1], didx[2],
331
gdims[didx[0]], gdims[didx[1]], gdims[didx[2]],
332
lb[didx[0]], lb[didx[1]], lb[didx[2]],
333
ub[didx[0]], ub[didx[1]], ub[didx[2]]);
433
uint64_t lb_in[MAX_DS_NDIM], ub_in[MAX_DS_NDIM], gdims_in[MAX_DS_NDIM];
435
for (i = 0; i < ndims; i++) {
436
lb_in[i] = lb[didx[i]];
437
ub_in[i] = ub[didx[i]];
438
gdims_in[i] = gdims[didx[i]];
440
dimes_define_gdim(ds_var_name, ndims, gdims_in);
441
dimes_put(ds_var_name, version, var_type_size, ndims, lb_in, ub_in, data);
443
dimes_ints_to_str(ndims, didx, didx_str);
444
dimes_int64s_to_str(ndims, gdims_in, gdims_str);
445
dimes_int64s_to_str(ndims, lb_in, lb_str);
446
dimes_int64s_to_str(ndims, ub_in, ub_str);
447
log_debug ("var_name=%s, dimension ordering=(%s), gdims=(%s), lb=(%s), ub=(%s)\n",
448
ds_var_name, didx_str, gdims_str, lb_str, ub_str);
336
451
void adios_dimes_get_write_buffer (struct adios_file_struct * fd
837
938
/* Put GROUP@fn/gn header into space */
838
939
snprintf(ds_var_name, MAX_DS_NAMELEN, "GROUP@%s/%s", fd->name, fd->group->name);
839
log_debug ("%s: put %s with buf len %d into space\n", __func__, ds_var_name, indexlen);
840
ub[0] = indexlen-1; ub[1] = 0; ub[2] = 0;
841
dimes_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
842
dspaces_put(ds_var_name, version, 1, 0, 0, 0, /* lb 0..2 */
843
ub[didx[0]], ub[didx[1]], ub[didx[2]], indexbuf);
940
log_debug ("%s: put %s buflen=%d (bytes) into space\n", __func__, ds_var_name, indexlen);
941
elemsize = 1; ndim = 1;
942
lb[0] = 0; ub[0] = indexlen-1;
943
gdims[0] = (ub[0]-lb[0]+1) * dspaces_get_num_space_server();
944
dspaces_define_gdim(ds_var_name, ndim, gdims);
945
dspaces_put(ds_var_name, version, elemsize, ndim, lb, ub, indexbuf);
846
948
/* Create and put FILE@fn header into space */
849
951
snprintf (ds_var_name, MAX_DS_NAMELEN, "FILE@%s", fd->name);
850
952
dimes_pack_file_info (info->time_index, nvars, nattrs, indexlen,
851
953
fd->group->name, &file_info_buf, &file_info_buf_len);
852
log_debug ("%s: put %s = buflen=%d time=%d nvars=%d nattr=%d index=%d name=%d:%s into space\n",
954
log_debug ("%s: put %s buflen=%d (bytes) time=%d nvars=%d nattr=%d index=%d name=%d:%s into space\n",
853
955
__func__, ds_var_name,
854
956
*(int*)file_info_buf, *(int*)(file_info_buf+4),
855
957
*(int*)(file_info_buf+8), *(int*)(file_info_buf+12),
856
958
*(int*)(file_info_buf+16), *(int*)(file_info_buf+20),
857
959
file_info_buf+24);
858
/* Flip 1st and 2nd dimension for DataSpaces representation for a 1D array*/
859
ub[0] = file_info_buf_len-1; ub[1] = 0; ub[2] = 0;
860
dimes_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
861
960
dspaces_put_sync(); //wait on previous put to finish
862
dspaces_put(ds_var_name, version, 1, 0, 0, 0, /* lb 0..2 */
863
ub[didx[0]], ub[didx[1]], ub[didx[2]], file_info_buf);
961
elemsize = 1; ndim = 1;
962
lb[0] = 0; ub[0] = file_info_buf_len-1;
963
gdims[0] = (ub[0]-lb[0]+1) * dspaces_get_num_space_server();
964
dspaces_define_gdim(ds_var_name, ndim, gdims);
965
dspaces_put(ds_var_name, version, elemsize, ndim, lb, ub, file_info_buf);
865
967
/* Create and put VERSION@fn version info into space */
866
968
int version_buf[2] = {version, 0}; /* last version put in space; not terminated */
867
969
int version_buf_len = 2;
868
970
snprintf (ds_var_name, MAX_DS_NAMELEN, "VERSION@%s", fd->name);
869
log_debug ("%s: put %s with buf = [%d,%d] (len=%d integers) into space\n",
971
log_debug ("%s: put %s buf= [%d,%d] buflen=%d (integers) into space\n",
870
972
__func__, ds_var_name, version_buf[0], version_buf[1], version_buf_len);
871
ub[0] = version_buf_len-1; ub[1] = 0; ub[2] = 0;
872
dimes_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
873
dspaces_put_sync(); //wait on previous put to finish
874
dspaces_put(ds_var_name, 0, sizeof(int), 0, 0, 0, /* lb 0..2 */
875
ub[didx[0]], ub[didx[1]], ub[didx[2]], version_buf);
876
dspaces_put_sync(); //wait on previous put to finish
973
dspaces_put_sync(); //wait on previous put to finish
974
elemsize = sizeof(int); ndim = 1;
975
lb[0] = 0; ub[0] = version_buf_len-1;
976
gdims[0] = (ub[0]-lb[0]+1) * dspaces_get_num_space_server();
977
dspaces_define_gdim(ds_var_name, ndim, gdims);
978
dspaces_put(ds_var_name, 0, elemsize, ndim, lb, ub, version_buf);
979
dspaces_put_sync(); //wait on previous put to finish
880
982
// remember this filename and its version for finalize
882
for (i=0; i<p->num_of_files; i++) {
883
if (!strcmp(fd->name, p->fnames[i]))
984
for (i=0; i<md->num_of_files; i++) {
985
if (!strcmp(fd->name, md->fnames[i]))
886
if (i == p->num_of_files) {
887
if (p->num_of_files < MAX_NUM_OF_FILES) {
888
p->fnames[ p->num_of_files ] = strdup(fd->name);
988
if (i == md->num_of_files) {
989
if (md->num_of_files < MAX_NUM_OF_FILES) {
990
md->fnames[ md->num_of_files ] = strdup(fd->name);
891
993
log_error ("%s: Max %d files can be written by one application "
892
994
"using the DATASPACES method\n",
893
995
__func__, MAX_NUM_OF_FILES);
896
if (i < p->num_of_files) {
897
p->fversions[i] = version;
998
if (i < md->num_of_files) {
999
md->fversions[i] = version;
1000
md->mpi_ranks[i] = md->rank;
925
1028
void adios_dimes_finalize (int mype, struct adios_method_struct * method)
927
struct adios_dimes_data_struct *p = (struct adios_dimes_data_struct *)
1030
struct adios_dimes_data_struct *md = (struct adios_dimes_data_struct *)
928
1031
method->method_data;
930
1033
char ds_var_name[MAX_DS_NAMELEN];
932
int ub[3] = {1,0,0}; // we put 2 integers to space,
933
int didx[3]; // for reordering DS dimensions
1034
uint64_t gdims[MAX_DS_NDIM], lb[MAX_DS_NDIM], ub[MAX_DS_NDIM];
934
1036
int value[2] = {0, 1}; // integer to be written to space (terminated=1)
936
free_dimes_file_info(p);
1038
free_dimes_file_info(md);
938
1040
// tell the readers which files are finalized
939
dimes_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
940
for (i=0; i<p->num_of_files; i++) {
1041
for (i=0; i<md->num_of_files; i++) {
941
1042
/* Put VERSION@fn into space. Indicates that this file will not be extended anymore. */
942
log_debug("%s: call dspaces_lock_on_write(%s), rank=%d\n", __func__, p->fnames[i], mype);
943
dspaces_lock_on_write(p->fnames[i], &p->mpi_comm); // lock is global operation in DataSpaces
945
value[0] = p->fversions[i];
946
snprintf(ds_var_name, MAX_DS_NAMELEN, "VERSION@%s", p->fnames[i]);
1043
if (md->mpi_ranks[i] == 0) {
1044
if (check_read_status == 2) {
1045
check_read_status_var(md->fnames[i], md->fversions[i]);
1047
MPI_Comm mpi_comm = MPI_COMM_SELF;
1048
log_debug("%s: call dspaces_lock_on_write(%s), rank=%d\n", __func__, md->fnames[i], mype);
1049
dspaces_lock_on_write(md->fnames[i], &mpi_comm); // lock is global operation in DataSpaces
1051
value[0] = md->fversions[i];
1052
snprintf(ds_var_name, MAX_DS_NAMELEN, "VERSION@%s", md->fnames[i]);
947
1053
log_debug ("%s: update %s in the space [%d, %d]\n",
948
1054
__func__, ds_var_name, value[0], value[1] );
949
dspaces_put(ds_var_name, 0, sizeof(int),
950
lb[didx[0]], lb[didx[1]], lb[didx[2]],
951
ub[didx[0]], ub[didx[1]], ub[didx[2]],
1055
elemsize = sizeof(int); ndim = 1;
1056
lb[0] = 0; ub[0] = 1;
1057
gdims[0] = (ub[0]-lb[0]+1) * dspaces_get_num_space_server();
1058
dspaces_define_gdim(ds_var_name, ndim, gdims);
1059
dspaces_put(ds_var_name, 0, elemsize, ndim, lb, ub, &value);
953
1060
log_debug("%s: call dspaces_put_sync()\n", __func__);
954
1061
dspaces_put_sync();
1063
log_debug("%s: call dspaces_unlock_on_write(%s), rank=%d\n", __func__, md->fnames[i], mype);
1064
dspaces_unlock_on_write(md->fnames[i], &mpi_comm);
956
log_debug("%s: call dspaces_unlock_on_write(%s), rank=%d\n", __func__, p->fnames[i], mype);
957
dspaces_unlock_on_write(p->fnames[i], &p->mpi_comm);
1066
free(md->fnames[i]);
1069
if (check_read_status == 2) {
1070
// Note: dspaces_lock_on_write() above is only called by a single process (whose md->rank == 0). MPI_Barrier makes all writer processes wait until the reader application fetches the data of the last version.
1071
MPI_Barrier(md->mpi_comm_init);
961
1073
// Free all previously allocated RDMA buffers
962
1074
dimes_put_sync_all();