13
// see if we have MPI or other tools
16
#include "public/adios.h"
17
#include "public/adios_types.h"
18
#include "core/adios_transport_hooks.h"
19
#include "core/adios_internals.h"
20
#include "core/adios_internals_mxml.h"
21
#include "core/util.h"
22
#include "core/ds_metadata.h"
23
#include "core/adios_logger.h"
25
#include "dataspaces.h"
27
/*#define DATASPACES_NO_VERSIONING define it at configure as -DDATASPACES_NO_VERSIONING in CFLAGS */
29
static int adios_dataspaces_initialized = 0;
30
#define MAX_DS_NAMELEN 128
31
#define MAX_NUM_OF_FILES 20
32
//static char ds_type_var_name[MAX_DS_NAMELEN];
33
static char ds_var_name[MAX_DS_NAMELEN];
34
static unsigned int adios_dataspaces_verbose = 3;
36
struct adios_ds_data_struct
38
int rank; // dataspaces rank or MPI rank if MPI is available
39
int peers; // from xml parameter or group communicator
40
int appid; // from xml parameter or 1
41
int time_index; // versioning in DataSpaces, start from 0
42
int n_writes; // how many times adios_write has been called
46
int num_of_files; // how many files do we have with this method
47
char *fnames[MAX_NUM_OF_FILES]; // names of files (needed at finalize)
48
int fversions[MAX_NUM_OF_FILES]; // last steps of files (needed at finalize)
53
static int get_dim_rank_value(struct adios_dimension_item_struct * dim_info, struct adios_group_struct *group)
60
struct adios_var_struct *dim_var = NULL;
61
dim_var = adios_find_var_by_id(group->vars, dim_info->id);
62
if(!dim_var || !dim_var->data)
66
switch ( dim_var->type )
68
case adios_unsigned_byte:
69
rank = (*(uint8_t*) dim_var->data);
72
rank = (*(int8_t*) dim_var->data);
74
case adios_unsigned_short:
75
rank = (*(uint16_t*) dim_var->data);
78
rank = (*(int16_t*) dim_var->data);
80
case adios_unsigned_integer:
81
rank = (*(uint32_t*) dim_var->data);
84
rank = (*(int32_t*) dim_var->data);
86
case adios_unsigned_long:
87
rank = (*(uint64_t*) dim_var->data);
90
rank = (*(int64_t*) dim_var->data);
100
return dim_info->rank;
105
static int connect_to_dspaces (struct adios_ds_data_struct * p, MPI_Comm comm)
110
if (!globals_adios_is_dataspaces_connected()) {
112
MPI_Comm_rank (comm, &(p->rank));
113
MPI_Comm_size (comm, &num_peers);
115
// Application ID should be set by the application calling adios_set_application_id()
117
p->appid = globals_adios_get_application_id (&was_set);
121
log_debug ("adios_dataspaces: rank=%d connect to DATASPACES, peers=%d, appid=%d \n",
122
p->rank, num_peers, p->appid);
124
//Init the dart client
125
ret = dspaces_init (num_peers, p->appid);
127
log_error ("adios_dataspaces: rank=%d Failed to connect to DATASPACES: err=%d, rank=%d\n", p->rank, ret);
132
dspaces_rank (&(p->rank));
133
dspaces_peers (&(p->peers));
136
log_debug ("adios_dataspaces: rank=%d connected to DATASPACES: peers=%d\n", p->rank, p->peers);
137
globals_adios_set_dataspaces_connected_from_writer();
143
void adios_dataspaces_init (const PairStruct * parameters,
144
struct adios_method_struct * method
147
struct adios_ds_data_struct *p = 0;
148
if (!adios_dataspaces_initialized)
150
adios_dataspaces_initialized = 1;
153
method->method_data = calloc (1, sizeof (struct adios_ds_data_struct));
154
p = (struct adios_ds_data_struct*)method->method_data;
159
//Init the static data structure
165
p->mpi_comm = MPI_COMM_NULL;
169
connect_to_dspaces (p, method->init_comm);
171
log_info ("adios_dataspaces_init: done\n");
177
int adios_dataspaces_open (struct adios_file_struct * fd,
178
struct adios_method_struct * method,
183
struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
186
log_info ("adios_dataspaces_open: open %s, mode=%d, time_index=%d \n",
187
fd->name, fd->mode, p->time_index);
190
// if we have MPI and a communicator, we can get the exact size of this application
191
// that we need to tell DATASPACES
193
MPI_Comm_rank (p->mpi_comm, &(p->rank));
194
MPI_Comm_size (p->mpi_comm, &(p->peers));
197
// connect to DATASPACES at the very first adios_open(), disconnect in adios_finalize()
198
// connect only if the READ API has not connected yet
200
ret = connect_to_dspaces (p, p->mpi_comm);
205
if (fd->mode == adios_mode_write || fd->mode == adios_mode_append)
207
log_debug ("adios_dataspaces_open: rank=%d call write lock...\n", p->rank);
208
dspaces_lock_on_write (fd->name, &p->mpi_comm);
209
log_debug ("adios_dataspaces_open: rank=%d got write lock\n", p->rank);
211
else if (fd->mode == adios_mode_read)
213
dspaces_lock_on_read (fd->name, &p->mpi_comm);
219
enum ADIOS_FLAG adios_dataspaces_should_buffer (struct adios_file_struct * fd
220
,struct adios_method_struct * method
224
//if (fd->shared_buffer == adios_flag_no && fd->mode != adios_mode_read)
226
// write the process group header
227
//adios_write_process_group_header_v1 (fd, fd->write_size_bytes);
228
//adios_write_open_vars_v1 (fd);
230
// log_warn("WARNING: %s expects that fd->shared_buffer is false\n", __func__);
234
return adios_flag_no; // this will take care of it
238
void adios_dataspaces_write (struct adios_file_struct * fd
239
,struct adios_var_struct * v
241
,struct adios_method_struct * method
244
struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
246
struct adios_group_struct *group = fd->group;
248
// FIXME: type size of a string >2GB does not fit to int.
249
// adios_get_type_size returns uint64_t but dspaces_put handles only int
251
int var_type_size = (int) adios_get_type_size(v->type, v->data);
253
char * var_name = v->name;
256
//Get two offset coordinate values
257
unsigned int version;
259
int dims[3]={1,1,1}, gdims[3]={0,0,0}, lb[3]={0,0,0}, ub[3]={0,0,0}; /* lower and upper bounds for DataSpaces */
260
int didx[3]; // for reordering the dimensions
263
struct adios_dimension_struct* var_dimensions = v->dimensions;
264
// Calculate lower and upper bounds for each available dimension (up to 3 dims)
265
while( var_dimensions && ndims < 3)
267
dims[ndims] = get_dim_rank_value(&(var_dimensions->dimension), group);
268
gdims[ndims] = get_dim_rank_value(&(var_dimensions->global_dimension), group);
269
lb[ndims] = get_dim_rank_value(&(var_dimensions->local_offset), group);
270
if (dims[ndims] > 0) {
271
ub[ndims] = lb[ndims] + dims[ndims] - 1;
274
// time dimension (ldim=0 indicates this). Leave out from the dimensions.
275
//ub[ndims] = lb[ndims];
278
var_dimensions = var_dimensions->next;
281
#ifdef DATASPACES_NO_VERSIONING
282
version = 0; /* Update/overwrite data in DataSpaces (we write time_index as a variable at close)*/
284
version = p->time_index; /* Add new data as separate to DataSpaces */
287
if (v->path != NULL && v->path[0] != '\0' && strcmp(v->path,"/"))
288
snprintf(ds_var_name, MAX_DS_NAMELEN, "%s/%s/%s/%s", fd->name, fd->group->name, v->path, v->name);
290
snprintf(ds_var_name, MAX_DS_NAMELEN, "%s/%s//%s", fd->name, fd->group->name, v->name);
292
//snprintf(dspaces_type_var_name, MAX_DS_NAMELEN, "TYPE@%s", ds_var_name);
294
/* non-global variables are put in space ONLY by rank = 0 process */
295
if (gdims[0] == 0 && p->rank != 0) {
296
//fprintf(stderr, "rank=%d var_name=%s is not global. Skip\n", p->rank, ds_var_name);
301
//if (fd->shared_buffer == adios_flag_no)
303
// var payload sent for sizing information
304
//adios_write_var_header_v1 (fd, v);
308
v->write_offset = 1; // only !=0 offsets will be included in build index
309
adios_generate_var_characteristics_v1 (fd, v); // characteristics will be included in build index
310
adios_write_var_characteristics_v1 (fd, v);
313
log_debug ("var_name=%s, type=%s(%d) elemsize=%d, version=%d, ndims=%d, size=(%d,%d,%d), gdim=(%d,%d,%d), lb=(%d,%d,%d), ub=(%d,%d,%d)\n",
314
ds_var_name, adios_type_to_string_int(v->type), v->type, var_type_size, version, ndims,
315
dims[0], dims[1], dims[2], gdims[0], gdims[1], gdims[2], lb[0], lb[1], lb[2], ub[0], ub[1], ub[2]);
317
/* non-timed scalars are written in the metadata at close(), not here */
318
if (ndims == 0 && !hastime)
321
/* Put type info as T<varname>, integer in 0,0,0,0,0,0 position */
322
//err = dspaces_put(dspaces_type_var_name, version, 4, 0,0,0,0,0,0, &(v->type));
324
ds_dimension_ordering(ndims,
325
group->adios_host_language_fortran == adios_flag_yes,
328
dspaces_put(ds_var_name, version, var_type_size,
329
lb[didx[0]], lb[didx[1]], lb[didx[2]],
330
ub[didx[0]], ub[didx[1]], ub[didx[2]],
333
log_debug ("var_name=%s, dimension ordering=(%d,%d,%d), gdims=(%d,%d,%d), lb=(%d,%d,%d), ub=(%d,%d,%d)\n",
335
didx[0], didx[1], didx[2],
336
gdims[didx[0]], gdims[didx[1]], gdims[didx[2]],
337
lb[didx[0]], lb[didx[1]], lb[didx[2]],
338
ub[didx[0]], ub[didx[1]], ub[didx[2]]);
342
void adios_dataspaces_get_write_buffer (struct adios_file_struct * fd
343
,struct adios_var_struct * v
346
,struct adios_method_struct * method
349
uint64_t mem_allowed;
358
if (v->data && v->free_data == adios_flag_yes)
360
adios_method_buffer_free (v->data_size);
365
mem_allowed = adios_method_buffer_alloc (*size);
366
if (mem_allowed == *size)
368
*buffer = malloc (*size);
371
adios_method_buffer_free (mem_allowed);
372
log_error ("ERROR: Out of memory allocating %llu bytes for %s in %s:%s()\n"
373
,*size, v->name, __FILE__, __func__
375
v->got_buffer = adios_flag_no;
376
v->free_data = adios_flag_no;
384
v->got_buffer = adios_flag_yes;
385
v->free_data = adios_flag_yes;
386
v->data_size = mem_allowed;
392
adios_method_buffer_free (mem_allowed);
393
log_error ("OVERFLOW: Cannot allocate requested buffer of %llu "
394
"bytes for %s in %s:%s()\n"
404
/* NOT IMPLEMENTED. Use the Read API to read variables */
405
void adios_dataspaces_read (struct adios_file_struct * fd
406
,struct adios_var_struct * v, void * buffer
407
,uint64_t buffer_size
408
,struct adios_method_struct * method
411
struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
413
uint64_t var_type_size = adios_get_type_size(v->type, v->data);
416
char * var_name = v->name;
418
//Get two offset coordinate values
419
int version, offset1[3],offset2[3];
421
memset(offset1, 0, 3*sizeof(int));
422
memset(offset2, 0, 3*sizeof(int));
423
memset(dim_size, 0, 3*sizeof(int));
425
version = p->time_index;
427
//dspaces_lock_on_read_();
431
//dspaces_unlock_on_read_();
434
/* Gather var/attr indices from all processes to rank 0 */
435
static void adios_dataspaces_gather_indices (struct adios_file_struct * fd
436
,struct adios_method_struct * method
437
,struct adios_index_process_group_struct_v1 **pg_root
438
,struct adios_index_var_struct_v1 **vars_root
439
,struct adios_index_attribute_struct_v1 ** attrs_root
442
struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
444
struct adios_index_process_group_struct_v1 * my_pg_root = 0;
445
struct adios_index_var_struct_v1 * my_vars_root = 0;
446
struct adios_index_attribute_struct_v1 * my_attrs_root = 0;
447
struct adios_index_process_group_struct_v1 * new_pg_root = 0;
448
struct adios_index_var_struct_v1 * new_vars_root = 0;
449
struct adios_index_attribute_struct_v1 * new_attrs_root = 0;
451
// build local index first appending to any existing index
452
adios_build_index_v1 (fd, &my_pg_root, &my_vars_root, &my_attrs_root);
454
log_debug ("%s index after first build is pg=%x vars=%x attrs=%x\n",
455
__func__, my_pg_root, my_vars_root, my_attrs_root);
458
// gather all on rank 0
459
if (p->mpi_comm != MPI_COMM_NULL)
463
int * index_sizes = malloc (4 * p->peers);
464
int * index_offsets = malloc (4 * p->peers);
465
char * recv_buffer = 0;
467
uint32_t total_size = 0;
469
struct adios_bp_buffer_struct_v1 b;
471
MPI_Gather (&size, 1, MPI_INT
472
,index_sizes, 1, MPI_INT
476
for (i = 0; i < p->peers; i++)
478
index_offsets [i] = total_size;
479
total_size += index_sizes [i];
482
recv_buffer = malloc (total_size);
484
MPI_Gatherv (&size, 0, MPI_BYTE
485
,recv_buffer, index_sizes, index_offsets
486
,MPI_BYTE, 0, p->mpi_comm
489
for (i = 1; i < p->peers; i++)
491
b.buff = recv_buffer + index_offsets [i];
492
b.length = index_sizes [i];
495
adios_parse_process_group_index_v1 (&b
498
adios_parse_vars_index_v1 (&b, &new_vars_root);
499
adios_parse_attributes_index_v1 (&b
502
adios_merge_index_v1 (&my_pg_root
505
,new_pg_root, new_vars_root
508
adios_clear_index_v1 (new_pg_root, new_vars_root, new_attrs_root);
516
free (index_offsets);
521
uint64_t buffer_size = 0;
522
uint64_t buffer_offset = 0;
524
adios_write_index_v1 (&buffer, &buffer_size, &buffer_offset
525
,0, my_pg_root ,my_vars_root ,my_attrs_root);
527
uint32_t tmp_buffer_size = (uint32_t) buffer_size;
528
MPI_Gather (&tmp_buffer_size, 1, MPI_INT, 0, 0, MPI_INT
531
MPI_Gatherv (buffer, buffer_size, MPI_BYTE
542
*pg_root = my_pg_root;
543
*vars_root = my_vars_root;
544
*attrs_root = my_attrs_root;
545
log_debug ("%s index after gathering is pg=%x vars=%x attrs=%x\n",
546
__func__, my_pg_root, my_vars_root, my_attrs_root);
549
static int ds_get_full_name_len (char * path, char * name)
553
if (!path || !path[0] || !strcmp (path, "/")) {
554
// no path, just name + leading /
555
len = strlen(name) + 1;
557
len = strlen(path) + strlen(name) + 1;
562
static int ds_get_full_name (char * path, char * name, int maxlen,
567
if (!path || !path[0] || !strcmp (path, "/")) {
568
// no path, just name + leading /
569
len = strlen(name) + 1;
571
strncpy(out+1, name, maxlen-1);
574
strncpy(out, path, maxlen-1); // path +
575
out[len] = '/'; // / +
576
strncpy(out+len+1, name, maxlen-len-1); // name
577
len += strlen(name) + 1;
582
void ds_pack_group_info (struct adios_file_struct *fd
583
,struct adios_method_struct * method
584
,struct adios_index_var_struct_v1 *vars_root
585
,struct adios_index_attribute_struct_v1 * attrs_root
586
,char ** buffer, int *buffer_size, int *nvars, int *nattrs
589
struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
591
struct adios_index_var_struct_v1 * v = vars_root;
592
struct adios_index_attribute_struct_v1 * a = attrs_root;
594
int ndims; // whatever the type of v->characteristics->dims.count is, we write an int to buffer
595
int hastime; // true if variable has time dimension
596
uint64_t ldims[10], gdims[10]; // we can write only 3 dimensions, will drop time dim
599
int didx[3]; // dimension ordering indices
601
log_debug ("%s entered\n", __func__);
603
/* First cycle: count the size of info to allocate index buffer */
604
size = 3*sizeof(int); //header for buffer: length, nvars, nattrs
606
size += 4*sizeof(int) // name len, type, hastime, number of dims
607
+ ds_get_full_name_len (v->var_path, v->var_name) // full path
608
+ 3 * 8; // always write 3 dimensions in the index (even for scalars)
609
if (v->characteristics->dims.count == 0) {
610
// For scalars, we write the value into the index
611
if (v->type != adios_string)
612
size += adios_get_type_size(v->type, NULL);
614
size += adios_get_type_size(v->type, v->characteristics->value) + sizeof(int);
616
log_debug (" var %s/%s, size = %d\n", v->var_path, v->var_name, size);
623
+ ds_get_full_name_len (a->attr_path, a->attr_name)
624
+ sizeof(int); // type
625
if (a->type != adios_string)
626
size += adios_get_type_size(a->type, NULL);
628
size += adios_get_type_size(a->type, a->characteristics->value) + sizeof(int);
630
log_debug (" attr %s/%s, size = %d\n", a->attr_path, a->attr_name, size);
635
// Required for Cray Gemini: align buffer to 8 bytes boundaries
636
int align_bytes = 0; // number of extra bytes at the end
638
align_bytes = 8 - (size % 8);
640
log_debug (" after alignment, size = %d, align_bytes = %d\n", size, align_bytes);
643
*buffer = (char *) malloc (size);
646
/* Second cycle: fill up the buffer */
653
//header for buffer: length, nvars, nattrs
654
memcpy (b, buffer_size, sizeof(int));
656
memcpy (b, nvars, sizeof(int));
658
memcpy (b, nattrs, sizeof(int));
661
namelen = ds_get_full_name (v->var_path, v->var_name, sizeof(name), name);
662
memcpy (b, &namelen, sizeof(int)); // length of full path
664
memcpy (b, name, namelen); // full path
666
memcpy (b, &(v->type), sizeof(int)); // type
668
//ndims = MAX(v->characteristics->dims.count,3); // convert whatever type to int
669
//memcpy (b, &(v->characteristics->dims.count), sizeof(int)); // number of dimensions
670
log_debug("Variable %s, total dims = %d\n", name, v->characteristics->dims.count);
671
j = 0; // we can write only 3 dims, will drop the time dimension
673
for (i = 0; i<v->characteristics->dims.count; i++) {
674
ldims[j] = v->characteristics->dims.dims[j*3]; // ith dimension
675
gdims[j] = v->characteristics->dims.dims[j*3+1]; // ith dimension
676
log_debug(" , ldim = %lld gdim = %lld)\n", ldims[j], gdims[j]);
677
if (gdims[j] == 0 && ldims[j] == 1) {
678
// time dimension's global=0, local=1, skip
679
// FIXME: This is true for a local array of length 1 (not defined as global)
680
log_debug(" skip this dimension )\n");
686
for (i=j; i<3; i++) {
687
// fill up dimensions up to 3rd dim
691
ndims = (j < 3 ? j : 3); // we can have max 3 dimensions in DataSpaces
692
memcpy (b, &hastime, sizeof(int)); // has time dimension?
693
log_debug(" has time = %d (%d)\n", hastime, *(int*)b);
695
memcpy (b, &ndims, sizeof(int)); // number of dimensions
696
log_debug(" ndims = %d (%d)\n", ndims, *(int*)b);
698
ds_dimension_ordering(ndims,
699
fd->group->adios_host_language_fortran == adios_flag_yes,
701
for (i = 0; i < 3; i++) {
702
if (gdims[didx[i]]) {
704
memcpy (b, &(gdims[didx[i]]), 8); // ith dimension
706
// a local variable has no global dimensions
707
// in space, its local dimensions become the global dimensions
708
memcpy (b, &(ldims[didx[i]]), 8); // ith dimension
712
if (v->characteristics->dims.count == 0) {
713
// NOTE: ndims = 0 can mean a timed scalar, which has no characteristics->value!
714
// store scalar value too
715
if (v->type != adios_string) {
716
size = adios_get_type_size(v->type, NULL);
717
memcpy (b, v->characteristics->value, size);
720
size = adios_get_type_size(v->type, v->characteristics->value);
721
memcpy (b, &size, sizeof(int));
723
memcpy (b, v->characteristics->value, size);
731
namelen = ds_get_full_name (a->attr_path, a->attr_name, sizeof(name), name);
732
memcpy (b, &namelen, sizeof(int)); // length of full path
734
memcpy (b, name, namelen); // full path
736
memcpy (b, &(a->type), sizeof(int)); // type
738
// store scalar value too
739
if (a->type != adios_string) {
740
size = adios_get_type_size(a->type, NULL);
741
memcpy (b, a->characteristics->value, size);
744
size = adios_get_type_size(a->type, a->characteristics->value);
745
memcpy (b, &size, sizeof(int));
747
memcpy (b, a->characteristics->value, size);
756
memcpy (b, &zero, align_bytes);
761
if ( (int)(b-*buffer) > *buffer_size) {
762
log_error ("ERROR in %s. Calculated group index buffer size as %d, but filled after that with %d bytes\n",
763
__func__, *buffer_size, (int)(b-*buffer));
765
// written buffer might be shorter than calculated since we skip time dimensions.
766
// set the correct size now
767
*buffer_size = (int)(b-*buffer);
768
memcpy (*buffer, buffer_size, sizeof(int));
771
log_debug(" %s: buffer length = %d, content:\n", __func__, *buffer_size);
773
for (i=0; i<*buffer_size; i+=16) {
774
for (j=0; j<4; j++) {
775
log_debug_cont ("%3.3hhu %3.3hhu %3.3hhu %3.3hhu ",
776
b[i+4*j], b[i+4*j+1], b[i+4*j+2], b[i+4*j+3]);
778
log_debug_cont("\n");
782
log_debug ("%s exit\n", __func__);
785
/* FIXME: put this function into ds_metadata.c */
786
/* buff is allocated and must be freed after use */
787
void ds_pack_file_info (int time, int nvars, int nattrs, int group_index_len, char * groupname,
788
/*OUT*/char **buf, /*OUT*/int *buf_len)
791
*buf = (char *) malloc (*buf_len);
794
int namelen = strlen(groupname);
795
memcpy (b, buf_len, sizeof(int)); /* 0-: length of this buffer */
797
memcpy (b, &time, sizeof(int)); /* 4-: time */
799
memcpy (b, &nvars, sizeof(int)); /* 8-: number of variables */
801
memcpy (b, &nattrs, sizeof(int)); /* 12-: number of attributes */
803
memcpy (b, &group_index_len, sizeof(int)); /* 16-: length of group index*/
805
memcpy (b, &namelen, sizeof(int)); /* 20-: length of group name */
807
memcpy (b, groupname, namelen); /* 24-: group name */
811
void adios_dataspaces_close (struct adios_file_struct * fd
812
,struct adios_method_struct * method
815
struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
817
struct adios_index_process_group_struct_v1 * pg_root;
818
struct adios_index_var_struct_v1 * vars_root;
819
struct adios_index_attribute_struct_v1 * attrs_root;
820
struct adios_attribute_struct * a = fd->group->attributes;
821
int lb[3], ub[3], didx[3]; // for reordering DS dimensions
822
unsigned int version;
824
if (fd->mode == adios_mode_write || fd->mode == adios_mode_append)
826
// finalize variable info in fd buffer, next we call build_index
828
a->write_offset = 1; // only attributes with !=0 offset will be included in build index
832
//adios_write_close_vars_v1 (fd);
833
/* Gather var/attr indices from all processes to rank 0 */
834
adios_dataspaces_gather_indices (fd, method, &pg_root, &vars_root ,&attrs_root);
836
// make sure all processes have finished putting data to the space
837
// before we put metadata from rank 0
838
MPI_Barrier (p->mpi_comm);
842
/* Write two adios specific variables with the name of the file and name of the group into the space */
843
/* ADIOS Read API fopen() checks these variables to see if writing already happened */
844
#ifdef DATASPACES_NO_VERSIONING
845
version = 0; /* Update/overwrite data in DataSpaces */
847
version = p->time_index; /* Add new data as separate to DataSpaces */
850
/* Make metadata from indices */
854
ds_pack_group_info (fd, method, vars_root, attrs_root,
855
&indexbuf, &indexlen, &nvars, &nattrs);
858
/* Put GROUP@fn/gn header into space */
859
snprintf(ds_var_name, MAX_DS_NAMELEN, "GROUP@%s/%s", fd->name, fd->group->name);
860
log_debug ("%s: put %s with buf len %d into space\n", __func__, ds_var_name, indexlen);
861
ub[0] = indexlen-1; ub[1] = 0; ub[2] = 0;
862
ds_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
863
dspaces_put(ds_var_name, version, 1, 0, 0, 0, /* lb 0..2 */
864
ub[didx[0]], ub[didx[1]], ub[didx[2]], indexbuf);
867
/* Create and put FILE@fn header into space */
868
char * file_info_buf; /* store FILE@fn's group list */
869
int file_info_buf_len; /* = 128 currently */
870
snprintf (ds_var_name, MAX_DS_NAMELEN, "FILE@%s", fd->name);
871
ds_pack_file_info (p->time_index, nvars, nattrs, indexlen, fd->group->name,
872
&file_info_buf, &file_info_buf_len);
873
log_debug ("%s: put %s = buflen=%d time=%d nvars=%d nattr=%d index=%d name=%d:%s into space\n",
874
__func__, ds_var_name,
875
*(int*)file_info_buf, *(int*)(file_info_buf+4),
876
*(int*)(file_info_buf+8), *(int*)(file_info_buf+12),
877
*(int*)(file_info_buf+16), *(int*)(file_info_buf+20),
879
/* Flip 1st and 2nd dimension for DataSpaces representation for a 1D array*/
880
ub[0] = file_info_buf_len-1; ub[1] = 0; ub[2] = 0;
881
ds_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
882
dspaces_put_sync(); //wait on previous put to finish
883
dspaces_put(ds_var_name, version, 1, 0, 0, 0, /* lb 0..2 */
884
ub[didx[0]], ub[didx[1]], ub[didx[2]], file_info_buf);
886
/* Create and put VERSION@fn version info into space */
887
int version_buf[2] = {version, 0}; /* last version put in space; not terminated */
888
int version_buf_len = 2;
889
snprintf (ds_var_name, MAX_DS_NAMELEN, "VERSION@%s", fd->name);
890
log_debug ("%s: put %s with buf = [%d,%d] (len=%d integers) into space\n",
891
__func__, ds_var_name, version_buf[0], version_buf[1], version_buf_len);
892
ub[0] = version_buf_len-1; ub[1] = 0; ub[2] = 0;
893
ds_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
894
dspaces_put_sync(); //wait on previous put to finish
895
dspaces_put(ds_var_name, 0, sizeof(int), 0, 0, 0, /* lb 0..2 */
896
ub[didx[0]], ub[didx[1]], ub[didx[2]], version_buf);
897
dspaces_put_sync(); //wait on previous put to finish
901
// remember this filename and its version for finalize
903
for (i=0; i<p->num_of_files; i++) {
904
if (!strcmp(fd->name, p->fnames[i]))
907
if (i == p->num_of_files) {
908
if (p->num_of_files < MAX_NUM_OF_FILES) {
909
p->fnames[ p->num_of_files ] = strdup(fd->name);
912
log_error ("%s: Max %d files can be written by one application "
913
"using the DATASPACES method\n",
914
__func__, MAX_NUM_OF_FILES);
917
if (i < p->num_of_files) {
918
p->fversions[i] = version;
922
// free allocated index lists
923
adios_clear_index_v1 (pg_root, vars_root, attrs_root);
925
// rank=0 may be in put_sync when others call unlock, which is a global op
926
MPI_Barrier (p->mpi_comm);
927
//log_debug("%s: call dspaces_put_sync()\n", __func__);
928
//dspaces_put_sync();
929
log_debug("%s: call dspaces_unlock_on_write(%s)\n", __func__, fd->name);
930
dspaces_unlock_on_write(fd->name, &p->mpi_comm);
932
else if( fd->mode == adios_mode_read )
934
dspaces_unlock_on_read(fd->name, &p->mpi_comm);
937
/* Increment the time index */
941
log_info ("%s: exit\n", __func__);
944
void adios_dataspaces_finalize (int mype, struct adios_method_struct * method)
946
struct adios_ds_data_struct *p = (struct adios_ds_data_struct *)
949
char ds_var_name[MAX_DS_NAMELEN];
951
int ub[3] = {1,0,0}; // we put 2 integers to space,
952
int didx[3]; // for reordering DS dimensions
953
int value[2] = {0, 1}; // integer to be written to space (terminated=1)
955
// tell the readers which files are finalized
956
ds_dimension_ordering(1, 0, 0, didx); // C ordering of 1D array into DS
957
for (i=0; i<p->num_of_files; i++) {
958
/* Put VERSION@fn into space. Indicates that this file will not be extended anymore. */
959
log_debug("%s: call dspaces_lock_on_write(%s), rank=%d\n", __func__, p->fnames[i], mype);
960
dspaces_lock_on_write(p->fnames[i], &p->mpi_comm); // lock is global operation in DataSpaces
962
value[0] = p->fversions[i];
963
snprintf(ds_var_name, MAX_DS_NAMELEN, "VERSION@%s", p->fnames[i]);
964
log_debug ("%s: update %s in the space [%d, %d]\n",
965
__func__, ds_var_name, value[0], value[1] );
966
dspaces_put(ds_var_name, 0, sizeof(int),
967
lb[didx[0]], lb[didx[1]], lb[didx[2]],
968
ub[didx[0]], ub[didx[1]], ub[didx[2]],
970
log_debug("%s: call dspaces_put_sync()\n", __func__);
973
log_debug("%s: call dspaces_unlock_on_write(%s), rank=%d\n", __func__, p->fnames[i], mype);
974
dspaces_unlock_on_write(p->fnames[i], &p->mpi_comm);
978
// disconnect from dataspaces if we are connected from writer but not anymore from reader
979
if (globals_adios_is_dataspaces_connected_from_writer() &&
980
!globals_adios_is_dataspaces_connected_from_both())
982
log_debug ("%s: call dspaces_barrier(), rank=%d\n", __func__,mype);
984
log_debug ("%s: call dspaces_finalize(), rank=%d\n", __func__,mype);
988
globals_adios_set_dataspaces_disconnected_from_writer();
990
adios_dataspaces_initialized = 0;
992
log_info("%s: exit\n", __func__);
995
void adios_dataspaces_end_iteration (struct adios_method_struct * method)
999
void adios_dataspaces_start_calculation (struct adios_method_struct * method)
1003
void adios_dataspaces_stop_calculation (struct adios_method_struct * method)